From 9604aec0c9615465d073bb3067372c67b37fb021 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sat, 20 May 2023 21:28:56 +0300 Subject: [PATCH] Store timeline logical size in key-value storage to make it persistent --- libs/pageserver_api/src/models.rs | 1 - pageserver/src/consumption_metrics.rs | 36 +- pageserver/src/http/routes.rs | 76 +-- pageserver/src/pgdatadir_mapping.rs | 79 ++- pageserver/src/task_mgr.rs | 5 - pageserver/src/tenant.rs | 30 +- pageserver/src/tenant/size.rs | 111 +---- pageserver/src/tenant/timeline.rs | 461 +----------------- .../src/tenant/timeline/eviction_task.rs | 63 +-- .../walreceiver/walreceiver_connection.rs | 4 +- 10 files changed, 95 insertions(+), 771 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 3bfedd14ea..e0bb813443 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -282,7 +282,6 @@ pub struct TimelineInfo { /// Sum of the size of all layer files. /// If a layer is present in both local FS and S3, it counts only once. pub current_physical_size: Option, // is None when timeline is Unloaded - pub current_logical_size_non_incremental: Option, pub timeline_dir_layer_file_size_sum: Option, diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index ca7b9650e8..169a8c1975 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -5,7 +5,7 @@ //! use crate::context::{DownloadBehavior, RequestContext}; use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}; -use crate::tenant::{mgr, LogicalSizeCalculationCause}; +use crate::tenant::mgr; use anyhow; use chrono::Utc; use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE}; @@ -113,7 +113,7 @@ pub async fn collect_metrics_iteration( cached_metrics: &mut HashMap, metric_collection_endpoint: &reqwest::Url, node_id: NodeId, - ctx: &RequestContext, + _ctx: &RequestContext, send_cached: bool, ) { let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, u64)> = Vec::new(); @@ -164,30 +164,15 @@ pub async fn collect_metrics_iteration( timeline_written_size, )); - let span = info_span!("collect_metrics_iteration", tenant_id = %timeline.tenant_id, timeline_id = %timeline.timeline_id); - match span.in_scope(|| timeline.get_current_logical_size(ctx)) { - // Only send timeline logical size when it is fully calculated. 
- Ok((size, is_exact)) if is_exact => { - current_metrics.push(( - PageserverConsumptionMetricsKey { - tenant_id, - timeline_id: Some(timeline.timeline_id), - metric: TIMELINE_LOGICAL_SIZE, - }, - size, - )); - } - Ok((_, _)) => {} - Err(err) => { - error!( - "failed to get current logical size for timeline {}: {err:?}", - timeline.timeline_id - ); - continue; - } - }; + current_metrics.push(( + PageserverConsumptionMetricsKey { + tenant_id, + timeline_id: Some(timeline.timeline_id), + metric: TIMELINE_LOGICAL_SIZE, + }, + timeline.get_current_logical_size(), + )); } - let timeline_resident_size = timeline.get_resident_physical_size(); tenant_resident_size += timeline_resident_size; } @@ -336,7 +321,6 @@ pub async fn calculate_synthetic_size_worker( if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await { if let Err(e) = tenant.calculate_synthetic_size( - LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize, ctx).await { error!("failed to calculate synthetic size for tenant {}: {}", tenant_id, e); } diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 7d60d3568a..2a94573302 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -26,7 +26,7 @@ use crate::tenant::config::TenantConfOpt; use crate::tenant::mgr::{TenantMapInsertError, TenantStateError}; use crate::tenant::size::ModelInputs; use crate::tenant::storage_layer::LayerAccessStatsReset; -use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, Timeline}; +use crate::tenant::{PageReconstructError, Timeline}; use crate::{config::PageServerConf, tenant::mgr}; use utils::{ auth::JwtAuth, @@ -168,36 +168,12 @@ impl From for ApiError { } // Helper function to construct a TimelineInfo struct for a timeline -async fn build_timeline_info( +fn build_timeline_info( timeline: &Arc, - include_non_incremental_logical_size: bool, - ctx: &RequestContext, + _ctx: &RequestContext, ) -> anyhow::Result { crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id(); - let mut info = build_timeline_info_common(timeline, ctx)?; - if include_non_incremental_logical_size { - // XXX we should be using spawn_ondemand_logical_size_calculation here. - // Otherwise, if someone deletes the timeline / detaches the tenant while - // we're executing this function, we will outlive the timeline on-disk state. 
- info.current_logical_size_non_incremental = Some( - timeline - .get_current_logical_size_non_incremental( - info.last_record_lsn, - CancellationToken::new(), - ctx, - ) - .await?, - ); - } - Ok(info) -} - -fn build_timeline_info_common( - timeline: &Arc, - ctx: &RequestContext, -) -> anyhow::Result { - crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id(); let last_record_lsn = timeline.get_last_record_lsn(); let (wal_source_connstr, last_received_msg_lsn, last_received_msg_ts) = { let guard = timeline.last_received_wal.lock().unwrap(); @@ -217,13 +193,7 @@ fn build_timeline_info_common( Lsn(0) => None, lsn @ Lsn(_) => Some(lsn), }; - let current_logical_size = match timeline.get_current_logical_size(ctx) { - Ok((size, _)) => Some(size), - Err(err) => { - error!("Timeline info creation failed to get current logical size: {err:?}"); - None - } - }; + let current_logical_size = Some(timeline.get_current_logical_size()); let current_physical_size = Some(timeline.layer_size_sum()); let state = timeline.current_state(); let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0)); @@ -240,7 +210,6 @@ fn build_timeline_info_common( latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(), current_logical_size, current_physical_size, - current_logical_size_non_incremental: None, timeline_dir_layer_file_size_sum: None, wal_source_connstr, last_received_msg_lsn, @@ -282,7 +251,7 @@ async fn timeline_create_handler(mut request: Request) -> Result { // Created. Construct a TimelineInfo for it. - let timeline_info = build_timeline_info_common(&new_timeline, &ctx) + let timeline_info = build_timeline_info(&new_timeline, &ctx) .map_err(ApiError::InternalServerError)?; json_response(StatusCode::CREATED, timeline_info) } @@ -296,8 +265,6 @@ async fn timeline_create_handler(mut request: Request) -> Result) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; - let include_non_incremental_logical_size: Option = - parse_query_param(&request, "include-non-incremental-logical-size")?; check_permission(&request, Some(tenant_id))?; let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); @@ -308,15 +275,11 @@ async fn timeline_list_handler(request: Request) -> Result, let mut response_data = Vec::with_capacity(timelines.len()); for timeline in timelines { - let timeline_info = build_timeline_info( - &timeline, - include_non_incremental_logical_size.unwrap_or(false), - &ctx, - ) - .instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id)) - .await - .context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}") - .map_err(ApiError::InternalServerError)?; + let timeline_info = build_timeline_info(&timeline, &ctx) + .context( + "Failed to convert tenant timeline {timeline_id} into the local one: {e:?}", + ) + .map_err(ApiError::InternalServerError)?; response_data.push(timeline_info); } @@ -331,8 +294,6 @@ async fn timeline_list_handler(request: Request) -> Result, async fn timeline_detail_handler(request: Request) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; - let include_non_incremental_logical_size: Option = - parse_query_param(&request, "include-non-incremental-logical-size")?; check_permission(&request, Some(tenant_id))?; // Logical size calculation needs downloading. 
@@ -345,14 +306,9 @@ async fn timeline_detail_handler(request: Request) -> Result(timeline_info) } @@ -546,11 +502,7 @@ async fn tenant_size_handler(request: Request) -> Result, A // this can be long operation let inputs = tenant - .gather_size_inputs( - retention_period, - LogicalSizeCalculationCause::TenantSizeHandler, - &ctx, - ) + .gather_size_inputs(retention_period, &ctx) .await .map_err(ApiError::InternalServerError)?; diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 67f37ee519..0d945ea50f 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -20,7 +20,6 @@ use postgres_ffi::{Oid, TimestampTz, TransactionId}; use serde::{Deserialize, Serialize}; use std::collections::{hash_map, HashMap, HashSet}; use std::ops::Range; -use tokio_util::sync::CancellationToken; use tracing::{debug, trace, warn}; use utils::{bin_ser::BeSer, lsn::Lsn}; @@ -139,6 +138,17 @@ impl Timeline { Ok(total_blocks) } + /// Get timeline logical size + pub async fn get_logical_size( + &self, + lsn: Lsn, + ctx: &RequestContext, + ) -> Result { + let mut buf = self.get(LOGICAL_SIZE_KEY, lsn, ctx).await?; + let size = buf.get_u64_le(); + Ok(size) + } + /// Get size of a relation file pub async fn get_rel_size( &self, @@ -489,46 +499,6 @@ impl Timeline { self.get(CHECKPOINT_KEY, lsn, ctx).await } - /// Does the same as get_current_logical_size but counted on demand. - /// Used to initialize the logical size tracking on startup. - /// - /// Only relation blocks are counted currently. That excludes metadata, - /// SLRUs, twophase files etc. - pub async fn get_current_logical_size_non_incremental( - &self, - lsn: Lsn, - cancel: CancellationToken, - ctx: &RequestContext, - ) -> Result { - crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id(); - - // Fetch list of database dirs and iterate them - let buf = self.get(DBDIR_KEY, lsn, ctx).await.context("read dbdir")?; - let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?; - - let mut total_size: u64 = 0; - for (spcnode, dbnode) in dbdir.dbdirs.keys() { - for rel in self - .list_rels(*spcnode, *dbnode, lsn, ctx) - .await - .context("list rels")? - { - if cancel.is_cancelled() { - return Err(CalculateLogicalSizeError::Cancelled); - } - let relsize_key = rel_size_to_key(rel); - let mut buf = self - .get(relsize_key, lsn, ctx) - .await - .with_context(|| format!("read relation size of {rel:?}"))?; - let relsize = buf.get_u32_le(); - - total_size += relsize as u64; - } - } - Ok(total_size * BLCKSZ as u64) - } - /// /// Get a KeySpace that covers all the Keys that are in use at the given LSN. 
     /// Anything that's not listed maybe removed from the underlying storage (from
@@ -819,6 +789,12 @@ impl<'a> DatadirModification<'a> {
         Ok(())
     }
 
+    pub fn put_logical_size(&mut self, size: u64) -> anyhow::Result<()> {
+        let buf = size.to_le_bytes();
+        self.put(LOGICAL_SIZE_KEY, Value::Image(Bytes::from(buf.to_vec())));
+        Ok(())
+    }
+
     pub async fn drop_dbdir(
         &mut self,
         spcnode: Oid,
@@ -1131,7 +1107,8 @@ impl<'a> DatadirModification<'a> {
         result?;
 
         if pending_nblocks != 0 {
-            writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
+            let size = writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
+            self.put_logical_size(size)?;
             self.pending_nblocks = 0;
         }
 
@@ -1159,7 +1136,8 @@ impl<'a> DatadirModification<'a> {
         writer.finish_write(lsn);
 
         if pending_nblocks != 0 {
-            writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
+            let size = writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
+            self.put_logical_size(size)?;
         }
 
         Ok(())
@@ -1274,7 +1252,7 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
 // 03 misc
 //      controlfile
 //      checkpoint
-//      pg_version
+//      logical_size
 //
 // Below is a full list of the keyspace allocation:
 //
@@ -1314,6 +1292,10 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
 // Checkpoint:
 // 03 00000000 00000000 00000000 00 00000001
 //-- Section 01: relation data and metadata
+//
+// LogicalSize:
+// 03 00000000 00000000 00000000 00 00000003
+//
 
 const DBDIR_KEY: Key = Key {
     field1: 0x00,
@@ -1536,6 +1518,15 @@ const CHECKPOINT_KEY: Key = Key {
     field6: 1,
 };
 
+const LOGICAL_SIZE_KEY: Key = Key {
+    field1: 0x03,
+    field2: 0,
+    field3: 0,
+    field4: 0,
+    field5: 0,
+    field6: 3,
+};
+
 // Reverse mappings for a few Keys.
 // These are needed by WAL redo manager.
 
diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs
index 82aebc6c07..aa2d95ddf4 100644
--- a/pageserver/src/task_mgr.rs
+++ b/pageserver/src/task_mgr.rs
@@ -237,11 +237,6 @@ pub enum TaskKind {
     /// See [`crate::disk_usage_eviction_task`].
     DiskUsageEviction,
 
-    // Initial logical size calculation
-    InitialLogicalSizeCalculation,
-
-    OndemandLogicalSizeCalculation,
-
     // Task that flushes frozen in-memory layers to disk
     LayerFlushTask,
 
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 8349e1993f..121b8b9505 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -98,9 +98,7 @@ mod timeline;
 pub mod size;
 
 pub(crate) use timeline::debug_assert_current_span_has_tenant_and_timeline_id;
-pub use timeline::{
-    LocalLayerInfoForDiskUsageEviction, LogicalSizeCalculationCause, PageReconstructError, Timeline,
-};
+pub use timeline::{LocalLayerInfoForDiskUsageEviction, PageReconstructError, Timeline};
 
 // re-export this function so that page_cache.rs can use it.
 pub use crate::tenant::ephemeral_file::writeback as writeback_ephemeral_file;
@@ -545,6 +543,8 @@ impl Tenant {
             }
         };
 
+        timeline.init_logical_size().await;
+
         if self.remote_storage.is_some() {
             // Reconcile local state with remote storage, downloading anything that's
             // missing locally, and scheduling uploads for anything that's missing
@@ -2637,14 +2637,8 @@ impl Tenant {
         // `max_retention_period` overrides the cutoff that is used to calculate the size
         // (only if it is shorter than the real cutoff).
max_retention_period: Option, - cause: LogicalSizeCalculationCause, ctx: &RequestContext, ) -> anyhow::Result { - let logical_sizes_at_once = self - .conf - .concurrent_tenant_size_logical_size_queries - .inner(); - // TODO: Having a single mutex block concurrent reads is not great for performance. // // But the only case where we need to run multiple of these at once is when we @@ -2654,27 +2648,15 @@ impl Tenant { // See more for on the issue #2748 condenced out of the initial PR review. let mut shared_cache = self.cached_logical_sizes.lock().await; - size::gather_inputs( - self, - logical_sizes_at_once, - max_retention_period, - &mut shared_cache, - cause, - ctx, - ) - .await + size::gather_inputs(self, max_retention_period, &mut shared_cache, ctx).await } /// Calculate synthetic tenant size and cache the result. /// This is periodically called by background worker. /// result is cached in tenant struct #[instrument(skip_all, fields(tenant_id=%self.tenant_id))] - pub async fn calculate_synthetic_size( - &self, - cause: LogicalSizeCalculationCause, - ctx: &RequestContext, - ) -> anyhow::Result { - let inputs = self.gather_size_inputs(None, cause, ctx).await?; + pub async fn calculate_synthetic_size(&self, ctx: &RequestContext) -> anyhow::Result { + let inputs = self.gather_size_inputs(None, ctx).await?; let size = inputs.calculate()?; diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs index ffcbdc1f1d..e3280d2f0b 100644 --- a/pageserver/src/tenant/size.rs +++ b/pageserver/src/tenant/size.rs @@ -4,20 +4,14 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use anyhow::{bail, Context}; -use tokio::sync::oneshot::error::RecvError; -use tokio::sync::Semaphore; -use tokio_util::sync::CancellationToken; use crate::context::RequestContext; -use crate::pgdatadir_mapping::CalculateLogicalSizeError; -use super::{LogicalSizeCalculationCause, Tenant}; +use super::Tenant; use crate::tenant::Timeline; use utils::id::TimelineId; use utils::lsn::Lsn; -use tracing::*; - use tenant_size_model::{Segment, StorageModel}; /// Inputs to the actual tenant sizing model @@ -123,10 +117,8 @@ pub struct TimelineInputs { /// tenant size will be zero. pub(super) async fn gather_inputs( tenant: &Tenant, - limit: &Arc, max_retention_period: Option, logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>, - cause: LogicalSizeCalculationCause, ctx: &RequestContext, ) -> anyhow::Result { // refresh is needed to update gc related pitr_cutoff and horizon_cutoff @@ -319,15 +311,7 @@ pub(super) async fn gather_inputs( // We left the 'size' field empty in all of the Segments so far. // Now find logical sizes for all of the points that might need or benefit from them. - fill_logical_sizes( - &timelines, - &mut segments, - limit, - logical_size_cache, - cause, - ctx, - ) - .await?; + fill_logical_sizes(&timelines, &mut segments, logical_size_cache, ctx).await?; Ok(ModelInputs { segments, @@ -343,9 +327,7 @@ pub(super) async fn gather_inputs( async fn fill_logical_sizes( timelines: &[Arc], segments: &mut [SegmentMeta], - limit: &Arc, logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>, - cause: LogicalSizeCalculationCause, ctx: &RequestContext, ) -> anyhow::Result<()> { let timeline_hash: HashMap> = HashMap::from_iter( @@ -361,11 +343,6 @@ async fn fill_logical_sizes( // with joinset, on drop, all of the tasks will just be de-scheduled, which we can use to // our advantage with `?` error handling. 
- let mut joinset = tokio::task::JoinSet::new(); - - let cancel = tokio_util::sync::CancellationToken::new(); - // be sure to cancel all spawned tasks if we are dropped - let _dg = cancel.clone().drop_guard(); // For each point that would benefit from having a logical size available, // spawn a Task to fetch it, unless we have it cached already. @@ -378,71 +355,18 @@ async fn fill_logical_sizes( let lsn = Lsn(seg.segment.lsn); if let Entry::Vacant(e) = sizes_needed.entry((timeline_id, lsn)) { - let cached_size = logical_size_cache.get(&(timeline_id, lsn)).cloned(); + let mut cached_size = logical_size_cache.get(&(timeline_id, lsn)).cloned(); if cached_size.is_none() { let timeline = Arc::clone(timeline_hash.get(&timeline_id).unwrap()); - let parallel_size_calcs = Arc::clone(limit); - let ctx = ctx.attached_child(); - joinset.spawn( - calculate_logical_size( - parallel_size_calcs, - timeline, - lsn, - cause, - ctx, - cancel.child_token(), - ) - .in_current_span(), - ); + cached_size = Some(timeline.get_logical_size(lsn, ctx).await?); } e.insert(cached_size); } } - // Perform the size lookups - let mut have_any_error = false; - while let Some(res) = joinset.join_next().await { - // each of these come with Result, JoinError> - // because of spawn + spawn_blocking - match res { - Err(join_error) if join_error.is_cancelled() => { - unreachable!("we are not cancelling any of the futures, nor should be"); - } - Err(join_error) => { - // cannot really do anything, as this panic is likely a bug - error!("task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}"); - have_any_error = true; - } - Ok(Err(recv_result_error)) => { - // cannot really do anything, as this panic is likely a bug - error!("failed to receive logical size query result: {recv_result_error:#}"); - have_any_error = true; - } - Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => { - warn!( - timeline_id=%timeline.timeline_id, - "failed to calculate logical size at {lsn}: {error:#}" - ); - have_any_error = true; - } - Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => { - debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated"); - - logical_size_cache.insert((timeline.timeline_id, lsn), size); - sizes_needed.insert((timeline.timeline_id, lsn), Some(size)); - } - } - } - // prune any keys not needed anymore; we record every used key and added key. logical_size_cache.retain(|key, _| sizes_needed.contains_key(key)); - if have_any_error { - // we cannot complete this round, because we are missing data. - // we have however cached all we were able to request calculation on. - anyhow::bail!("failed to calculate some logical_sizes"); - } - // Insert the looked up sizes to the Segments for seg in segments.iter_mut() { if !seg.size_needed() { @@ -484,33 +408,6 @@ impl ModelInputs { } } -/// Newtype around the tuple that carries the timeline at lsn logical size calculation. 
-struct TimelineAtLsnSizeResult( - Arc, - utils::lsn::Lsn, - Result, -); - -#[instrument(skip_all, fields(timeline_id=%timeline.timeline_id, lsn=%lsn))] -async fn calculate_logical_size( - limit: Arc, - timeline: Arc, - lsn: utils::lsn::Lsn, - cause: LogicalSizeCalculationCause, - ctx: RequestContext, - cancel: CancellationToken, -) -> Result { - let _permit = tokio::sync::Semaphore::acquire_owned(limit) - .await - .expect("global semaphore should not had been closed"); - - let size_res = timeline - .spawn_ondemand_logical_size_calculation(lsn, cause, ctx, cancel) - .instrument(info_span!("spawn_ondemand_logical_size_calculation")) - .await?; - Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res)) -} - #[test] fn verify_size_for_multiple_branches() { // this is generated from integration test test_tenant_size_with_multiple_branches, but this way diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index c47f4444f5..af0085fd49 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -8,7 +8,6 @@ use bytes::Bytes; use fail::fail_point; use futures::StreamExt; use itertools::Itertools; -use once_cell::sync::OnceCell; use pageserver_api::models::{ DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus, @@ -16,7 +15,7 @@ use pageserver_api::models::{ }; use remote_storage::GenericRemoteStorage; use storage_broker::BrokerClientChannel; -use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError}; +use tokio::sync::watch; use tokio_util::sync::CancellationToken; use tracing::*; use utils::id::TenantTimelineId; @@ -50,9 +49,9 @@ use crate::tenant::{ use crate::config::PageServerConf; use crate::keyspace::{KeyPartitioning, KeySpace}; use crate::metrics::{TimelineMetrics, UNEXPECTED_ONDEMAND_DOWNLOADS}; +use crate::pgdatadir_mapping::BlockNumber; use crate::pgdatadir_mapping::LsnForTimestamp; use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key}; -use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError}; use crate::tenant::config::{EvictionPolicy, TenantConfOpt}; use pageserver_api::reltag::RelTag; @@ -211,7 +210,7 @@ pub struct Timeline { repartition_threshold: u64, /// Current logical size of the "datadir", at the last LSN. - current_logical_size: LogicalSize, + current_logical_size: AtomicI64, /// Information about the last processed message by the WAL receiver, /// or None if WAL receiver has not received anything for this timeline @@ -229,126 +228,6 @@ pub struct Timeline { eviction_task_timeline_state: tokio::sync::Mutex, } -/// Internal structure to hold all data needed for logical size calculation. -/// -/// Calculation consists of two stages: -/// -/// 1. Initial size calculation. That might take a long time, because it requires -/// reading all layers containing relation sizes at `initial_part_end`. -/// -/// 2. Collecting an incremental part and adding that to the initial size. -/// Increments are appended on walreceiver writing new timeline data, -/// which result in increase or decrease of the logical size. -struct LogicalSize { - /// Size, potentially slow to compute. Calculating this might require reading multiple - /// layers, and even ancestor's layers. - /// - /// NOTE: size at a given LSN is constant, but after a restart we will calculate - /// the initial size at a different LSN. 
- initial_logical_size: OnceCell, - - /// Semaphore to track ongoing calculation of `initial_logical_size`. - initial_size_computation: Arc, - - /// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines. - initial_part_end: Option, - - /// All other size changes after startup, combined together. - /// - /// Size shouldn't ever be negative, but this is signed for two reasons: - /// - /// 1. If we initialized the "baseline" size lazily, while we already - /// process incoming WAL, the incoming WAL records could decrement the - /// variable and temporarily make it negative. (This is just future-proofing; - /// the initialization is currently not done lazily.) - /// - /// 2. If there is a bug and we e.g. forget to increment it in some cases - /// when size grows, but remember to decrement it when it shrinks again, the - /// variable could go negative. In that case, it seems better to at least - /// try to keep tracking it, rather than clamp or overflow it. Note that - /// get_current_logical_size() will clamp the returned value to zero if it's - /// negative, and log an error. Could set it permanently to zero or some - /// special value to indicate "broken" instead, but this will do for now. - /// - /// Note that we also expose a copy of this value as a prometheus metric, - /// see `current_logical_size_gauge`. Use the `update_current_logical_size` - /// to modify this, it will also keep the prometheus metric in sync. - size_added_after_initial: AtomicI64, -} - -/// Normalized current size, that the data in pageserver occupies. -#[derive(Debug, Clone, Copy)] -enum CurrentLogicalSize { - /// The size is not yet calculated to the end, this is an intermediate result, - /// constructed from walreceiver increments and normalized: logical data could delete some objects, hence be negative, - /// yet total logical size cannot be below 0. - Approximate(u64), - // Fully calculated logical size, only other future walreceiver increments are changing it, and those changes are - // available for observation without any calculations. - Exact(u64), -} - -impl CurrentLogicalSize { - fn size(&self) -> u64 { - *match self { - Self::Approximate(size) => size, - Self::Exact(size) => size, - } - } -} - -impl LogicalSize { - fn empty_initial() -> Self { - Self { - initial_logical_size: OnceCell::with_value(0), - // initial_logical_size already computed, so, don't admit any calculations - initial_size_computation: Arc::new(Semaphore::new(0)), - initial_part_end: None, - size_added_after_initial: AtomicI64::new(0), - } - } - - fn deferred_initial(compute_to: Lsn) -> Self { - Self { - initial_logical_size: OnceCell::new(), - initial_size_computation: Arc::new(Semaphore::new(1)), - initial_part_end: Some(compute_to), - size_added_after_initial: AtomicI64::new(0), - } - } - - fn current_size(&self) -> anyhow::Result { - let size_increment: i64 = self.size_added_after_initial.load(AtomicOrdering::Acquire); - // ^^^ keep this type explicit so that the casts in this function break if - // we change the type. 
-        match self.initial_logical_size.get() {
-            Some(initial_size) => {
-                initial_size.checked_add_signed(size_increment)
-                    .with_context(|| format!("Overflow during logical size calculation, initial_size: {initial_size}, size_increment: {size_increment}"))
-                    .map(CurrentLogicalSize::Exact)
-            }
-            None => {
-                let non_negative_size_increment = u64::try_from(size_increment).unwrap_or(0);
-                Ok(CurrentLogicalSize::Approximate(non_negative_size_increment))
-            }
-        }
-    }
-
-    fn increment_size(&self, delta: i64) {
-        self.size_added_after_initial
-            .fetch_add(delta, AtomicOrdering::SeqCst);
-    }
-
-    /// Make the value computed by initial logical size computation
-    /// available for re-use. This doesn't contain the incremental part.
-    fn initialized_size(&self, lsn: Lsn) -> Option<u64> {
-        match self.initial_part_end {
-            Some(v) if v == lsn => self.initial_logical_size.get().copied(),
-            _ => None,
-        }
-    }
-}
-
 pub struct WalReceiverInfo {
     pub wal_source_connconf: PgConnectionConfig,
     pub last_received_msg_lsn: Lsn,
@@ -446,14 +325,6 @@ impl std::fmt::Display for PageReconstructError {
     }
 }
 
-#[derive(Clone, Copy)]
-pub enum LogicalSizeCalculationCause {
-    Initial,
-    ConsumptionMetricsSyntheticSize,
-    EvictionTaskImitation,
-    TenantSizeHandler,
-}
-
 /// Public interface functions
 impl Timeline {
     /// Get the LSN where this branch was created
@@ -838,23 +709,17 @@ impl Timeline {
-    /// the initial size calculation has not been run (gets triggered on the first size access).
+    /// the logical size has not yet been restored from storage (see `init_logical_size`).
     ///
-    /// return size and boolean flag that shows if the size is exact
-    pub fn get_current_logical_size(
-        self: &Arc<Self>,
-        ctx: &RequestContext,
-    ) -> anyhow::Result<(u64, bool)> {
-        let current_size = self.current_logical_size.current_size()?;
-        debug!("Current size: {current_size:?}");
+    /// Returns the last known logical size; it may still be 0 until `init_logical_size` has restored it.
+    pub fn get_current_logical_size(self: &Arc<Self>) -> u64 {
+        self.current_logical_size.load(AtomicOrdering::Relaxed) as u64
+    }
 
-        let mut is_exact = true;
-        let size = current_size.size();
-        if let (CurrentLogicalSize::Approximate(_), Some(initial_part_end)) =
-            (current_size, self.current_logical_size.initial_part_end)
-        {
-            is_exact = false;
-            self.try_spawn_size_init_task(initial_part_end, ctx);
+    pub async fn init_logical_size(&self) {
+        let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Error);
+        let last_record_lsn = self.get_last_record_lsn();
+        if let Ok(size) = self.get_logical_size(last_record_lsn, &ctx).await {
+            self.current_logical_size
+                .store(size as i64, AtomicOrdering::Relaxed);
         }
-
-        Ok((size, is_exact))
     }
 
     /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
@@ -1399,15 +1264,7 @@ impl Timeline {
             latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
             initdb_lsn: metadata.initdb_lsn(),
 
-            current_logical_size: if disk_consistent_lsn.is_valid() {
-                // we're creating timeline data with some layer files existing locally,
-                // need to recalculate timeline's logical size based on data in the layers.
-                LogicalSize::deferred_initial(disk_consistent_lsn)
-            } else {
-                // we're creating timeline data without any layers existing locally,
-                // initial logical size is 0.
- LogicalSize::empty_initial() - }, + current_logical_size: AtomicI64::new(0), partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))), repartition_threshold: 0, @@ -1840,292 +1697,12 @@ impl Timeline { Ok(()) } - fn try_spawn_size_init_task(self: &Arc, lsn: Lsn, ctx: &RequestContext) { - let permit = match Arc::clone(&self.current_logical_size.initial_size_computation) - .try_acquire_owned() - { - Ok(permit) => permit, - Err(TryAcquireError::NoPermits) => { - // computation already ongoing or finished with success - return; - } - Err(TryAcquireError::Closed) => unreachable!("we never call close"), - }; - debug_assert!(self - .current_logical_size - .initial_logical_size - .get() - .is_none()); - - info!( - "spawning logical size computation from context of task kind {:?}", - ctx.task_kind() - ); - // We need to start the computation task. - // It gets a separate context since it will outlive the request that called this function. - let self_clone = Arc::clone(self); - let background_ctx = ctx.detached_child( - TaskKind::InitialLogicalSizeCalculation, - DownloadBehavior::Download, - ); - task_mgr::spawn( - task_mgr::BACKGROUND_RUNTIME.handle(), - task_mgr::TaskKind::InitialLogicalSizeCalculation, - Some(self.tenant_id), - Some(self.timeline_id), - "initial size calculation", - false, - // NB: don't log errors here, task_mgr will do that. - async move { - // no cancellation here, because nothing really waits for this to complete compared - // to spawn_ondemand_logical_size_calculation. - let cancel = CancellationToken::new(); - let calculated_size = match self_clone - .logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel) - .await - { - Ok(s) => s, - Err(CalculateLogicalSizeError::Cancelled) => { - // Don't make noise, this is a common task. - // In the unlikely case that there is another call to this function, we'll retry - // because initial_logical_size is still None. - info!("initial size calculation cancelled, likely timeline delete / tenant detach"); - return Ok(()); - } - Err(CalculateLogicalSizeError::Other(err)) => { - if let Some(e @ PageReconstructError::AncestorStopping(_)) = - err.root_cause().downcast_ref() - { - // This can happen if the timeline parent timeline switches to - // Stopping state while we're still calculating the initial - // timeline size for the child, for example if the tenant is - // being detached or the pageserver is shut down. Like with - // CalculateLogicalSizeError::Cancelled, don't make noise. - info!("initial size calculation failed because the timeline or its ancestor is Stopping, likely because the tenant is being detached: {e:#}"); - return Ok(()); - } - return Err(err.context("Failed to calculate logical size")); - } - }; - - // we cannot query current_logical_size.current_size() to know the current - // *negative* value, only truncated to u64. - let added = self_clone - .current_logical_size - .size_added_after_initial - .load(AtomicOrdering::Relaxed); - - let sum = calculated_size.saturating_add_signed(added); - - // set the gauge value before it can be set in `update_current_logical_size`. 
- self_clone.metrics.current_logical_size_gauge.set(sum); - - match self_clone - .current_logical_size - .initial_logical_size - .set(calculated_size) - { - Ok(()) => (), - Err(_what_we_just_attempted_to_set) => { - let existing_size = self_clone - .current_logical_size - .initial_logical_size - .get() - .expect("once_cell set was lost, then get failed, impossible."); - // This shouldn't happen because the semaphore is initialized with 1. - // But if it happens, just complain & report success so there are no further retries. - error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing") - } - } - // now that `initial_logical_size.is_some()`, reduce permit count to 0 - // so that we prevent future callers from spawning this task - permit.forget(); - Ok(()) - }.in_current_span(), - ); - } - - pub fn spawn_ondemand_logical_size_calculation( - self: &Arc, - lsn: Lsn, - cause: LogicalSizeCalculationCause, - ctx: RequestContext, - cancel: CancellationToken, - ) -> oneshot::Receiver> { - let (sender, receiver) = oneshot::channel(); - let self_clone = Arc::clone(self); - // XXX if our caller loses interest, i.e., ctx is cancelled, - // we should stop the size calculation work and return an error. - // That would require restructuring this function's API to - // return the result directly, instead of a Receiver for the result. - let ctx = ctx.detached_child( - TaskKind::OndemandLogicalSizeCalculation, - DownloadBehavior::Download, - ); - task_mgr::spawn( - task_mgr::BACKGROUND_RUNTIME.handle(), - task_mgr::TaskKind::OndemandLogicalSizeCalculation, - Some(self.tenant_id), - Some(self.timeline_id), - "ondemand logical size calculation", - false, - async move { - let res = self_clone - .logical_size_calculation_task(lsn, cause, &ctx, cancel) - .await; - let _ = sender.send(res).ok(); - Ok(()) // Receiver is responsible for handling errors - } - .in_current_span(), - ); - receiver - } - - #[instrument(skip_all)] - async fn logical_size_calculation_task( - self: &Arc, - lsn: Lsn, - cause: LogicalSizeCalculationCause, - ctx: &RequestContext, - cancel: CancellationToken, - ) -> Result { - debug_assert_current_span_has_tenant_and_timeline_id(); - - let mut timeline_state_updates = self.subscribe_for_state_updates(); - let self_calculation = Arc::clone(self); - - let mut calculation = pin!(async { - let cancel = cancel.child_token(); - let ctx = ctx.attached_child(); - self_calculation - .calculate_logical_size(lsn, cause, cancel, &ctx) - .await - }); - let timeline_state_cancellation = async { - loop { - match timeline_state_updates.changed().await { - Ok(()) => { - let new_state = *timeline_state_updates.borrow(); - match new_state { - // we're running this job for active timelines only - TimelineState::Active => continue, - TimelineState::Broken - | TimelineState::Stopping - | TimelineState::Loading => { - break format!("aborted because timeline became inactive (new state: {new_state:?})") - } - } - } - Err(_sender_dropped_error) => { - // can't happen, the sender is not dropped as long as the Timeline exists - break "aborted because state watch was dropped".to_string(); - } - } - } - }; - - let taskmgr_shutdown_cancellation = async { - task_mgr::shutdown_watcher().await; - "aborted because task_mgr shutdown requested".to_string() - }; - - loop { - tokio::select! 
{ - res = &mut calculation => { return res } - reason = timeline_state_cancellation => { - debug!(reason = reason, "cancelling calculation"); - cancel.cancel(); - return calculation.await; - } - reason = taskmgr_shutdown_cancellation => { - debug!(reason = reason, "cancelling calculation"); - cancel.cancel(); - return calculation.await; - } - } - } - } - - /// Calculate the logical size of the database at the latest LSN. - /// - /// NOTE: counted incrementally, includes ancestors. This can be a slow operation, - /// especially if we need to download remote layers. - pub async fn calculate_logical_size( - &self, - up_to_lsn: Lsn, - cause: LogicalSizeCalculationCause, - cancel: CancellationToken, - ctx: &RequestContext, - ) -> Result { - info!( - "Calculating logical size for timeline {} at {}", - self.timeline_id, up_to_lsn - ); - // These failpoints are used by python tests to ensure that we don't delete - // the timeline while the logical size computation is ongoing. - // The first failpoint is used to make this function pause. - // Then the python test initiates timeline delete operation in a thread. - // It waits for a few seconds, then arms the second failpoint and disables - // the first failpoint. The second failpoint prints an error if the timeline - // delete code has deleted the on-disk state while we're still running here. - // It shouldn't do that. If it does it anyway, the error will be caught - // by the test suite, highlighting the problem. - fail::fail_point!("timeline-calculate-logical-size-pause"); - fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| { - if !self - .conf - .metadata_path(self.timeline_id, self.tenant_id) - .exists() - { - error!("timeline-calculate-logical-size-pre metadata file does not exist") - } - // need to return something - Ok(0) - }); - // See if we've already done the work for initial size calculation. - // This is a short-cut for timelines that are mostly unused. - if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) { - return Ok(size); - } - let storage_time_metrics = match cause { - LogicalSizeCalculationCause::Initial - | LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize - | LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo, - LogicalSizeCalculationCause::EvictionTaskImitation => { - &self.metrics.imitate_logical_size_histo - } - }; - let timer = storage_time_metrics.start_timer(); - let logical_size = self - .get_current_logical_size_non_incremental(up_to_lsn, cancel, ctx) - .await?; - debug!("calculated logical size: {logical_size}"); - timer.stop_and_record(); - Ok(logical_size) - } - /// Update current logical size, adding `delta' to the old value. - fn update_current_logical_size(&self, delta: i64) { - let logical_size = &self.current_logical_size; - logical_size.increment_size(delta); - - // Also set the value in the prometheus gauge. Note that - // there is a race condition here: if this is is called by two - // threads concurrently, the prometheus gauge might be set to - // one value while current_logical_size is set to the - // other. - match logical_size.current_size() { - Ok(CurrentLogicalSize::Exact(new_current_size)) => self - .metrics - .current_logical_size_gauge - .set(new_current_size), - Ok(CurrentLogicalSize::Approximate(_)) => { - // don't update the gauge yet, this allows us not to update the gauge back and - // forth between the initial size calculation task. 
- } - // this is overflow - Err(e) => error!("Failed to compute current logical size for metrics update: {e:?}"), - } + fn update_current_logical_size(&self, delta: i64) -> u64 { + let prev_size = self + .current_logical_size + .fetch_add(delta, AtomicOrdering::SeqCst); + (prev_size + delta) as u64 } fn find_layer(&self, layer_file_name: &str) -> Option> { @@ -4382,7 +3959,7 @@ impl<'a> TimelineWriter<'a> { self.tl.finish_write(new_lsn); } - pub fn update_current_logical_size(&self, delta: i64) { + pub fn update_current_logical_size(&self, delta: i64) -> u64 { self.tl.update_current_logical_size(delta) } } diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 558600692e..964209d831 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -30,7 +30,7 @@ use crate::{ tenant::{ config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold}, storage_layer::PersistentLayer, - LogicalSizeCalculationCause, Tenant, + Tenant, }, }; @@ -294,17 +294,12 @@ impl Timeline { match state.last_layer_access_imitation { Some(ts) if ts.elapsed() < p.threshold => { /* no need to run */ } _ => { - self.imitate_timeline_cached_layer_accesses(cancel, ctx) - .await; + self.imitate_timeline_cached_layer_accesses(ctx).await; state.last_layer_access_imitation = Some(tokio::time::Instant::now()) } } drop(state); - if cancel.is_cancelled() { - return ControlFlow::Break(()); - } - // This task is timeline-scoped, but the synthetic size calculation is tenant-scoped. // Make one of the tenant's timelines draw the short straw and run the calculation. // The others wait until the calculation is done so that they take into account the @@ -333,36 +328,8 @@ impl Timeline { /// Recompute the values which would cause on-demand downloads during restart. #[instrument(skip_all)] - async fn imitate_timeline_cached_layer_accesses( - &self, - cancel: &CancellationToken, - ctx: &RequestContext, - ) { + async fn imitate_timeline_cached_layer_accesses(&self, ctx: &RequestContext) { let lsn = self.get_last_record_lsn(); - - // imitiate on-restart initial logical size - let size = self - .calculate_logical_size( - lsn, - LogicalSizeCalculationCause::EvictionTaskImitation, - cancel.clone(), - ctx, - ) - .instrument(info_span!("calculate_logical_size")) - .await; - - match &size { - Ok(_size) => { - // good, don't log it to avoid confusion - } - Err(_) => { - // we have known issues for which we already log this on consumption metrics, - // gc, and compaction. leave logging out for now. - // - // https://github.com/neondatabase/neon/issues/2539 - } - } - // imitiate repartiting on first compactation if let Err(e) = self .collect_keyspace(lsn, ctx) @@ -370,13 +337,7 @@ impl Timeline { .await { // if this failed, we probably failed logical size because these use the same keys - if size.is_err() { - // ignore, see above comment - } else { - warn!( - "failed to collect keyspace but succeeded in calculating logical size: {e:#}" - ); - } + warn!("failed to collect keyspace but succeeded in calculating logical size: {e:#}"); } } @@ -413,21 +374,9 @@ impl Timeline { // So, the chance of the worst case is quite low in practice. // It runs as a per-tenant task, but the eviction_task.rs is per-timeline. // So, we must coordinate with other with other eviction tasks of this tenant. 
-        let limit = self
-            .conf
-            .eviction_task_immitated_concurrent_logical_size_queries
-            .inner();
-
         let mut throwaway_cache = HashMap::new();
-        let gather = crate::tenant::size::gather_inputs(
-            tenant,
-            limit,
-            None,
-            &mut throwaway_cache,
-            LogicalSizeCalculationCause::EvictionTaskImitation,
-            ctx,
-        )
-        .instrument(info_span!("gather_inputs"));
+        let gather = crate::tenant::size::gather_inputs(tenant, None, &mut throwaway_cache, ctx)
+            .instrument(info_span!("gather_inputs"));
 
         tokio::select! {
             _ = cancel.cancelled() => {}
diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
index 1cbed3416c..c221c52f00 100644
--- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
@@ -346,9 +346,7 @@ pub(super) async fn handle_walreceiver_connection(
 
     // Send the replication feedback message.
     // Regular standby_status_update fields are put into this message.
-    let (timeline_logical_size, _) = timeline
-        .get_current_logical_size(&ctx)
-        .context("Status update creation failed to get current logical size")?;
+    let timeline_logical_size = timeline.get_current_logical_size();
    let status_update = PageserverFeedback {
        current_timeline_size: timeline_logical_size,
        last_received_lsn,