Store timeline logical size in key-value storage to make it persistent

Konstantin Knizhnik
2023-05-20 21:28:56 +03:00
parent 3837fca7a2
commit 9604aec0c9
10 changed files with 95 additions and 771 deletions
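At a high level, the change swaps an expensive on-demand recomputation for a persisted counter: the timeline's logical size is written into the pageserver's key-value storage whenever a modification commits, so a restart can recover it with a single lookup instead of walking every relation. A minimal sketch of that trade-off, with an invented SizeStore trait and BLCKSZ assumed to be 8192; the real types appear in the diffs below:

// Old path: O(#relations) rescan of relation sizes at some LSN.
fn size_by_rescan(rel_sizes_in_blocks: &[u64]) -> u64 {
    rel_sizes_in_blocks.iter().sum::<u64>() * 8192 // blocks -> bytes
}

// New path: O(1) read of the total persisted under a reserved key.
trait SizeStore {
    fn get(&self, key: &'static str) -> Option<u64>;
}

fn size_from_store(store: &dyn SizeStore) -> u64 {
    store.get("LOGICAL_SIZE_KEY").unwrap_or(0)
}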

View File

@@ -282,7 +282,6 @@ pub struct TimelineInfo {
/// Sum of the size of all layer files.
/// If a layer is present in both local FS and S3, it counts only once.
pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
pub current_logical_size_non_incremental: Option<u64>,
pub timeline_dir_layer_file_size_sum: Option<u64>,

View File

@@ -5,7 +5,7 @@
//!
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::{mgr, LogicalSizeCalculationCause};
use crate::tenant::mgr;
use anyhow;
use chrono::Utc;
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
@@ -113,7 +113,7 @@ pub async fn collect_metrics_iteration(
cached_metrics: &mut HashMap<PageserverConsumptionMetricsKey, u64>,
metric_collection_endpoint: &reqwest::Url,
node_id: NodeId,
ctx: &RequestContext,
_ctx: &RequestContext,
send_cached: bool,
) {
let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, u64)> = Vec::new();
@@ -164,30 +164,15 @@ pub async fn collect_metrics_iteration(
timeline_written_size,
));
let span = info_span!("collect_metrics_iteration", tenant_id = %timeline.tenant_id, timeline_id = %timeline.timeline_id);
match span.in_scope(|| timeline.get_current_logical_size(ctx)) {
// Only send timeline logical size when it is fully calculated.
Ok((size, is_exact)) if is_exact => {
current_metrics.push((
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: Some(timeline.timeline_id),
metric: TIMELINE_LOGICAL_SIZE,
},
size,
));
}
Ok((_, _)) => {}
Err(err) => {
error!(
"failed to get current logical size for timeline {}: {err:?}",
timeline.timeline_id
);
continue;
}
};
current_metrics.push((
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: Some(timeline.timeline_id),
metric: TIMELINE_LOGICAL_SIZE,
},
timeline.get_current_logical_size(),
));
}
let timeline_resident_size = timeline.get_resident_physical_size();
tenant_resident_size += timeline_resident_size;
}
@@ -336,7 +321,6 @@ pub async fn calculate_synthetic_size_worker(
if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await
{
if let Err(e) = tenant.calculate_synthetic_size(
LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize,
ctx).await {
error!("failed to calculate synthetic size for tenant {}: {}", tenant_id, e);
}

View File

@@ -26,7 +26,7 @@ use crate::tenant::config::TenantConfOpt;
use crate::tenant::mgr::{TenantMapInsertError, TenantStateError};
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
use crate::tenant::{PageReconstructError, Timeline};
use crate::{config::PageServerConf, tenant::mgr};
use utils::{
auth::JwtAuth,
@@ -168,36 +168,12 @@ impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
}
// Helper function to construct a TimelineInfo struct for a timeline
async fn build_timeline_info(
fn build_timeline_info(
timeline: &Arc<Timeline>,
include_non_incremental_logical_size: bool,
ctx: &RequestContext,
_ctx: &RequestContext,
) -> anyhow::Result<TimelineInfo> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
let mut info = build_timeline_info_common(timeline, ctx)?;
if include_non_incremental_logical_size {
// XXX we should be using spawn_ondemand_logical_size_calculation here.
// Otherwise, if someone deletes the timeline / detaches the tenant while
// we're executing this function, we will outlive the timeline on-disk state.
info.current_logical_size_non_incremental = Some(
timeline
.get_current_logical_size_non_incremental(
info.last_record_lsn,
CancellationToken::new(),
ctx,
)
.await?,
);
}
Ok(info)
}
fn build_timeline_info_common(
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<TimelineInfo> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
let last_record_lsn = timeline.get_last_record_lsn();
let (wal_source_connstr, last_received_msg_lsn, last_received_msg_ts) = {
let guard = timeline.last_received_wal.lock().unwrap();
@@ -217,13 +193,7 @@ fn build_timeline_info_common(
Lsn(0) => None,
lsn @ Lsn(_) => Some(lsn),
};
let current_logical_size = match timeline.get_current_logical_size(ctx) {
Ok((size, _)) => Some(size),
Err(err) => {
error!("Timeline info creation failed to get current logical size: {err:?}");
None
}
};
let current_logical_size = Some(timeline.get_current_logical_size());
let current_physical_size = Some(timeline.layer_size_sum());
let state = timeline.current_state();
let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
@@ -240,7 +210,6 @@ fn build_timeline_info_common(
latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
current_logical_size,
current_physical_size,
current_logical_size_non_incremental: None,
timeline_dir_layer_file_size_sum: None,
wal_source_connstr,
last_received_msg_lsn,
@@ -282,7 +251,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
.await {
Ok(Some(new_timeline)) => {
// Created. Construct a TimelineInfo for it.
let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
let timeline_info = build_timeline_info(&new_timeline, &ctx)
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::CREATED, timeline_info)
}
@@ -296,8 +265,6 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let include_non_incremental_logical_size: Option<bool> =
parse_query_param(&request, "include-non-incremental-logical-size")?;
check_permission(&request, Some(tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
@@ -308,15 +275,11 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
let mut response_data = Vec::with_capacity(timelines.len());
for timeline in timelines {
let timeline_info = build_timeline_info(
&timeline,
include_non_incremental_logical_size.unwrap_or(false),
&ctx,
)
.instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
.await
.context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
.map_err(ApiError::InternalServerError)?;
let timeline_info = build_timeline_info(&timeline, &ctx)
.context(
"Failed to convert tenant timeline {timeline_id} into the local one: {e:?}",
)
.map_err(ApiError::InternalServerError)?;
response_data.push(timeline_info);
}
@@ -331,8 +294,6 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let include_non_incremental_logical_size: Option<bool> =
parse_query_param(&request, "include-non-incremental-logical-size")?;
check_permission(&request, Some(tenant_id))?;
// Logical size calculation needs downloading.
@@ -345,14 +306,9 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
.get_timeline(timeline_id, false)
.map_err(ApiError::NotFound)?;
let timeline_info = build_timeline_info(
&timeline,
include_non_incremental_logical_size.unwrap_or(false),
&ctx,
)
.await
.context("get local timeline info")
.map_err(ApiError::InternalServerError)?;
let timeline_info = build_timeline_info(&timeline, &ctx)
.context("get local timeline info")
.map_err(ApiError::InternalServerError)?;
Ok::<_, ApiError>(timeline_info)
}
@@ -546,11 +502,7 @@ async fn tenant_size_handler(request: Request<Body>) -> Result<Response<Body>, A
// this can be long operation
let inputs = tenant
.gather_size_inputs(
retention_period,
LogicalSizeCalculationCause::TenantSizeHandler,
&ctx,
)
.gather_size_inputs(retention_period, &ctx)
.await
.map_err(ApiError::InternalServerError)?;

View File

@@ -20,7 +20,6 @@ use postgres_ffi::{Oid, TimestampTz, TransactionId};
use serde::{Deserialize, Serialize};
use std::collections::{hash_map, HashMap, HashSet};
use std::ops::Range;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
use utils::{bin_ser::BeSer, lsn::Lsn};
@@ -139,6 +138,17 @@ impl Timeline {
Ok(total_blocks)
}
/// Get timeline logical size
pub async fn get_logical_size(
&self,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<u64, PageReconstructError> {
let mut buf = self.get(LOGICAL_SIZE_KEY, lsn, ctx).await?;
let size = buf.get_u64_le();
Ok(size)
}
/// Get size of a relation file
pub async fn get_rel_size(
&self,
@@ -489,46 +499,6 @@ impl Timeline {
self.get(CHECKPOINT_KEY, lsn, ctx).await
}
/// Does the same as get_current_logical_size but counted on demand.
/// Used to initialize the logical size tracking on startup.
///
/// Only relation blocks are counted currently. That excludes metadata,
/// SLRUs, twophase files etc.
pub async fn get_current_logical_size_non_incremental(
&self,
lsn: Lsn,
cancel: CancellationToken,
ctx: &RequestContext,
) -> Result<u64, CalculateLogicalSizeError> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
// Fetch list of database dirs and iterate them
let buf = self.get(DBDIR_KEY, lsn, ctx).await.context("read dbdir")?;
let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
let mut total_size: u64 = 0;
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
for rel in self
.list_rels(*spcnode, *dbnode, lsn, ctx)
.await
.context("list rels")?
{
if cancel.is_cancelled() {
return Err(CalculateLogicalSizeError::Cancelled);
}
let relsize_key = rel_size_to_key(rel);
let mut buf = self
.get(relsize_key, lsn, ctx)
.await
.with_context(|| format!("read relation size of {rel:?}"))?;
let relsize = buf.get_u32_le();
total_size += relsize as u64;
}
}
Ok(total_size * BLCKSZ as u64)
}
///
/// Get a KeySpace that covers all the Keys that are in use at the given LSN.
/// Anything that's not listed may be removed from the underlying storage (from
@@ -819,6 +789,12 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
pub fn put_logical_size(&mut self, size: u64) -> anyhow::Result<()> {
let buf = size.to_le_bytes();
self.put(LOGICAL_SIZE_KEY, Value::Image(Bytes::from(buf.to_vec())));
Ok(())
}
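Taken together with get_logical_size above, the persisted format is just an 8-byte little-endian image of a u64. A self-contained round-trip sketch using the bytes crate, which the diff itself relies on:

use bytes::{Buf, Bytes};

fn main() {
    let size: u64 = 24576;
    // put_logical_size stores the size as a little-endian image ...
    let img = Bytes::from(size.to_le_bytes().to_vec());
    // ... and get_logical_size decodes it again with get_u64_le().
    let mut buf = img;
    assert_eq!(buf.get_u64_le(), size);
}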
pub async fn drop_dbdir(
&mut self,
spcnode: Oid,
@@ -1131,7 +1107,8 @@ impl<'a> DatadirModification<'a> {
result?;
if pending_nblocks != 0 {
writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
let size = writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
self.put_logical_size(size)?;
self.pending_nblocks = 0;
}
@@ -1159,7 +1136,8 @@ impl<'a> DatadirModification<'a> {
writer.finish_write(lsn);
if pending_nblocks != 0 {
writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
let size = writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
self.put_logical_size(size)?;
}
Ok(())
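Both commit paths above fold the accumulated block delta into the in-memory counter and persist the resulting total within the same modification, keeping the stored image consistent with the data it describes. A worked example of the arithmetic, assuming the usual Postgres block size of 8192:

const BLCKSZ: i64 = 8192; // assumed block size, matching the diff's i64::from(BLCKSZ)

fn new_total(old_size: i64, pending_nblocks: i64) -> u64 {
    (old_size + pending_nblocks * BLCKSZ) as u64
}

fn main() {
    assert_eq!(new_total(0, 3), 24576);      // three new blocks
    assert_eq!(new_total(24576, -1), 16384); // one block dropped
}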
@@ -1274,7 +1252,7 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
// 03 misc
// controlfile
// checkpoint
// pg_version
// logical_size
//
// Below is a full list of the keyspace allocation:
//
@@ -1314,6 +1292,10 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
// Checkpoint:
// 03 00000000 00000000 00000000 00 00000001
//-- Section 01: relation data and metadata
//
// LogicalSize:
// 03 00000000 00000000 00000000 00 00000003
//
const DBDIR_KEY: Key = Key {
field1: 0x00,
@@ -1536,6 +1518,15 @@ const CHECKPOINT_KEY: Key = Key {
field6: 1,
};
const LOGICAL_SIZE_KEY: Key = Key {
field1: 0x03,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 3,
};
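For orientation: the new constant lives in the same 0x03 "misc" prefix as the control file and checkpoint keys, with field6 selecting the entry. A simplified stand-in that only mirrors the field layout shown above (not the pageserver's actual Key type):

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Key {
    field1: u8,
    field2: u32,
    field3: u32,
    field4: u32,
    field5: u8,
    field6: u32,
}

const CHECKPOINT_KEY: Key =
    Key { field1: 0x03, field2: 0, field3: 0, field4: 0, field5: 0, field6: 1 };
const LOGICAL_SIZE_KEY: Key =
    Key { field1: 0x03, field2: 0, field3: 0, field4: 0, field5: 0, field6: 3 };

fn main() {
    // All "misc" metadata shares the 0x03 prefix; field6 picks the entry.
    assert!(CHECKPOINT_KEY < LOGICAL_SIZE_KEY);
}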
// Reverse mappings for a few Keys.
// These are needed by WAL redo manager.

View File

@@ -237,11 +237,6 @@ pub enum TaskKind {
/// See [`crate::disk_usage_eviction_task`].
DiskUsageEviction,
// Initial logical size calculation
InitialLogicalSizeCalculation,
OndemandLogicalSizeCalculation,
// Task that flushes frozen in-memory layers to disk
LayerFlushTask,

View File

@@ -98,9 +98,7 @@ mod timeline;
pub mod size;
pub(crate) use timeline::debug_assert_current_span_has_tenant_and_timeline_id;
pub use timeline::{
LocalLayerInfoForDiskUsageEviction, LogicalSizeCalculationCause, PageReconstructError, Timeline,
};
pub use timeline::{LocalLayerInfoForDiskUsageEviction, PageReconstructError, Timeline};
// re-export this function so that page_cache.rs can use it.
pub use crate::tenant::ephemeral_file::writeback as writeback_ephemeral_file;
@@ -545,6 +543,8 @@ impl Tenant {
}
};
timeline.init_logical_size().await;
if self.remote_storage.is_some() {
// Reconcile local state with remote storage, downloading anything that's
// missing locally, and scheduling uploads for anything that's missing
@@ -2637,14 +2637,8 @@ impl Tenant {
// `max_retention_period` overrides the cutoff that is used to calculate the size
// (only if it is shorter than the real cutoff).
max_retention_period: Option<u64>,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
) -> anyhow::Result<size::ModelInputs> {
let logical_sizes_at_once = self
.conf
.concurrent_tenant_size_logical_size_queries
.inner();
// TODO: Having a single mutex block concurrent reads is not great for performance.
//
// But the only case where we need to run multiple of these at once is when we
@@ -2654,27 +2648,15 @@ impl Tenant {
// See more on issue #2748, condensed out of the initial PR review.
let mut shared_cache = self.cached_logical_sizes.lock().await;
size::gather_inputs(
self,
logical_sizes_at_once,
max_retention_period,
&mut shared_cache,
cause,
ctx,
)
.await
size::gather_inputs(self, max_retention_period, &mut shared_cache, ctx).await
}
/// Calculate synthetic tenant size and cache the result.
/// This is periodically called by background worker.
/// result is cached in tenant struct
#[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
pub async fn calculate_synthetic_size(
&self,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
) -> anyhow::Result<u64> {
let inputs = self.gather_size_inputs(None, cause, ctx).await?;
pub async fn calculate_synthetic_size(&self, ctx: &RequestContext) -> anyhow::Result<u64> {
let inputs = self.gather_size_inputs(None, ctx).await?;
let size = inputs.calculate()?;

View File

@@ -4,20 +4,14 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use anyhow::{bail, Context};
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use crate::context::RequestContext;
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use super::{LogicalSizeCalculationCause, Tenant};
use super::Tenant;
use crate::tenant::Timeline;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use tracing::*;
use tenant_size_model::{Segment, StorageModel};
/// Inputs to the actual tenant sizing model
@@ -123,10 +117,8 @@ pub struct TimelineInputs {
/// tenant size will be zero.
pub(super) async fn gather_inputs(
tenant: &Tenant,
limit: &Arc<Semaphore>,
max_retention_period: Option<u64>,
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
) -> anyhow::Result<ModelInputs> {
// refresh is needed to update gc related pitr_cutoff and horizon_cutoff
@@ -319,15 +311,7 @@ pub(super) async fn gather_inputs(
// We left the 'size' field empty in all of the Segments so far.
// Now find logical sizes for all of the points that might need or benefit from them.
fill_logical_sizes(
&timelines,
&mut segments,
limit,
logical_size_cache,
cause,
ctx,
)
.await?;
fill_logical_sizes(&timelines, &mut segments, logical_size_cache, ctx).await?;
Ok(ModelInputs {
segments,
@@ -343,9 +327,7 @@ pub(super) async fn gather_inputs(
async fn fill_logical_sizes(
timelines: &[Arc<Timeline>],
segments: &mut [SegmentMeta],
limit: &Arc<Semaphore>,
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
@@ -361,11 +343,6 @@ async fn fill_logical_sizes(
// with joinset, on drop, all of the tasks will just be de-scheduled, which we can use to
// our advantage with `?` error handling.
let mut joinset = tokio::task::JoinSet::new();
let cancel = tokio_util::sync::CancellationToken::new();
// be sure to cancel all spawned tasks if we are dropped
let _dg = cancel.clone().drop_guard();
// For each point that would benefit from having a logical size available,
// spawn a Task to fetch it, unless we have it cached already.
@@ -378,71 +355,18 @@ async fn fill_logical_sizes(
let lsn = Lsn(seg.segment.lsn);
if let Entry::Vacant(e) = sizes_needed.entry((timeline_id, lsn)) {
let cached_size = logical_size_cache.get(&(timeline_id, lsn)).cloned();
let mut cached_size = logical_size_cache.get(&(timeline_id, lsn)).cloned();
if cached_size.is_none() {
let timeline = Arc::clone(timeline_hash.get(&timeline_id).unwrap());
let parallel_size_calcs = Arc::clone(limit);
let ctx = ctx.attached_child();
joinset.spawn(
calculate_logical_size(
parallel_size_calcs,
timeline,
lsn,
cause,
ctx,
cancel.child_token(),
)
.in_current_span(),
);
cached_size = Some(timeline.get_logical_size(lsn, ctx).await?);
}
e.insert(cached_size);
}
}
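The net effect of this hunk: fill_logical_sizes no longer fans a JoinSet of calculation tasks out behind a semaphore; each missing (timeline, lsn) size is a single awaited read, memoized in the shared cache. A hedged sketch of that cache-or-fetch loop, with u64 stand-ins for the id types:

use std::collections::HashMap;

// Stand-in for Timeline::get_logical_size(lsn, ctx): one key-value read.
async fn lookup(_timeline_id: u64, _lsn: u64) -> anyhow::Result<u64> {
    Ok(24576)
}

async fn fill_sizes(
    needed: &[(u64, u64)],
    cache: &mut HashMap<(u64, u64), u64>,
) -> anyhow::Result<()> {
    for &(tl, lsn) in needed {
        if !cache.contains_key(&(tl, lsn)) {
            // A direct await replaces the old JoinSet + semaphore fan-out.
            cache.insert((tl, lsn), lookup(tl, lsn).await?);
        }
    }
    Ok(())
}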
// Perform the size lookups
let mut have_any_error = false;
while let Some(res) = joinset.join_next().await {
// each of these come with Result<anyhow::Result<_>, JoinError>
// because of spawn + spawn_blocking
match res {
Err(join_error) if join_error.is_cancelled() => {
unreachable!("we are not cancelling any of the futures, nor should be");
}
Err(join_error) => {
// cannot really do anything, as this panic is likely a bug
error!("task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}");
have_any_error = true;
}
Ok(Err(recv_result_error)) => {
// cannot really do anything, as this panic is likely a bug
error!("failed to receive logical size query result: {recv_result_error:#}");
have_any_error = true;
}
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => {
warn!(
timeline_id=%timeline.timeline_id,
"failed to calculate logical size at {lsn}: {error:#}"
);
have_any_error = true;
}
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => {
debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated");
logical_size_cache.insert((timeline.timeline_id, lsn), size);
sizes_needed.insert((timeline.timeline_id, lsn), Some(size));
}
}
}
// prune any keys not needed anymore; we record every used key and added key.
logical_size_cache.retain(|key, _| sizes_needed.contains_key(key));
if have_any_error {
// we cannot complete this round, because we are missing data.
// we have however cached all we were able to request calculation on.
anyhow::bail!("failed to calculate some logical_sizes");
}
// Insert the looked up sizes to the Segments
for seg in segments.iter_mut() {
if !seg.size_needed() {
@@ -484,33 +408,6 @@ impl ModelInputs {
}
}
/// Newtype around the tuple that carries the timeline at lsn logical size calculation.
struct TimelineAtLsnSizeResult(
Arc<crate::tenant::Timeline>,
utils::lsn::Lsn,
Result<u64, CalculateLogicalSizeError>,
);
#[instrument(skip_all, fields(timeline_id=%timeline.timeline_id, lsn=%lsn))]
async fn calculate_logical_size(
limit: Arc<tokio::sync::Semaphore>,
timeline: Arc<crate::tenant::Timeline>,
lsn: utils::lsn::Lsn,
cause: LogicalSizeCalculationCause,
ctx: RequestContext,
cancel: CancellationToken,
) -> Result<TimelineAtLsnSizeResult, RecvError> {
let _permit = tokio::sync::Semaphore::acquire_owned(limit)
.await
.expect("global semaphore should not had been closed");
let size_res = timeline
.spawn_ondemand_logical_size_calculation(lsn, cause, ctx, cancel)
.instrument(info_span!("spawn_ondemand_logical_size_calculation"))
.await?;
Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
}
#[test]
fn verify_size_for_multiple_branches() {
// this is generated from integration test test_tenant_size_with_multiple_branches, but this way

View File

@@ -8,7 +8,6 @@ use bytes::Bytes;
use fail::fail_point;
use futures::StreamExt;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::models::{
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus,
@@ -16,7 +15,7 @@ use pageserver_api::models::{
};
use remote_storage::GenericRemoteStorage;
use storage_broker::BrokerClientChannel;
use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError};
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::TenantTimelineId;
@@ -50,9 +49,9 @@ use crate::tenant::{
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::metrics::{TimelineMetrics, UNEXPECTED_ONDEMAND_DOWNLOADS};
use crate::pgdatadir_mapping::BlockNumber;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
use crate::tenant::config::{EvictionPolicy, TenantConfOpt};
use pageserver_api::reltag::RelTag;
@@ -211,7 +210,7 @@ pub struct Timeline {
repartition_threshold: u64,
/// Current logical size of the "datadir", at the last LSN.
current_logical_size: LogicalSize,
current_logical_size: AtomicI64,
/// Information about the last processed message by the WAL receiver,
/// or None if WAL receiver has not received anything for this timeline
@@ -229,126 +228,6 @@ pub struct Timeline {
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
}
/// Internal structure to hold all data needed for logical size calculation.
///
/// Calculation consists of two stages:
///
/// 1. Initial size calculation. That might take a long time, because it requires
/// reading all layers containing relation sizes at `initial_part_end`.
///
/// 2. Collecting an incremental part and adding that to the initial size.
/// Increments are appended on walreceiver writing new timeline data,
/// which result in increase or decrease of the logical size.
struct LogicalSize {
/// Size, potentially slow to compute. Calculating this might require reading multiple
/// layers, and even ancestor's layers.
///
/// NOTE: size at a given LSN is constant, but after a restart we will calculate
/// the initial size at a different LSN.
initial_logical_size: OnceCell<u64>,
/// Semaphore to track ongoing calculation of `initial_logical_size`.
initial_size_computation: Arc<tokio::sync::Semaphore>,
/// Latest Lsn that has its size uncalculated; can be absent for freshly created timelines.
initial_part_end: Option<Lsn>,
/// All other size changes after startup, combined together.
///
/// Size shouldn't ever be negative, but this is signed for two reasons:
///
/// 1. If we initialized the "baseline" size lazily, while we already
/// process incoming WAL, the incoming WAL records could decrement the
/// variable and temporarily make it negative. (This is just future-proofing;
/// the initialization is currently not done lazily.)
///
/// 2. If there is a bug and we e.g. forget to increment it in some cases
/// when size grows, but remember to decrement it when it shrinks again, the
/// variable could go negative. In that case, it seems better to at least
/// try to keep tracking it, rather than clamp or overflow it. Note that
/// get_current_logical_size() will clamp the returned value to zero if it's
/// negative, and log an error. Could set it permanently to zero or some
/// special value to indicate "broken" instead, but this will do for now.
///
/// Note that we also expose a copy of this value as a prometheus metric,
/// see `current_logical_size_gauge`. Use the `update_current_logical_size`
/// to modify this, it will also keep the prometheus metric in sync.
size_added_after_initial: AtomicI64,
}
/// Normalized current size, that the data in pageserver occupies.
#[derive(Debug, Clone, Copy)]
enum CurrentLogicalSize {
/// The size is not yet calculated to the end, this is an intermediate result,
/// constructed from walreceiver increments and normalized: logical data could delete some objects, hence be negative,
/// yet total logical size cannot be below 0.
Approximate(u64),
// Fully calculated logical size, only other future walreceiver increments are changing it, and those changes are
// available for observation without any calculations.
Exact(u64),
}
impl CurrentLogicalSize {
fn size(&self) -> u64 {
*match self {
Self::Approximate(size) => size,
Self::Exact(size) => size,
}
}
}
impl LogicalSize {
fn empty_initial() -> Self {
Self {
initial_logical_size: OnceCell::with_value(0),
// initial_logical_size already computed, so don't admit any calculations
initial_size_computation: Arc::new(Semaphore::new(0)),
initial_part_end: None,
size_added_after_initial: AtomicI64::new(0),
}
}
fn deferred_initial(compute_to: Lsn) -> Self {
Self {
initial_logical_size: OnceCell::new(),
initial_size_computation: Arc::new(Semaphore::new(1)),
initial_part_end: Some(compute_to),
size_added_after_initial: AtomicI64::new(0),
}
}
fn current_size(&self) -> anyhow::Result<CurrentLogicalSize> {
let size_increment: i64 = self.size_added_after_initial.load(AtomicOrdering::Acquire);
// ^^^ keep this type explicit so that the casts in this function break if
// we change the type.
match self.initial_logical_size.get() {
Some(initial_size) => {
initial_size.checked_add_signed(size_increment)
.with_context(|| format!("Overflow during logical size calculation, initial_size: {initial_size}, size_increment: {size_increment}"))
.map(CurrentLogicalSize::Exact)
}
None => {
let non_negative_size_increment = u64::try_from(size_increment).unwrap_or(0);
Ok(CurrentLogicalSize::Approximate(non_negative_size_increment))
}
}
}
fn increment_size(&self, delta: i64) {
self.size_added_after_initial
.fetch_add(delta, AtomicOrdering::SeqCst);
}
/// Make the value computed by initial logical size computation
/// available for re-use. This doesn't contain the incremental part.
fn initialized_size(&self, lsn: Lsn) -> Option<u64> {
match self.initial_part_end {
Some(v) if v == lsn => self.initial_logical_size.get().copied(),
_ => None,
}
}
}
pub struct WalReceiverInfo {
pub wal_source_connconf: PgConnectionConfig,
pub last_received_msg_lsn: Lsn,
@@ -446,14 +325,6 @@ impl std::fmt::Display for PageReconstructError {
}
}
#[derive(Clone, Copy)]
pub enum LogicalSizeCalculationCause {
Initial,
ConsumptionMetricsSyntheticSize,
EvictionTaskImitation,
TenantSizeHandler,
}
/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created
@@ -838,23 +709,17 @@ impl Timeline {
/// the initial size calculation has not been run (gets triggered on the first size access).
///
/// return size and boolean flag that shows if the size is exact
pub fn get_current_logical_size(
self: &Arc<Self>,
ctx: &RequestContext,
) -> anyhow::Result<(u64, bool)> {
let current_size = self.current_logical_size.current_size()?;
debug!("Current size: {current_size:?}");
pub fn get_current_logical_size(self: &Arc<Self>) -> u64 {
self.current_logical_size.load(AtomicOrdering::Relaxed) as u64
}
let mut is_exact = true;
let size = current_size.size();
if let (CurrentLogicalSize::Approximate(_), Some(initial_part_end)) =
(current_size, self.current_logical_size.initial_part_end)
{
is_exact = false;
self.try_spawn_size_init_task(initial_part_end, ctx);
pub async fn init_logical_size(&self) {
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Error);
let last_record_lsn = self.get_last_record_lsn();
if let Ok(size) = self.get_logical_size(last_record_lsn, &ctx).await {
self.current_logical_size
.store(size as i64, AtomicOrdering::Relaxed);
}
Ok((size, is_exact))
}
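init_logical_size is the startup counterpart: it reads the persisted value once, at the last record LSN, and seeds the in-memory atomic; a failed read is deliberately non-fatal, the counter simply stays at 0. A sketch of that seeding with a hypothetical struct:

use std::sync::atomic::{AtomicI64, Ordering};

struct TimelineSize {
    current_logical_size: AtomicI64,
}

impl TimelineSize {
    // `persisted` stands in for the fallible get_logical_size() read.
    fn init_logical_size(&self, persisted: anyhow::Result<u64>) {
        if let Ok(size) = persisted {
            self.current_logical_size.store(size as i64, Ordering::Relaxed);
        }
        // on Err: the counter stays at 0 and only accumulates new deltas
    }
}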
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
@@ -1399,15 +1264,7 @@ impl Timeline {
latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
initdb_lsn: metadata.initdb_lsn(),
current_logical_size: if disk_consistent_lsn.is_valid() {
// we're creating timeline data with some layer files existing locally,
// need to recalculate timeline's logical size based on data in the layers.
LogicalSize::deferred_initial(disk_consistent_lsn)
} else {
// we're creating timeline data without any layers existing locally,
// initial logical size is 0.
LogicalSize::empty_initial()
},
current_logical_size: AtomicI64::new(0),
partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
repartition_threshold: 0,
@@ -1840,292 +1697,12 @@ impl Timeline {
Ok(())
}
fn try_spawn_size_init_task(self: &Arc<Self>, lsn: Lsn, ctx: &RequestContext) {
let permit = match Arc::clone(&self.current_logical_size.initial_size_computation)
.try_acquire_owned()
{
Ok(permit) => permit,
Err(TryAcquireError::NoPermits) => {
// computation already ongoing or finished with success
return;
}
Err(TryAcquireError::Closed) => unreachable!("we never call close"),
};
debug_assert!(self
.current_logical_size
.initial_logical_size
.get()
.is_none());
info!(
"spawning logical size computation from context of task kind {:?}",
ctx.task_kind()
);
// We need to start the computation task.
// It gets a separate context since it will outlive the request that called this function.
let self_clone = Arc::clone(self);
let background_ctx = ctx.detached_child(
TaskKind::InitialLogicalSizeCalculation,
DownloadBehavior::Download,
);
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::InitialLogicalSizeCalculation,
Some(self.tenant_id),
Some(self.timeline_id),
"initial size calculation",
false,
// NB: don't log errors here, task_mgr will do that.
async move {
// no cancellation here, because nothing really waits for this to complete compared
// to spawn_ondemand_logical_size_calculation.
let cancel = CancellationToken::new();
let calculated_size = match self_clone
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel)
.await
{
Ok(s) => s,
Err(CalculateLogicalSizeError::Cancelled) => {
// Don't make noise, this is a common task.
// In the unlikely case that there is another call to this function, we'll retry
// because initial_logical_size is still None.
info!("initial size calculation cancelled, likely timeline delete / tenant detach");
return Ok(());
}
Err(CalculateLogicalSizeError::Other(err)) => {
if let Some(e @ PageReconstructError::AncestorStopping(_)) =
err.root_cause().downcast_ref()
{
// This can happen if the timeline's parent timeline switches to
// Stopping state while we're still calculating the initial
// timeline size for the child, for example if the tenant is
// being detached or the pageserver is shut down. Like with
// CalculateLogicalSizeError::Cancelled, don't make noise.
info!("initial size calculation failed because the timeline or its ancestor is Stopping, likely because the tenant is being detached: {e:#}");
return Ok(());
}
return Err(err.context("Failed to calculate logical size"));
}
};
// we cannot query current_logical_size.current_size() to know the current
// *negative* value, only truncated to u64.
let added = self_clone
.current_logical_size
.size_added_after_initial
.load(AtomicOrdering::Relaxed);
let sum = calculated_size.saturating_add_signed(added);
// set the gauge value before it can be set in `update_current_logical_size`.
self_clone.metrics.current_logical_size_gauge.set(sum);
match self_clone
.current_logical_size
.initial_logical_size
.set(calculated_size)
{
Ok(()) => (),
Err(_what_we_just_attempted_to_set) => {
let existing_size = self_clone
.current_logical_size
.initial_logical_size
.get()
.expect("once_cell set was lost, then get failed, impossible.");
// This shouldn't happen because the semaphore is initialized with 1.
// But if it happens, just complain & report success so there are no further retries.
error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing")
}
}
// now that `initial_logical_size.is_some()`, reduce permit count to 0
// so that we prevent future callers from spawning this task
permit.forget();
Ok(())
}.in_current_span(),
);
}
pub fn spawn_ondemand_logical_size_calculation(
self: &Arc<Self>,
lsn: Lsn,
cause: LogicalSizeCalculationCause,
ctx: RequestContext,
cancel: CancellationToken,
) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
let (sender, receiver) = oneshot::channel();
let self_clone = Arc::clone(self);
// XXX if our caller loses interest, i.e., ctx is cancelled,
// we should stop the size calculation work and return an error.
// That would require restructuring this function's API to
// return the result directly, instead of a Receiver for the result.
let ctx = ctx.detached_child(
TaskKind::OndemandLogicalSizeCalculation,
DownloadBehavior::Download,
);
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::OndemandLogicalSizeCalculation,
Some(self.tenant_id),
Some(self.timeline_id),
"ondemand logical size calculation",
false,
async move {
let res = self_clone
.logical_size_calculation_task(lsn, cause, &ctx, cancel)
.await;
let _ = sender.send(res).ok();
Ok(()) // Receiver is responsible for handling errors
}
.in_current_span(),
);
receiver
}
#[instrument(skip_all)]
async fn logical_size_calculation_task(
self: &Arc<Self>,
lsn: Lsn,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
cancel: CancellationToken,
) -> Result<u64, CalculateLogicalSizeError> {
debug_assert_current_span_has_tenant_and_timeline_id();
let mut timeline_state_updates = self.subscribe_for_state_updates();
let self_calculation = Arc::clone(self);
let mut calculation = pin!(async {
let cancel = cancel.child_token();
let ctx = ctx.attached_child();
self_calculation
.calculate_logical_size(lsn, cause, cancel, &ctx)
.await
});
let timeline_state_cancellation = async {
loop {
match timeline_state_updates.changed().await {
Ok(()) => {
let new_state = *timeline_state_updates.borrow();
match new_state {
// we're running this job for active timelines only
TimelineState::Active => continue,
TimelineState::Broken
| TimelineState::Stopping
| TimelineState::Loading => {
break format!("aborted because timeline became inactive (new state: {new_state:?})")
}
}
}
Err(_sender_dropped_error) => {
// can't happen, the sender is not dropped as long as the Timeline exists
break "aborted because state watch was dropped".to_string();
}
}
}
};
let taskmgr_shutdown_cancellation = async {
task_mgr::shutdown_watcher().await;
"aborted because task_mgr shutdown requested".to_string()
};
loop {
tokio::select! {
res = &mut calculation => { return res }
reason = timeline_state_cancellation => {
debug!(reason = reason, "cancelling calculation");
cancel.cancel();
return calculation.await;
}
reason = taskmgr_shutdown_cancellation => {
debug!(reason = reason, "cancelling calculation");
cancel.cancel();
return calculation.await;
}
}
}
}
/// Calculate the logical size of the database at the latest LSN.
///
/// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
/// especially if we need to download remote layers.
pub async fn calculate_logical_size(
&self,
up_to_lsn: Lsn,
cause: LogicalSizeCalculationCause,
cancel: CancellationToken,
ctx: &RequestContext,
) -> Result<u64, CalculateLogicalSizeError> {
info!(
"Calculating logical size for timeline {} at {}",
self.timeline_id, up_to_lsn
);
// These failpoints are used by python tests to ensure that we don't delete
// the timeline while the logical size computation is ongoing.
// The first failpoint is used to make this function pause.
// Then the python test initiates timeline delete operation in a thread.
// It waits for a few seconds, then arms the second failpoint and disables
// the first failpoint. The second failpoint prints an error if the timeline
// delete code has deleted the on-disk state while we're still running here.
// It shouldn't do that. If it does it anyway, the error will be caught
// by the test suite, highlighting the problem.
fail::fail_point!("timeline-calculate-logical-size-pause");
fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| {
if !self
.conf
.metadata_path(self.timeline_id, self.tenant_id)
.exists()
{
error!("timeline-calculate-logical-size-pre metadata file does not exist")
}
// need to return something
Ok(0)
});
// See if we've already done the work for initial size calculation.
// This is a short-cut for timelines that are mostly unused.
if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
return Ok(size);
}
let storage_time_metrics = match cause {
LogicalSizeCalculationCause::Initial
| LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
| LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
LogicalSizeCalculationCause::EvictionTaskImitation => {
&self.metrics.imitate_logical_size_histo
}
};
let timer = storage_time_metrics.start_timer();
let logical_size = self
.get_current_logical_size_non_incremental(up_to_lsn, cancel, ctx)
.await?;
debug!("calculated logical size: {logical_size}");
timer.stop_and_record();
Ok(logical_size)
}
/// Update current logical size, adding `delta` to the old value.
fn update_current_logical_size(&self, delta: i64) {
let logical_size = &self.current_logical_size;
logical_size.increment_size(delta);
// Also set the value in the prometheus gauge. Note that
// there is a race condition here: if this is called by two
// threads concurrently, the prometheus gauge might be set to
// one value while current_logical_size is set to the
// other.
match logical_size.current_size() {
Ok(CurrentLogicalSize::Exact(new_current_size)) => self
.metrics
.current_logical_size_gauge
.set(new_current_size),
Ok(CurrentLogicalSize::Approximate(_)) => {
// don't update the gauge yet, this allows us not to update the gauge back and
// forth between the initial size calculation task.
}
// this is overflow
Err(e) => error!("Failed to compute current logical size for metrics update: {e:?}"),
}
fn update_current_logical_size(&self, delta: i64) -> u64 {
let prev_size = self
.current_logical_size
.fetch_add(delta, AtomicOrdering::SeqCst);
(prev_size + delta) as u64
}
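The rewritten update is a single fetch_add. Because fetch_add returns the previous value, prev + delta is exactly the total that includes this writer's contribution, with no separate (racy) load; that return value is what the commit paths hand to put_logical_size. A small demonstration:

use std::sync::atomic::{AtomicI64, Ordering};

fn update_current_logical_size(size: &AtomicI64, delta: i64) -> u64 {
    // fetch_add returns the value *before* the addition.
    let prev = size.fetch_add(delta, Ordering::SeqCst);
    (prev + delta) as u64
}

fn main() {
    let size = AtomicI64::new(0);
    assert_eq!(update_current_logical_size(&size, 3 * 8192), 24576);
    assert_eq!(update_current_logical_size(&size, -8192), 16384);
}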
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
@@ -4382,7 +3959,7 @@ impl<'a> TimelineWriter<'a> {
self.tl.finish_write(new_lsn);
}
pub fn update_current_logical_size(&self, delta: i64) {
pub fn update_current_logical_size(&self, delta: i64) -> u64 {
self.tl.update_current_logical_size(delta)
}
}

View File

@@ -30,7 +30,7 @@ use crate::{
tenant::{
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
storage_layer::PersistentLayer,
LogicalSizeCalculationCause, Tenant,
Tenant,
},
};
@@ -294,17 +294,12 @@ impl Timeline {
match state.last_layer_access_imitation {
Some(ts) if ts.elapsed() < p.threshold => { /* no need to run */ }
_ => {
self.imitate_timeline_cached_layer_accesses(cancel, ctx)
.await;
self.imitate_timeline_cached_layer_accesses(ctx).await;
state.last_layer_access_imitation = Some(tokio::time::Instant::now())
}
}
drop(state);
if cancel.is_cancelled() {
return ControlFlow::Break(());
}
// This task is timeline-scoped, but the synthetic size calculation is tenant-scoped.
// Make one of the tenant's timelines draw the short straw and run the calculation.
// The others wait until the calculation is done so that they take into account the
@@ -333,36 +328,8 @@ impl Timeline {
/// Recompute the values which would cause on-demand downloads during restart.
#[instrument(skip_all)]
async fn imitate_timeline_cached_layer_accesses(
&self,
cancel: &CancellationToken,
ctx: &RequestContext,
) {
async fn imitate_timeline_cached_layer_accesses(&self, ctx: &RequestContext) {
let lsn = self.get_last_record_lsn();
// imitate on-restart initial logical size
let size = self
.calculate_logical_size(
lsn,
LogicalSizeCalculationCause::EvictionTaskImitation,
cancel.clone(),
ctx,
)
.instrument(info_span!("calculate_logical_size"))
.await;
match &size {
Ok(_size) => {
// good, don't log it to avoid confusion
}
Err(_) => {
// we have known issues for which we already log this on consumption metrics,
// gc, and compaction. leave logging out for now.
//
// https://github.com/neondatabase/neon/issues/2539
}
}
// imitate repartitioning on first compaction
if let Err(e) = self
.collect_keyspace(lsn, ctx)
@@ -370,13 +337,7 @@ impl Timeline {
.await
{
// if this failed, we probably failed logical size because these use the same keys
if size.is_err() {
// ignore, see above comment
} else {
warn!(
"failed to collect keyspace but succeeded in calculating logical size: {e:#}"
);
}
warn!("failed to collect keyspace but succeeded in calculating logical size: {e:#}");
}
}
@@ -413,21 +374,9 @@ impl Timeline {
// So, the chance of the worst case is quite low in practice.
// It runs as a per-tenant task, but the eviction_task.rs is per-timeline.
// So, we must coordinate with the other eviction tasks of this tenant.
let limit = self
.conf
.eviction_task_immitated_concurrent_logical_size_queries
.inner();
let mut throwaway_cache = HashMap::new();
let gather = crate::tenant::size::gather_inputs(
tenant,
limit,
None,
&mut throwaway_cache,
LogicalSizeCalculationCause::EvictionTaskImitation,
ctx,
)
.instrument(info_span!("gather_inputs"));
let gather = crate::tenant::size::gather_inputs(tenant, None, &mut throwaway_cache, ctx)
.instrument(info_span!("gather_inputs"));
tokio::select! {
_ = cancel.cancelled() => {}

View File

@@ -346,9 +346,7 @@ pub(super) async fn handle_walreceiver_connection(
// Send the replication feedback message.
// Regular standby_status_update fields are put into this message.
let (timeline_logical_size, _) = timeline
.get_current_logical_size(&ctx)
.context("Status update creation failed to get current logical size")?;
let timeline_logical_size = timeline.get_current_logical_size();
let status_update = PageserverFeedback {
current_timeline_size: timeline_logical_size,
last_received_lsn,