Mirror of https://github.com/neondatabase/neon.git (synced 2026-03-21 09:10:37 +00:00)
Compare commits: RemoteExte...store_logi (2 commits)

| Author | SHA1 | Date |
|---|---|---|
| | 0aecfbb09c | |
| | 9604aec0c9 | |
```diff
@@ -282,7 +282,6 @@ pub struct TimelineInfo {
     /// Sum of the size of all layer files.
     /// If a layer is present in both local FS and S3, it counts only once.
     pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
-    pub current_logical_size_non_incremental: Option<u64>,
 
     pub timeline_dir_layer_file_size_sum: Option<u64>,
 
```
```diff
@@ -5,7 +5,7 @@
 //!
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
-use crate::tenant::{mgr, LogicalSizeCalculationCause};
+use crate::tenant::mgr;
 use anyhow;
 use chrono::Utc;
 use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
```
```diff
@@ -113,7 +113,7 @@ pub async fn collect_metrics_iteration(
     cached_metrics: &mut HashMap<PageserverConsumptionMetricsKey, u64>,
     metric_collection_endpoint: &reqwest::Url,
     node_id: NodeId,
-    ctx: &RequestContext,
+    _ctx: &RequestContext,
     send_cached: bool,
 ) {
     let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, u64)> = Vec::new();
```
```diff
@@ -164,30 +164,15 @@ pub async fn collect_metrics_iteration(
                 timeline_written_size,
             ));
 
-            let span = info_span!("collect_metrics_iteration", tenant_id = %timeline.tenant_id, timeline_id = %timeline.timeline_id);
-            match span.in_scope(|| timeline.get_current_logical_size(ctx)) {
-                // Only send timeline logical size when it is fully calculated.
-                Ok((size, is_exact)) if is_exact => {
-                    current_metrics.push((
-                        PageserverConsumptionMetricsKey {
-                            tenant_id,
-                            timeline_id: Some(timeline.timeline_id),
-                            metric: TIMELINE_LOGICAL_SIZE,
-                        },
-                        size,
-                    ));
-                }
-                Ok((_, _)) => {}
-                Err(err) => {
-                    error!(
-                        "failed to get current logical size for timeline {}: {err:?}",
-                        timeline.timeline_id
-                    );
-                    continue;
-                }
-            };
+            current_metrics.push((
+                PageserverConsumptionMetricsKey {
+                    tenant_id,
+                    timeline_id: Some(timeline.timeline_id),
+                    metric: TIMELINE_LOGICAL_SIZE,
+                },
+                timeline.get_current_logical_size(),
+            ));
         }
-
         let timeline_resident_size = timeline.get_resident_physical_size();
         tenant_resident_size += timeline_resident_size;
     }
```
```diff
@@ -336,7 +321,6 @@ pub async fn calculate_synthetic_size_worker(
             if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await
             {
                 if let Err(e) = tenant.calculate_synthetic_size(
-                    LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize,
                     ctx).await {
                     error!("failed to calculate synthetic size for tenant {}: {}", tenant_id, e);
                 }
```
```diff
@@ -26,7 +26,7 @@ use crate::tenant::config::TenantConfOpt;
 use crate::tenant::mgr::{TenantMapInsertError, TenantStateError};
 use crate::tenant::size::ModelInputs;
 use crate::tenant::storage_layer::LayerAccessStatsReset;
-use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
+use crate::tenant::{PageReconstructError, Timeline};
 use crate::{config::PageServerConf, tenant::mgr};
 use utils::{
     auth::JwtAuth,
```
```diff
@@ -168,36 +168,12 @@ impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
 }
 
 // Helper function to construct a TimelineInfo struct for a timeline
-async fn build_timeline_info(
+fn build_timeline_info(
     timeline: &Arc<Timeline>,
-    include_non_incremental_logical_size: bool,
-    ctx: &RequestContext,
+    _ctx: &RequestContext,
 ) -> anyhow::Result<TimelineInfo> {
     crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
 
-    let mut info = build_timeline_info_common(timeline, ctx)?;
-    if include_non_incremental_logical_size {
-        // XXX we should be using spawn_ondemand_logical_size_calculation here.
-        // Otherwise, if someone deletes the timeline / detaches the tenant while
-        // we're executing this function, we will outlive the timeline on-disk state.
-        info.current_logical_size_non_incremental = Some(
-            timeline
-                .get_current_logical_size_non_incremental(
-                    info.last_record_lsn,
-                    CancellationToken::new(),
-                    ctx,
-                )
-                .await?,
-        );
-    }
-    Ok(info)
-}
-
-fn build_timeline_info_common(
-    timeline: &Arc<Timeline>,
-    ctx: &RequestContext,
-) -> anyhow::Result<TimelineInfo> {
-    crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
     let last_record_lsn = timeline.get_last_record_lsn();
     let (wal_source_connstr, last_received_msg_lsn, last_received_msg_ts) = {
         let guard = timeline.last_received_wal.lock().unwrap();
```
```diff
@@ -217,13 +193,7 @@ fn build_timeline_info_common(
         Lsn(0) => None,
         lsn @ Lsn(_) => Some(lsn),
     };
-    let current_logical_size = match timeline.get_current_logical_size(ctx) {
-        Ok((size, _)) => Some(size),
-        Err(err) => {
-            error!("Timeline info creation failed to get current logical size: {err:?}");
-            None
-        }
-    };
+    let current_logical_size = Some(timeline.get_current_logical_size());
     let current_physical_size = Some(timeline.layer_size_sum());
     let state = timeline.current_state();
     let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
```
```diff
@@ -240,7 +210,6 @@ fn build_timeline_info_common(
         latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
         current_logical_size,
         current_physical_size,
-        current_logical_size_non_incremental: None,
         timeline_dir_layer_file_size_sum: None,
         wal_source_connstr,
         last_received_msg_lsn,
```
```diff
@@ -282,7 +251,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
     .await {
         Ok(Some(new_timeline)) => {
             // Created. Construct a TimelineInfo for it.
-            let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
+            let timeline_info = build_timeline_info(&new_timeline, &ctx)
                 .map_err(ApiError::InternalServerError)?;
             json_response(StatusCode::CREATED, timeline_info)
         }
```
```diff
@@ -296,8 +265,6 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
 
 async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
-    let include_non_incremental_logical_size: Option<bool> =
-        parse_query_param(&request, "include-non-incremental-logical-size")?;
     check_permission(&request, Some(tenant_id))?;
 
     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
```
```diff
@@ -308,15 +275,11 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
 
     let mut response_data = Vec::with_capacity(timelines.len());
     for timeline in timelines {
-        let timeline_info = build_timeline_info(
-            &timeline,
-            include_non_incremental_logical_size.unwrap_or(false),
-            &ctx,
-        )
-        .instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
-        .await
-        .context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
-        .map_err(ApiError::InternalServerError)?;
+        let timeline_info = build_timeline_info(&timeline, &ctx)
+            .context(
+                "Failed to convert tenant timeline {timeline_id} into the local one: {e:?}",
+            )
+            .map_err(ApiError::InternalServerError)?;
 
         response_data.push(timeline_info);
     }
```
```diff
@@ -331,8 +294,6 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
 async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
-    let include_non_incremental_logical_size: Option<bool> =
-        parse_query_param(&request, "include-non-incremental-logical-size")?;
     check_permission(&request, Some(tenant_id))?;
 
     // Logical size calculation needs downloading.
```
```diff
@@ -345,14 +306,9 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
         .get_timeline(timeline_id, false)
         .map_err(ApiError::NotFound)?;
 
-    let timeline_info = build_timeline_info(
-        &timeline,
-        include_non_incremental_logical_size.unwrap_or(false),
-        &ctx,
-    )
-    .await
-    .context("get local timeline info")
-    .map_err(ApiError::InternalServerError)?;
+    let timeline_info = build_timeline_info(&timeline, &ctx)
+        .context("get local timeline info")
+        .map_err(ApiError::InternalServerError)?;
 
     Ok::<_, ApiError>(timeline_info)
 }
```
```diff
@@ -546,11 +502,7 @@ async fn tenant_size_handler(request: Request<Body>) -> Result<Response<Body>, A
 
     // this can be long operation
     let inputs = tenant
-        .gather_size_inputs(
-            retention_period,
-            LogicalSizeCalculationCause::TenantSizeHandler,
-            &ctx,
-        )
+        .gather_size_inputs(retention_period, &ctx)
         .await
         .map_err(ApiError::InternalServerError)?;
 
```
```diff
@@ -20,7 +20,6 @@ use postgres_ffi::{Oid, TimestampTz, TransactionId};
 use serde::{Deserialize, Serialize};
 use std::collections::{hash_map, HashMap, HashSet};
 use std::ops::Range;
-use tokio_util::sync::CancellationToken;
 use tracing::{debug, trace, warn};
 use utils::{bin_ser::BeSer, lsn::Lsn};
 
```
```diff
@@ -139,6 +138,17 @@ impl Timeline {
         Ok(total_blocks)
     }
 
+    /// Get timeline logical size
+    pub async fn get_logical_size(
+        &self,
+        lsn: Lsn,
+        ctx: &RequestContext,
+    ) -> Result<u64, PageReconstructError> {
+        let mut buf = self.get(LOGICAL_SIZE_KEY, lsn, ctx).await?;
+        let size = buf.get_u64_le();
+        Ok(size)
+    }
+
     /// Get size of a relation file
     pub async fn get_rel_size(
         &self,
```
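Note: the new `get_logical_size` accessor reads back the value that `put_logical_size` (added further down in this diff) stores under `LOGICAL_SIZE_KEY`. A minimal standalone sketch of the little-endian round-trip both sides rely on — the helper names here are illustrative only, not part of the patch:

```rust
// Illustrative round-trip: store a u64 as a little-endian image and read it
// back with bytes::Buf::get_u64_le, mirroring put_logical_size/get_logical_size.
use bytes::{Buf, Bytes};

fn encode_logical_size(size: u64) -> Bytes {
    // put_logical_size stores size.to_le_bytes() as an image value
    Bytes::from(size.to_le_bytes().to_vec())
}

fn decode_logical_size(mut buf: Bytes) -> u64 {
    // get_logical_size decodes the image with get_u64_le
    buf.get_u64_le()
}

fn main() {
    let size: u64 = 8 * 1024 * 1024;
    assert_eq!(decode_logical_size(encode_logical_size(size)), size);
}
```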
```diff
@@ -489,46 +499,6 @@ impl Timeline {
         self.get(CHECKPOINT_KEY, lsn, ctx).await
     }
 
-    /// Does the same as get_current_logical_size but counted on demand.
-    /// Used to initialize the logical size tracking on startup.
-    ///
-    /// Only relation blocks are counted currently. That excludes metadata,
-    /// SLRUs, twophase files etc.
-    pub async fn get_current_logical_size_non_incremental(
-        &self,
-        lsn: Lsn,
-        cancel: CancellationToken,
-        ctx: &RequestContext,
-    ) -> Result<u64, CalculateLogicalSizeError> {
-        crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
-
-        // Fetch list of database dirs and iterate them
-        let buf = self.get(DBDIR_KEY, lsn, ctx).await.context("read dbdir")?;
-        let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
-
-        let mut total_size: u64 = 0;
-        for (spcnode, dbnode) in dbdir.dbdirs.keys() {
-            for rel in self
-                .list_rels(*spcnode, *dbnode, lsn, ctx)
-                .await
-                .context("list rels")?
-            {
-                if cancel.is_cancelled() {
-                    return Err(CalculateLogicalSizeError::Cancelled);
-                }
-                let relsize_key = rel_size_to_key(rel);
-                let mut buf = self
-                    .get(relsize_key, lsn, ctx)
-                    .await
-                    .with_context(|| format!("read relation size of {rel:?}"))?;
-                let relsize = buf.get_u32_le();
-
-                total_size += relsize as u64;
-            }
-        }
-        Ok(total_size * BLCKSZ as u64)
-    }
-
     ///
     /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
     /// Anything that's not listed maybe removed from the underlying storage (from
```
```diff
@@ -819,6 +789,12 @@ impl<'a> DatadirModification<'a> {
         Ok(())
     }
 
+    pub fn put_logical_size(&mut self, size: u64) -> anyhow::Result<()> {
+        let buf = size.to_le_bytes();
+        self.put(LOGICAL_SIZE_KEY, Value::Image(Bytes::from(buf.to_vec())));
+        Ok(())
+    }
+
     pub async fn drop_dbdir(
         &mut self,
         spcnode: Oid,
```
```diff
@@ -1131,7 +1107,8 @@ impl<'a> DatadirModification<'a> {
         result?;
 
         if pending_nblocks != 0 {
-            writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
+            let size = writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
+            self.put_logical_size(size)?;
             self.pending_nblocks = 0;
         }
 
```
```diff
@@ -1159,7 +1136,8 @@ impl<'a> DatadirModification<'a> {
         writer.finish_write(lsn);
 
         if pending_nblocks != 0 {
-            writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
+            let size = writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
+            self.put_logical_size(size)?;
         }
 
         Ok(())
```
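Note: `update_current_logical_size` now returns the new total (see the timeline.rs hunk at the end of this diff), which is what lets commit persist the running value via `put_logical_size`. A minimal sketch of that contract, using an assumed `Writer` stand-in rather than the real pageserver types:

```rust
// Sketch (assumed types): bump the in-memory counter by the pending block
// delta and return the resulting total so the caller can persist it.
use std::sync::atomic::{AtomicI64, Ordering};

struct Writer {
    current_logical_size: AtomicI64,
}

impl Writer {
    // mirrors the new update_current_logical_size(&self, delta: i64) -> u64
    fn update_current_logical_size(&self, delta: i64) -> u64 {
        // fetch_add returns the previous value, so add delta back for the total
        let prev = self.current_logical_size.fetch_add(delta, Ordering::SeqCst);
        (prev + delta) as u64
    }
}

fn main() {
    let w = Writer { current_logical_size: AtomicI64::new(1024) };
    let total = w.update_current_logical_size(8192);
    assert_eq!(total, 9216);
    // commit() would now call self.put_logical_size(total)?
}
```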
```diff
@@ -1274,7 +1252,7 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
 // 03 misc
 //    controlfile
 //    checkpoint
-//    pg_version
+//    logical_size
 //
 // Below is a full list of the keyspace allocation:
 //
```
```diff
@@ -1314,6 +1292,10 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
 // Checkpoint:
 // 03 00000000 00000000 00000000 00 00000001
 //-- Section 01: relation data and metadata
+//
+// LogicalSize:
+// 03 00000000 00000000 00000000 00 00000002
+//
 
 const DBDIR_KEY: Key = Key {
     field1: 0x00,
```
```diff
@@ -1536,6 +1518,15 @@ const CHECKPOINT_KEY: Key = Key {
     field6: 1,
 };
 
+const LOGICAL_SIZE_KEY: Key = Key {
+    field1: 0x03,
+    field2: 0,
+    field3: 0,
+    field4: 0,
+    field5: 0,
+    field6: 3,
+};
+
 // Reverse mappings for a few Keys.
 // These are needed by WAL redo manager.
 
```
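Note: a hedged sketch of how the six `Key` fields above map onto the hex notation used in the keyspace comments (field widths assumed from this listing: u8, u32, u32, u32, u8, u32):

```rust
// Illustrative only: format a Key the way the keyspace comment block writes it.
struct Key {
    field1: u8,
    field2: u32,
    field3: u32,
    field4: u32,
    field5: u8,
    field6: u32,
}

fn format_key(k: &Key) -> String {
    format!(
        "{:02x} {:08x} {:08x} {:08x} {:02x} {:08x}",
        k.field1, k.field2, k.field3, k.field4, k.field5, k.field6
    )
}

fn main() {
    let logical_size_key = Key {
        field1: 0x03,
        field2: 0,
        field3: 0,
        field4: 0,
        field5: 0,
        field6: 3,
    };
    // prints: 03 00000000 00000000 00000000 00 00000003
    println!("{}", format_key(&logical_size_key));
}
```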
```diff
@@ -237,11 +237,6 @@ pub enum TaskKind {
     /// See [`crate::disk_usage_eviction_task`].
     DiskUsageEviction,
 
-    // Initial logical size calculation
-    InitialLogicalSizeCalculation,
-
-    OndemandLogicalSizeCalculation,
-
     // Task that flushes frozen in-memory layers to disk
     LayerFlushTask,
 
```
```diff
@@ -98,9 +98,7 @@ mod timeline;
 pub mod size;
 
 pub(crate) use timeline::debug_assert_current_span_has_tenant_and_timeline_id;
-pub use timeline::{
-    LocalLayerInfoForDiskUsageEviction, LogicalSizeCalculationCause, PageReconstructError, Timeline,
-};
+pub use timeline::{LocalLayerInfoForDiskUsageEviction, PageReconstructError, Timeline};
 
 // re-export this function so that page_cache.rs can use it.
 pub use crate::tenant::ephemeral_file::writeback as writeback_ephemeral_file;
```
```diff
@@ -1043,11 +1041,29 @@ impl Tenant {
         // The loops will shut themselves down when they notice that the tenant is inactive.
         self.activate(ctx)?;
 
+        self.load_logical_sizes().await?;
+
         info!("Done");
 
         Ok(())
     }
 
+    async fn load_logical_sizes(&self) -> anyhow::Result<()> {
+        let not_broken_timelines: Vec<Arc<Timeline>>;
+        {
+            let timelines_accessor = self.timelines.lock().unwrap();
+            not_broken_timelines = timelines_accessor
+                .values()
+                .filter(|timeline| timeline.current_state() != TimelineState::Broken)
+                .cloned()
+                .collect();
+        }
+        for timeline in not_broken_timelines {
+            timeline.load_inmem_logical_size().await?;
+        }
+        Ok(())
+    }
+
     /// Subroutine of `load_tenant`, to load an individual timeline
     ///
     /// NB: The parent is assumed to be already loaded!
```
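Note on `load_logical_sizes`: the timelines are cloned out of the `std::sync::Mutex` in an inner block before anything is awaited, because holding the guard across an `.await` would make the future non-`Send` and would keep the lock held while I/O runs. A minimal standalone sketch of the pattern, with assumed types:

```rust
// Snapshot-under-lock, then await: the guard is dropped at the end of the
// inner block, so the subsequent awaits run without the mutex held.
use std::sync::{Arc, Mutex};

async fn load_all(timelines: &Mutex<Vec<Arc<String>>>) {
    let snapshot: Vec<Arc<String>> = {
        let guard = timelines.lock().unwrap();
        guard.iter().cloned().collect()
    }; // guard dropped here
    for t in snapshot {
        load_one(&t).await; // safe: lock no longer held
    }
}

async fn load_one(_name: &str) {}

#[tokio::main]
async fn main() {
    let timelines = Mutex::new(vec![Arc::new("tl1".to_string())]);
    load_all(&timelines).await;
}
```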
```diff
@@ -2516,7 +2532,6 @@ impl Tenant {
         ancestor: Option<Arc<Timeline>>,
     ) -> anyhow::Result<UninitializedTimeline> {
         let tenant_id = self.tenant_id;
-
         let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() {
             let remote_client = RemoteTimelineClient::new(
                 remote_storage.clone(),
```
```diff
@@ -2637,14 +2652,8 @@ impl Tenant {
         // `max_retention_period` overrides the cutoff that is used to calculate the size
         // (only if it is shorter than the real cutoff).
         max_retention_period: Option<u64>,
-        cause: LogicalSizeCalculationCause,
         ctx: &RequestContext,
     ) -> anyhow::Result<size::ModelInputs> {
-        let logical_sizes_at_once = self
-            .conf
-            .concurrent_tenant_size_logical_size_queries
-            .inner();
-
         // TODO: Having a single mutex block concurrent reads is not great for performance.
         //
         // But the only case where we need to run multiple of these at once is when we
```
```diff
@@ -2654,27 +2663,15 @@ impl Tenant {
         // See more for on the issue #2748 condenced out of the initial PR review.
         let mut shared_cache = self.cached_logical_sizes.lock().await;
 
-        size::gather_inputs(
-            self,
-            logical_sizes_at_once,
-            max_retention_period,
-            &mut shared_cache,
-            cause,
-            ctx,
-        )
-        .await
+        size::gather_inputs(self, max_retention_period, &mut shared_cache, ctx).await
     }
 
     /// Calculate synthetic tenant size and cache the result.
     /// This is periodically called by background worker.
     /// result is cached in tenant struct
     #[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
-    pub async fn calculate_synthetic_size(
-        &self,
-        cause: LogicalSizeCalculationCause,
-        ctx: &RequestContext,
-    ) -> anyhow::Result<u64> {
-        let inputs = self.gather_size_inputs(None, cause, ctx).await?;
+    pub async fn calculate_synthetic_size(&self, ctx: &RequestContext) -> anyhow::Result<u64> {
+        let inputs = self.gather_size_inputs(None, ctx).await?;
 
         let size = inputs.calculate()?;
 
```
```diff
@@ -4,20 +4,14 @@ use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
 use anyhow::{bail, Context};
-use tokio::sync::oneshot::error::RecvError;
-use tokio::sync::Semaphore;
-use tokio_util::sync::CancellationToken;
 
 use crate::context::RequestContext;
-use crate::pgdatadir_mapping::CalculateLogicalSizeError;
 
-use super::{LogicalSizeCalculationCause, Tenant};
+use super::Tenant;
 use crate::tenant::Timeline;
 use utils::id::TimelineId;
 use utils::lsn::Lsn;
 
-use tracing::*;
-
 use tenant_size_model::{Segment, StorageModel};
 
 /// Inputs to the actual tenant sizing model
```
```diff
@@ -123,10 +117,8 @@ pub struct TimelineInputs {
 /// tenant size will be zero.
 pub(super) async fn gather_inputs(
     tenant: &Tenant,
-    limit: &Arc<Semaphore>,
     max_retention_period: Option<u64>,
     logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
-    cause: LogicalSizeCalculationCause,
     ctx: &RequestContext,
 ) -> anyhow::Result<ModelInputs> {
     // refresh is needed to update gc related pitr_cutoff and horizon_cutoff
```
```diff
@@ -319,15 +311,7 @@ pub(super) async fn gather_inputs(
 
     // We left the 'size' field empty in all of the Segments so far.
     // Now find logical sizes for all of the points that might need or benefit from them.
-    fill_logical_sizes(
-        &timelines,
-        &mut segments,
-        limit,
-        logical_size_cache,
-        cause,
-        ctx,
-    )
-    .await?;
+    fill_logical_sizes(&timelines, &mut segments, logical_size_cache, ctx).await?;
 
     Ok(ModelInputs {
         segments,
```
```diff
@@ -343,9 +327,7 @@ pub(super) async fn gather_inputs(
 async fn fill_logical_sizes(
     timelines: &[Arc<Timeline>],
     segments: &mut [SegmentMeta],
-    limit: &Arc<Semaphore>,
     logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
-    cause: LogicalSizeCalculationCause,
     ctx: &RequestContext,
 ) -> anyhow::Result<()> {
     let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
```
```diff
@@ -361,11 +343,6 @@ async fn fill_logical_sizes(
 
     // with joinset, on drop, all of the tasks will just be de-scheduled, which we can use to
    // our advantage with `?` error handling.
-    let mut joinset = tokio::task::JoinSet::new();
-
-    let cancel = tokio_util::sync::CancellationToken::new();
-    // be sure to cancel all spawned tasks if we are dropped
-    let _dg = cancel.clone().drop_guard();
 
     // For each point that would benefit from having a logical size available,
     // spawn a Task to fetch it, unless we have it cached already.
```
```diff
@@ -378,71 +355,18 @@ async fn fill_logical_sizes(
             let lsn = Lsn(seg.segment.lsn);
 
             if let Entry::Vacant(e) = sizes_needed.entry((timeline_id, lsn)) {
-                let cached_size = logical_size_cache.get(&(timeline_id, lsn)).cloned();
+                let mut cached_size = logical_size_cache.get(&(timeline_id, lsn)).cloned();
                 if cached_size.is_none() {
                     let timeline = Arc::clone(timeline_hash.get(&timeline_id).unwrap());
-                    let parallel_size_calcs = Arc::clone(limit);
-                    let ctx = ctx.attached_child();
-                    joinset.spawn(
-                        calculate_logical_size(
-                            parallel_size_calcs,
-                            timeline,
-                            lsn,
-                            cause,
-                            ctx,
-                            cancel.child_token(),
-                        )
-                        .in_current_span(),
-                    );
+                    cached_size = Some(timeline.get_logical_size(lsn, ctx).await?);
                 }
                 e.insert(cached_size);
             }
         }
 
-    // Perform the size lookups
-    let mut have_any_error = false;
-    while let Some(res) = joinset.join_next().await {
-        // each of these come with Result<anyhow::Result<_>, JoinError>
-        // because of spawn + spawn_blocking
-        match res {
-            Err(join_error) if join_error.is_cancelled() => {
-                unreachable!("we are not cancelling any of the futures, nor should be");
-            }
-            Err(join_error) => {
-                // cannot really do anything, as this panic is likely a bug
-                error!("task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}");
-                have_any_error = true;
-            }
-            Ok(Err(recv_result_error)) => {
-                // cannot really do anything, as this panic is likely a bug
-                error!("failed to receive logical size query result: {recv_result_error:#}");
-                have_any_error = true;
-            }
-            Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => {
-                warn!(
-                    timeline_id=%timeline.timeline_id,
-                    "failed to calculate logical size at {lsn}: {error:#}"
-                );
-                have_any_error = true;
-            }
-            Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => {
-                debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated");
-
-                logical_size_cache.insert((timeline.timeline_id, lsn), size);
-                sizes_needed.insert((timeline.timeline_id, lsn), Some(size));
-            }
-        }
-    }
-
     // prune any keys not needed anymore; we record every used key and added key.
     logical_size_cache.retain(|key, _| sizes_needed.contains_key(key));
 
-    if have_any_error {
-        // we cannot complete this round, because we are missing data.
-        // we have however cached all we were able to request calculation on.
-        anyhow::bail!("failed to calculate some logical_sizes");
-    }
-
     // Insert the looked up sizes to the Segments
     for seg in segments.iter_mut() {
         if !seg.size_needed() {
```
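Note: because each size is now a single KV point read, the former `JoinSet` + semaphore fan-out collapses into a plain sequential loop with `?` error propagation. A hedged sketch of the new shape of `fill_logical_sizes`, with simplified stand-in key and value types:

```rust
// Sketch (simplified types): fill each needed slot from the cache, doing a
// cheap awaited lookup on miss and caching the result.
use std::collections::HashMap;

async fn fill_sizes(
    needed: &mut HashMap<u64, Option<u64>>, // stand-in for (TimelineId, Lsn) keys
    cache: &mut HashMap<u64, u64>,
) -> anyhow::Result<()> {
    for (lsn, slot) in needed.iter_mut() {
        let mut cached = cache.get(lsn).copied();
        if cached.is_none() {
            let size = lookup_size(*lsn).await?; // one point read, no task spawn
            cache.insert(*lsn, size);
            cached = Some(size);
        }
        *slot = cached;
    }
    Ok(())
}

async fn lookup_size(_lsn: u64) -> anyhow::Result<u64> {
    Ok(0) // placeholder for timeline.get_logical_size(lsn, ctx)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut needed = HashMap::from([(23_584_744u64, None)]);
    let mut cache = HashMap::new();
    fill_sizes(&mut needed, &mut cache).await?;
    assert_eq!(needed[&23_584_744u64], Some(0));
    Ok(())
}
```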
```diff
@@ -484,33 +408,6 @@ impl ModelInputs {
     }
 }
 
-/// Newtype around the tuple that carries the timeline at lsn logical size calculation.
-struct TimelineAtLsnSizeResult(
-    Arc<crate::tenant::Timeline>,
-    utils::lsn::Lsn,
-    Result<u64, CalculateLogicalSizeError>,
-);
-
-#[instrument(skip_all, fields(timeline_id=%timeline.timeline_id, lsn=%lsn))]
-async fn calculate_logical_size(
-    limit: Arc<tokio::sync::Semaphore>,
-    timeline: Arc<crate::tenant::Timeline>,
-    lsn: utils::lsn::Lsn,
-    cause: LogicalSizeCalculationCause,
-    ctx: RequestContext,
-    cancel: CancellationToken,
-) -> Result<TimelineAtLsnSizeResult, RecvError> {
-    let _permit = tokio::sync::Semaphore::acquire_owned(limit)
-        .await
-        .expect("global semaphore should not had been closed");
-
-    let size_res = timeline
-        .spawn_ondemand_logical_size_calculation(lsn, cause, ctx, cancel)
-        .instrument(info_span!("spawn_ondemand_logical_size_calculation"))
-        .await?;
-    Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
-}
-
 #[test]
 fn verify_size_for_multiple_branches() {
     // this is generated from integration test test_tenant_size_with_multiple_branches, but this way
```
```diff
@@ -8,7 +8,6 @@ use bytes::Bytes;
 use fail::fail_point;
 use futures::StreamExt;
 use itertools::Itertools;
-use once_cell::sync::OnceCell;
 use pageserver_api::models::{
     DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
     DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus,
```
```diff
@@ -16,7 +15,7 @@ use pageserver_api::models::{
 };
 use remote_storage::GenericRemoteStorage;
 use storage_broker::BrokerClientChannel;
-use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError};
+use tokio::sync::watch;
 use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::id::TenantTimelineId;
```
```diff
@@ -50,9 +49,9 @@ use crate::tenant::{
 use crate::config::PageServerConf;
 use crate::keyspace::{KeyPartitioning, KeySpace};
 use crate::metrics::{TimelineMetrics, UNEXPECTED_ONDEMAND_DOWNLOADS};
+use crate::pgdatadir_mapping::BlockNumber;
 use crate::pgdatadir_mapping::LsnForTimestamp;
 use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
-use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
 use crate::tenant::config::{EvictionPolicy, TenantConfOpt};
 use pageserver_api::reltag::RelTag;
 
```
```diff
@@ -211,7 +210,7 @@ pub struct Timeline {
     repartition_threshold: u64,
 
     /// Current logical size of the "datadir", at the last LSN.
-    current_logical_size: LogicalSize,
+    current_logical_size: AtomicI64,
 
     /// Information about the last processed message by the WAL receiver,
     /// or None if WAL receiver has not received anything for this timeline
```
```diff
@@ -229,126 +228,6 @@ pub struct Timeline {
     eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
 }
 
-/// Internal structure to hold all data needed for logical size calculation.
-///
-/// Calculation consists of two stages:
-///
-/// 1. Initial size calculation. That might take a long time, because it requires
-/// reading all layers containing relation sizes at `initial_part_end`.
-///
-/// 2. Collecting an incremental part and adding that to the initial size.
-/// Increments are appended on walreceiver writing new timeline data,
-/// which result in increase or decrease of the logical size.
-struct LogicalSize {
-    /// Size, potentially slow to compute. Calculating this might require reading multiple
-    /// layers, and even ancestor's layers.
-    ///
-    /// NOTE: size at a given LSN is constant, but after a restart we will calculate
-    /// the initial size at a different LSN.
-    initial_logical_size: OnceCell<u64>,
-
-    /// Semaphore to track ongoing calculation of `initial_logical_size`.
-    initial_size_computation: Arc<tokio::sync::Semaphore>,
-
-    /// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines.
-    initial_part_end: Option<Lsn>,
-
-    /// All other size changes after startup, combined together.
-    ///
-    /// Size shouldn't ever be negative, but this is signed for two reasons:
-    ///
-    /// 1. If we initialized the "baseline" size lazily, while we already
-    /// process incoming WAL, the incoming WAL records could decrement the
-    /// variable and temporarily make it negative. (This is just future-proofing;
-    /// the initialization is currently not done lazily.)
-    ///
-    /// 2. If there is a bug and we e.g. forget to increment it in some cases
-    /// when size grows, but remember to decrement it when it shrinks again, the
-    /// variable could go negative. In that case, it seems better to at least
-    /// try to keep tracking it, rather than clamp or overflow it. Note that
-    /// get_current_logical_size() will clamp the returned value to zero if it's
-    /// negative, and log an error. Could set it permanently to zero or some
-    /// special value to indicate "broken" instead, but this will do for now.
-    ///
-    /// Note that we also expose a copy of this value as a prometheus metric,
-    /// see `current_logical_size_gauge`. Use the `update_current_logical_size`
-    /// to modify this, it will also keep the prometheus metric in sync.
-    size_added_after_initial: AtomicI64,
-}
-
-/// Normalized current size, that the data in pageserver occupies.
-#[derive(Debug, Clone, Copy)]
-enum CurrentLogicalSize {
-    /// The size is not yet calculated to the end, this is an intermediate result,
-    /// constructed from walreceiver increments and normalized: logical data could delete some objects, hence be negative,
-    /// yet total logical size cannot be below 0.
-    Approximate(u64),
-    // Fully calculated logical size, only other future walreceiver increments are changing it, and those changes are
-    // available for observation without any calculations.
-    Exact(u64),
-}
-
-impl CurrentLogicalSize {
-    fn size(&self) -> u64 {
-        *match self {
-            Self::Approximate(size) => size,
-            Self::Exact(size) => size,
-        }
-    }
-}
-
-impl LogicalSize {
-    fn empty_initial() -> Self {
-        Self {
-            initial_logical_size: OnceCell::with_value(0),
-            // initial_logical_size already computed, so, don't admit any calculations
-            initial_size_computation: Arc::new(Semaphore::new(0)),
-            initial_part_end: None,
-            size_added_after_initial: AtomicI64::new(0),
-        }
-    }
-
-    fn deferred_initial(compute_to: Lsn) -> Self {
-        Self {
-            initial_logical_size: OnceCell::new(),
-            initial_size_computation: Arc::new(Semaphore::new(1)),
-            initial_part_end: Some(compute_to),
-            size_added_after_initial: AtomicI64::new(0),
-        }
-    }
-
-    fn current_size(&self) -> anyhow::Result<CurrentLogicalSize> {
-        let size_increment: i64 = self.size_added_after_initial.load(AtomicOrdering::Acquire);
-        // ^^^ keep this type explicit so that the casts in this function break if
-        // we change the type.
-        match self.initial_logical_size.get() {
-            Some(initial_size) => {
-                initial_size.checked_add_signed(size_increment)
-                    .with_context(|| format!("Overflow during logical size calculation, initial_size: {initial_size}, size_increment: {size_increment}"))
-                    .map(CurrentLogicalSize::Exact)
-            }
-            None => {
-                let non_negative_size_increment = u64::try_from(size_increment).unwrap_or(0);
-                Ok(CurrentLogicalSize::Approximate(non_negative_size_increment))
-            }
-        }
-    }
-
-    fn increment_size(&self, delta: i64) {
-        self.size_added_after_initial
-            .fetch_add(delta, AtomicOrdering::SeqCst);
-    }
-
-    /// Make the value computed by initial logical size computation
-    /// available for re-use. This doesn't contain the incremental part.
-    fn initialized_size(&self, lsn: Lsn) -> Option<u64> {
-        match self.initial_part_end {
-            Some(v) if v == lsn => self.initial_logical_size.get().copied(),
-            _ => None,
-        }
-    }
-}
-
 pub struct WalReceiverInfo {
     pub wal_source_connconf: PgConnectionConfig,
     pub last_received_msg_lsn: Lsn,
```
```diff
@@ -446,14 +325,6 @@ impl std::fmt::Display for PageReconstructError {
     }
 }
 
-#[derive(Clone, Copy)]
-pub enum LogicalSizeCalculationCause {
-    Initial,
-    ConsumptionMetricsSyntheticSize,
-    EvictionTaskImitation,
-    TenantSizeHandler,
-}
-
 /// Public interface functions
 impl Timeline {
     /// Get the LSN where this branch was created
```
```diff
@@ -838,23 +709,23 @@ impl Timeline {
     /// the initial size calculation has not been run (gets triggered on the first size access).
     ///
     /// return size and boolean flag that shows if the size is exact
-    pub fn get_current_logical_size(
-        self: &Arc<Self>,
-        ctx: &RequestContext,
-    ) -> anyhow::Result<(u64, bool)> {
-        let current_size = self.current_logical_size.current_size()?;
-        debug!("Current size: {current_size:?}");
+    pub fn get_current_logical_size(self: &Arc<Self>) -> u64 {
+        self.current_logical_size.load(AtomicOrdering::Relaxed) as u64
+    }
 
-        let mut is_exact = true;
-        let size = current_size.size();
-        if let (CurrentLogicalSize::Approximate(_), Some(initial_part_end)) =
-            (current_size, self.current_logical_size.initial_part_end)
-        {
-            is_exact = false;
-            self.try_spawn_size_init_task(initial_part_end, ctx);
+    /// Load from KV storage value of logical timeline size and store it in inmemory atomic variable
+    pub async fn load_inmem_logical_size(&self) -> anyhow::Result<()> {
+        let lsn = self.get_disk_consistent_lsn();
+        if lsn != Lsn::INVALID {
+            let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Error);
+            match self.get_logical_size(lsn, &ctx).await {
+                Ok(size) => self
+                    .current_logical_size
+                    .store(size as i64, AtomicOrdering::Relaxed),
+                Err(e) => info!("Failed to load logical size: {:?}", e),
+            }
         }
-
-        Ok((size, is_exact))
+        Ok(())
     }
 
     /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
```
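Note: the read side becomes a plain atomic load, which is why the `Result` and `RequestContext` plumbing disappears from `collect_metrics_iteration` and `build_timeline_info` earlier in this diff. A minimal sketch with an assumed struct:

```rust
// Sketch (assumed struct): reading the cached logical size is infallible and
// synchronous once it lives in an AtomicI64.
use std::sync::atomic::{AtomicI64, Ordering};

struct Timeline {
    current_logical_size: AtomicI64,
}

impl Timeline {
    fn get_current_logical_size(&self) -> u64 {
        // Relaxed suffices: callers only need a recent value of a lone counter,
        // not ordering against other memory operations.
        self.current_logical_size.load(Ordering::Relaxed) as u64
    }
}

fn main() {
    let tl = Timeline { current_logical_size: AtomicI64::new(4096) };
    assert_eq!(tl.get_current_logical_size(), 4096);
}
```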
```diff
@@ -1399,15 +1270,7 @@ impl Timeline {
             latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
             initdb_lsn: metadata.initdb_lsn(),
 
-            current_logical_size: if disk_consistent_lsn.is_valid() {
-                // we're creating timeline data with some layer files existing locally,
-                // need to recalculate timeline's logical size based on data in the layers.
-                LogicalSize::deferred_initial(disk_consistent_lsn)
-            } else {
-                // we're creating timeline data without any layers existing locally,
-                // initial logical size is 0.
-                LogicalSize::empty_initial()
-            },
+            current_logical_size: AtomicI64::new(0),
             partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
             repartition_threshold: 0,
 
```
@@ -1840,292 +1703,12 @@ impl Timeline {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn try_spawn_size_init_task(self: &Arc<Self>, lsn: Lsn, ctx: &RequestContext) {
|
|
||||||
let permit = match Arc::clone(&self.current_logical_size.initial_size_computation)
|
|
||||||
.try_acquire_owned()
|
|
||||||
{
|
|
||||||
Ok(permit) => permit,
|
|
||||||
Err(TryAcquireError::NoPermits) => {
|
|
||||||
// computation already ongoing or finished with success
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Err(TryAcquireError::Closed) => unreachable!("we never call close"),
|
|
||||||
};
|
|
||||||
debug_assert!(self
|
|
||||||
.current_logical_size
|
|
||||||
.initial_logical_size
|
|
||||||
.get()
|
|
||||||
.is_none());
|
|
||||||
|
|
||||||
info!(
|
|
||||||
"spawning logical size computation from context of task kind {:?}",
|
|
||||||
ctx.task_kind()
|
|
||||||
);
|
|
||||||
// We need to start the computation task.
|
|
||||||
// It gets a separate context since it will outlive the request that called this function.
|
|
||||||
let self_clone = Arc::clone(self);
|
|
||||||
let background_ctx = ctx.detached_child(
|
|
||||||
TaskKind::InitialLogicalSizeCalculation,
|
|
||||||
DownloadBehavior::Download,
|
|
||||||
);
|
|
||||||
task_mgr::spawn(
|
|
||||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
|
||||||
task_mgr::TaskKind::InitialLogicalSizeCalculation,
|
|
||||||
Some(self.tenant_id),
|
|
||||||
Some(self.timeline_id),
|
|
||||||
"initial size calculation",
|
|
||||||
false,
|
|
||||||
// NB: don't log errors here, task_mgr will do that.
|
|
||||||
async move {
|
|
||||||
// no cancellation here, because nothing really waits for this to complete compared
|
|
||||||
// to spawn_ondemand_logical_size_calculation.
|
|
||||||
let cancel = CancellationToken::new();
|
|
||||||
let calculated_size = match self_clone
|
|
||||||
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(s) => s,
|
|
||||||
Err(CalculateLogicalSizeError::Cancelled) => {
|
|
||||||
// Don't make noise, this is a common task.
|
|
||||||
// In the unlikely case that there is another call to this function, we'll retry
|
|
||||||
// because initial_logical_size is still None.
|
|
||||||
info!("initial size calculation cancelled, likely timeline delete / tenant detach");
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
Err(CalculateLogicalSizeError::Other(err)) => {
|
|
||||||
if let Some(e @ PageReconstructError::AncestorStopping(_)) =
|
|
||||||
err.root_cause().downcast_ref()
|
|
||||||
{
|
|
||||||
// This can happen if the timeline parent timeline switches to
|
|
||||||
// Stopping state while we're still calculating the initial
|
|
||||||
// timeline size for the child, for example if the tenant is
|
|
||||||
// being detached or the pageserver is shut down. Like with
|
|
||||||
// CalculateLogicalSizeError::Cancelled, don't make noise.
|
|
||||||
info!("initial size calculation failed because the timeline or its ancestor is Stopping, likely because the tenant is being detached: {e:#}");
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
return Err(err.context("Failed to calculate logical size"));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// we cannot query current_logical_size.current_size() to know the current
|
|
||||||
// *negative* value, only truncated to u64.
|
|
||||||
let added = self_clone
|
|
||||||
.current_logical_size
|
|
||||||
.size_added_after_initial
|
|
||||||
.load(AtomicOrdering::Relaxed);
|
|
||||||
|
|
||||||
let sum = calculated_size.saturating_add_signed(added);
|
|
||||||
|
|
||||||
// set the gauge value before it can be set in `update_current_logical_size`.
|
|
||||||
self_clone.metrics.current_logical_size_gauge.set(sum);
|
|
||||||
|
|
||||||
match self_clone
|
|
||||||
.current_logical_size
|
|
||||||
.initial_logical_size
|
|
||||||
.set(calculated_size)
|
|
||||||
{
|
|
||||||
Ok(()) => (),
|
|
||||||
Err(_what_we_just_attempted_to_set) => {
|
|
||||||
let existing_size = self_clone
|
|
||||||
.current_logical_size
|
|
||||||
.initial_logical_size
|
|
||||||
.get()
|
|
||||||
.expect("once_cell set was lost, then get failed, impossible.");
|
|
||||||
// This shouldn't happen because the semaphore is initialized with 1.
|
|
||||||
// But if it happens, just complain & report success so there are no further retries.
|
|
||||||
error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// now that `initial_logical_size.is_some()`, reduce permit count to 0
|
|
||||||
// so that we prevent future callers from spawning this task
|
|
||||||
permit.forget();
|
|
||||||
Ok(())
|
|
||||||
}.in_current_span(),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn spawn_ondemand_logical_size_calculation(
|
|
||||||
self: &Arc<Self>,
|
|
||||||
lsn: Lsn,
|
|
||||||
cause: LogicalSizeCalculationCause,
|
|
||||||
ctx: RequestContext,
|
|
||||||
cancel: CancellationToken,
|
|
||||||
) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
|
|
||||||
let (sender, receiver) = oneshot::channel();
|
|
||||||
let self_clone = Arc::clone(self);
|
|
||||||
// XXX if our caller loses interest, i.e., ctx is cancelled,
|
|
||||||
// we should stop the size calculation work and return an error.
|
|
||||||
// That would require restructuring this function's API to
|
|
||||||
// return the result directly, instead of a Receiver for the result.
|
|
||||||
let ctx = ctx.detached_child(
|
|
||||||
TaskKind::OndemandLogicalSizeCalculation,
|
|
||||||
DownloadBehavior::Download,
|
|
||||||
);
|
|
||||||
task_mgr::spawn(
|
|
||||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
|
||||||
task_mgr::TaskKind::OndemandLogicalSizeCalculation,
|
|
||||||
Some(self.tenant_id),
|
|
||||||
Some(self.timeline_id),
|
|
||||||
"ondemand logical size calculation",
|
|
||||||
false,
|
|
||||||
async move {
|
|
||||||
let res = self_clone
|
|
||||||
.logical_size_calculation_task(lsn, cause, &ctx, cancel)
|
|
||||||
.await;
|
|
||||||
let _ = sender.send(res).ok();
|
|
||||||
Ok(()) // Receiver is responsible for handling errors
|
|
||||||
}
|
|
||||||
.in_current_span(),
|
|
||||||
);
|
|
||||||
receiver
|
|
||||||
}
-
-    #[instrument(skip_all)]
-    async fn logical_size_calculation_task(
-        self: &Arc<Self>,
-        lsn: Lsn,
-        cause: LogicalSizeCalculationCause,
-        ctx: &RequestContext,
-        cancel: CancellationToken,
-    ) -> Result<u64, CalculateLogicalSizeError> {
-        debug_assert_current_span_has_tenant_and_timeline_id();
-
-        let mut timeline_state_updates = self.subscribe_for_state_updates();
-        let self_calculation = Arc::clone(self);
-
-        let mut calculation = pin!(async {
-            let cancel = cancel.child_token();
-            let ctx = ctx.attached_child();
-            self_calculation
-                .calculate_logical_size(lsn, cause, cancel, &ctx)
-                .await
-        });
-        let timeline_state_cancellation = async {
-            loop {
-                match timeline_state_updates.changed().await {
-                    Ok(()) => {
-                        let new_state = *timeline_state_updates.borrow();
-                        match new_state {
-                            // we're running this job for active timelines only
-                            TimelineState::Active => continue,
-                            TimelineState::Broken
-                            | TimelineState::Stopping
-                            | TimelineState::Loading => {
-                                break format!("aborted because timeline became inactive (new state: {new_state:?})")
-                            }
-                        }
-                    }
-                    Err(_sender_dropped_error) => {
-                        // can't happen, the sender is not dropped as long as the Timeline exists
-                        break "aborted because state watch was dropped".to_string();
-                    }
-                }
-            }
-        };
-
-        let taskmgr_shutdown_cancellation = async {
-            task_mgr::shutdown_watcher().await;
-            "aborted because task_mgr shutdown requested".to_string()
-        };
-
-        loop {
-            tokio::select! {
-                res = &mut calculation => { return res }
-                reason = timeline_state_cancellation => {
-                    debug!(reason = reason, "cancelling calculation");
-                    cancel.cancel();
-                    return calculation.await;
-                }
-                reason = taskmgr_shutdown_cancellation => {
-                    debug!(reason = reason, "cancelling calculation");
-                    cancel.cancel();
-                    return calculation.await;
-                }
-            }
-        }
-    }
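The removed task above races the size computation against two cancellation sources, pinning the computation once so it can be polled by reference and then drained after cancellation is signalled. A compact standalone sketch of that shape (assumed crates: tokio with timers and macros, tokio-util; names are illustrative):

use std::pin::pin;
use tokio_util::sync::CancellationToken;

async fn compute(cancel: CancellationToken) -> Result<u64, &'static str> {
    tokio::select! {
        _ = cancel.cancelled() => Err("cancelled"),
        _ = tokio::time::sleep(std::time::Duration::from_millis(50)) => Ok(42),
    }
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    // pin once, poll by &mut inside select!
    let mut calculation = pin!(compute(cancel.child_token()));
    let shutdown = tokio::time::sleep(std::time::Duration::from_millis(10));

    let res = tokio::select! {
        res = &mut calculation => res,
        _ = shutdown => {
            // shutdown requested: flip the token, then still await the
            // computation so it can observe cancellation and unwind cleanly
            cancel.cancel();
            calculation.await
        }
    };
    println!("{res:?}");
}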
-
-    /// Calculate the logical size of the database at the latest LSN.
-    ///
-    /// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
-    /// especially if we need to download remote layers.
-    pub async fn calculate_logical_size(
-        &self,
-        up_to_lsn: Lsn,
-        cause: LogicalSizeCalculationCause,
-        cancel: CancellationToken,
-        ctx: &RequestContext,
-    ) -> Result<u64, CalculateLogicalSizeError> {
-        info!(
-            "Calculating logical size for timeline {} at {}",
-            self.timeline_id, up_to_lsn
-        );
-        // These failpoints are used by python tests to ensure that we don't delete
-        // the timeline while the logical size computation is ongoing.
-        // The first failpoint is used to make this function pause.
-        // Then the python test initiates the timeline delete operation in a thread.
-        // It waits for a few seconds, then arms the second failpoint and disables
-        // the first failpoint. The second failpoint prints an error if the timeline
-        // delete code has deleted the on-disk state while we're still running here.
-        // It shouldn't do that. If it does it anyway, the error will be caught
-        // by the test suite, highlighting the problem.
-        fail::fail_point!("timeline-calculate-logical-size-pause");
-        fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| {
-            if !self
-                .conf
-                .metadata_path(self.timeline_id, self.tenant_id)
-                .exists()
-            {
-                error!("timeline-calculate-logical-size-pre metadata file does not exist")
-            }
-            // need to return something
-            Ok(0)
-        });
-        // See if we've already done the work for the initial size calculation.
-        // This is a short-cut for timelines that are mostly unused.
-        if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
-            return Ok(size);
-        }
-        let storage_time_metrics = match cause {
-            LogicalSizeCalculationCause::Initial
-            | LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
-            | LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
-            LogicalSizeCalculationCause::EvictionTaskImitation => {
-                &self.metrics.imitate_logical_size_histo
-            }
-        };
-        let timer = storage_time_metrics.start_timer();
-        let logical_size = self
-            .get_current_logical_size_non_incremental(up_to_lsn, cancel, ctx)
-            .await?;
-        debug!("calculated logical size: {logical_size}");
-        timer.stop_and_record();
-        Ok(logical_size)
-    }
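The two failpoints above come from the fail crate. As a sketch of the mechanism (assuming the crate's failpoints feature is enabled; names here are illustrative): an inactive failpoint is a no-op, while the closure form makes the enclosing function return early with the closure's value once a test activates it:

use fail::fail_point;

fn guarded_read() -> Result<u64, String> {
    // when "before-read" is activated with a return action, this
    // short-circuits the function with the closure's value
    fail_point!("before-read", |_| Err("injected failure".to_string()));
    Ok(7)
}

fn main() {
    assert_eq!(guarded_read(), Ok(7));
    fail::cfg("before-read", "return").unwrap();
    assert_eq!(guarded_read(), Err("injected failure".to_string()));
    fail::remove("before-read");
}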
 
     /// Update current logical size, adding `delta' to the old value.
-    fn update_current_logical_size(&self, delta: i64) {
-        let logical_size = &self.current_logical_size;
-        logical_size.increment_size(delta);
-
-        // Also set the value in the prometheus gauge. Note that
-        // there is a race condition here: if this is called by two
-        // threads concurrently, the prometheus gauge might be set to
-        // one value while current_logical_size is set to the
-        // other.
-        match logical_size.current_size() {
-            Ok(CurrentLogicalSize::Exact(new_current_size)) => self
-                .metrics
-                .current_logical_size_gauge
-                .set(new_current_size),
-            Ok(CurrentLogicalSize::Approximate(_)) => {
-                // don't update the gauge yet; this avoids bouncing the gauge back
-                // and forth while the initial size calculation task is still running.
-            }
-            // this is overflow
-            Err(e) => error!("Failed to compute current logical size for metrics update: {e:?}"),
-        }
-    }
+    fn update_current_logical_size(&self, delta: i64) -> u64 {
+        let prev_size = self
+            .current_logical_size
+            .fetch_add(delta, AtomicOrdering::SeqCst);
+        (prev_size + delta) as u64
+    }
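The replacement body leans on fetch_add returning the previous value: each updater derives the post-update size from its own delta without a second read of the atomic, which sidesteps the read-back race described in the deleted gauge comment. A standalone sketch of the scheme (std only, illustrative types):

use std::sync::atomic::{AtomicI64, Ordering};

struct Size(AtomicI64);

impl Size {
    fn update(&self, delta: i64) -> u64 {
        let prev = self.0.fetch_add(delta, Ordering::SeqCst);
        // prev + delta is the value this particular update produced, even if
        // other threads raced in between; re-reading the atomic here could
        // observe their updates instead of ours.
        (prev + delta) as u64
    }
}

fn main() {
    let size = Size(AtomicI64::new(100));
    assert_eq!(size.update(20), 120);
    assert_eq!(size.update(-50), 70);
    println!("final: {}", size.0.load(Ordering::SeqCst));
}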
 
     fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
@@ -4382,7 +3965,7 @@ impl<'a> TimelineWriter<'a> {
         self.tl.finish_write(new_lsn);
     }
 
-    pub fn update_current_logical_size(&self, delta: i64) {
+    pub fn update_current_logical_size(&self, delta: i64) -> u64 {
         self.tl.update_current_logical_size(delta)
     }
 }
@@ -30,7 +30,7 @@ use crate::{
     tenant::{
         config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
         storage_layer::PersistentLayer,
-        LogicalSizeCalculationCause, Tenant,
+        Tenant,
    },
};
 
@@ -294,17 +294,12 @@ impl Timeline {
         match state.last_layer_access_imitation {
             Some(ts) if ts.elapsed() < p.threshold => { /* no need to run */ }
             _ => {
-                self.imitate_timeline_cached_layer_accesses(cancel, ctx)
-                    .await;
+                self.imitate_timeline_cached_layer_accesses(ctx).await;
                 state.last_layer_access_imitation = Some(tokio::time::Instant::now())
             }
         }
         drop(state);
 
-        if cancel.is_cancelled() {
-            return ControlFlow::Break(());
-        }
-
         // This task is timeline-scoped, but the synthetic size calculation is tenant-scoped.
         // Make one of the tenant's timelines draw the short straw and run the calculation.
         // The others wait until the calculation is done so that they take into account the
@@ -333,36 +328,8 @@ impl Timeline {
 
     /// Recompute the values which would cause on-demand downloads during restart.
     #[instrument(skip_all)]
-    async fn imitate_timeline_cached_layer_accesses(
-        &self,
-        cancel: &CancellationToken,
-        ctx: &RequestContext,
-    ) {
+    async fn imitate_timeline_cached_layer_accesses(&self, ctx: &RequestContext) {
         let lsn = self.get_last_record_lsn();
 
-        // imitate on-restart initial logical size
-        let size = self
-            .calculate_logical_size(
-                lsn,
-                LogicalSizeCalculationCause::EvictionTaskImitation,
-                cancel.clone(),
-                ctx,
-            )
-            .instrument(info_span!("calculate_logical_size"))
-            .await;
-
-        match &size {
-            Ok(_size) => {
-                // good, don't log it to avoid confusion
-            }
-            Err(_) => {
-                // we have known issues for which we already log this on consumption metrics,
-                // gc, and compaction. leave logging out for now.
-                //
-                // https://github.com/neondatabase/neon/issues/2539
-            }
-        }
-
         // imitate repartitioning on first compaction
         if let Err(e) = self
             .collect_keyspace(lsn, ctx)
@@ -370,13 +337,7 @@ impl Timeline {
             .await
         {
             // if this failed, we probably failed logical size because these use the same keys
-            if size.is_err() {
-                // ignore, see above comment
-            } else {
-                warn!(
-                    "failed to collect keyspace but succeeded in calculating logical size: {e:#}"
-                );
-            }
+            warn!("failed to collect keyspace but succeeded in calculating logical size: {e:#}");
         }
     }
 
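The next hunk keeps the .instrument(info_span!(..)) idiom that also appeared in the deleted size-calculation call. For reference, a standalone sketch of what it does (assumed crates: tokio, tracing, tracing-subscriber): the span is attached to the future, so everything logged while the future runs carries the span's context:

use tracing::{info, info_span, Instrument};

async fn work() {
    // this log line is tagged with the enclosing span
    info!("inside the span");
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();
    work().instrument(info_span!("gather_inputs")).await;
}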
@@ -413,21 +374,9 @@ impl Timeline {
         // So, the chance of the worst case is quite low in practice.
         // It runs as a per-tenant task, but the eviction_task.rs is per-timeline.
         // So, we must coordinate with other eviction tasks of this tenant.
-        let limit = self
-            .conf
-            .eviction_task_immitated_concurrent_logical_size_queries
-            .inner();
-
         let mut throwaway_cache = HashMap::new();
-        let gather = crate::tenant::size::gather_inputs(
-            tenant,
-            limit,
-            None,
-            &mut throwaway_cache,
-            LogicalSizeCalculationCause::EvictionTaskImitation,
-            ctx,
-        )
-        .instrument(info_span!("gather_inputs"));
+        let gather = crate::tenant::size::gather_inputs(tenant, None, &mut throwaway_cache, ctx)
+            .instrument(info_span!("gather_inputs"));
 
         tokio::select! {
             _ = cancel.cancelled() => {}
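The deleted limit was a shared cap on concurrent logical-size queries across the tenant's eviction tasks. A sketch of the general shape of such a cap using a tokio Semaphore (illustrative names, not the pageserver's actual config type):

use std::sync::Arc;
use tokio::sync::Semaphore;

async fn limited_query(limit: Arc<Semaphore>, id: u32) {
    // at most `limit` permits' worth of queries run concurrently
    let _permit = limit.acquire().await.expect("semaphore closed");
    println!("running expensive query {id}");
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}

#[tokio::main]
async fn main() {
    let limit = Arc::new(Semaphore::new(2));
    let tasks: Vec<_> = (0..4)
        .map(|id| tokio::spawn(limited_query(Arc::clone(&limit), id)))
        .collect();
    for t in tasks {
        t.await.unwrap();
    }
}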
@@ -346,9 +346,7 @@ pub(super) async fn handle_walreceiver_connection(
 
         // Send the replication feedback message.
         // Regular standby_status_update fields are put into this message.
-        let (timeline_logical_size, _) = timeline
-            .get_current_logical_size(&ctx)
-            .context("Status update creation failed to get current logical size")?;
+        let timeline_logical_size = timeline.get_current_logical_size();
         let status_update = PageserverFeedback {
             current_timeline_size: timeline_logical_size,
             last_received_lsn,
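The call-site change above tracks an API change visible elsewhere in this diff: the old accessor was fallible and returned the size together with an exactness flag, while the new stored-size accessor returns a plain u64. A schematic sketch with hypothetical stand-in types:

struct OldTimeline;
struct NewTimeline {
    stored_size: u64,
}

impl OldTimeline {
    fn get_current_logical_size(&self) -> Result<(u64, bool), String> {
        Ok((1024, true)) // (size, is_exact)
    }
}

impl NewTimeline {
    fn get_current_logical_size(&self) -> u64 {
        self.stored_size
    }
}

fn main() -> Result<(), String> {
    // old: fallible, tuple unpack; new: infallible read of the stored value
    let (old_size, _is_exact) = OldTimeline.get_current_logical_size()?;
    let new_size = NewTimeline { stored_size: 1024 }.get_current_logical_size();
    assert_eq!(old_size, new_size);
    Ok(())
}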