diff --git a/pageserver/src/consumption_metrics/metrics.rs b/pageserver/src/consumption_metrics/metrics.rs index ad5a564259..0ff019f562 100644 --- a/pageserver/src/consumption_metrics/metrics.rs +++ b/pageserver/src/consumption_metrics/metrics.rs @@ -351,7 +351,12 @@ impl TimelineSnapshot { let current_exact_logical_size = { let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_id, timeline_id = %t.timeline_id); - let size = span.in_scope(|| t.get_current_logical_size(ctx)); + let size = span.in_scope(|| { + t.get_current_logical_size( + crate::tenant::timeline::GetLogicalSizePriority::Background, + ctx, + ) + }); match size { // Only send timeline logical size when it is fully calculated. CurrentLogicalSize::Exact(ref size) => Some(size.into()), diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 42084a2340..0ba3a4aa82 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -337,7 +337,8 @@ async fn build_timeline_info_common( Lsn(0) => None, lsn @ Lsn(_) => Some(lsn), }; - let current_logical_size = timeline.get_current_logical_size(ctx); + let current_logical_size = + timeline.get_current_logical_size(tenant::timeline::GetLogicalSizePriority::User, ctx); let current_physical_size = Some(timeline.layer_size_sum().await); let state = timeline.current_state(); let remote_consistent_lsn_projected = timeline diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index c04b7d075f..ee1a16cb54 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -406,16 +406,14 @@ pub(crate) mod initial_logical_size { use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec}; use once_cell::sync::Lazy; - use crate::task_mgr::TaskKind; - pub(crate) struct StartCalculation(IntCounterVec); pub(crate) static START_CALCULATION: Lazy = Lazy::new(|| { StartCalculation( register_int_counter_vec!( "pageserver_initial_logical_size_start_calculation", 
"Incremented each time we start an initial logical size calculation attempt. \ - The `task_kind` label is for the task kind that caused this attempt.", - &["attempt", "task_kind"] + The `circumstances` label provides some additional details.", + &["attempt", "circumstances"] ) .unwrap(), ) @@ -463,19 +461,24 @@ pub(crate) mod initial_logical_size { inc_drop_calculation: Option, } + #[derive(strum_macros::IntoStaticStr)] + pub(crate) enum StartCircumstances { + EmptyInitial, + SkippedConcurrencyLimiter, + AfterBackgroundTasksRateLimit, + } + impl StartCalculation { - pub(crate) fn first(&self, causing_task_kind: Option) -> OngoingCalculationGuard { - let task_kind_label: &'static str = - causing_task_kind.map(|k| k.into()).unwrap_or_default(); - self.0.with_label_values(&["first", task_kind_label]); + pub(crate) fn first(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard { + let circumstances_label: &'static str = circumstances.into(); + self.0.with_label_values(&["first", circumstances_label]).inc(); OngoingCalculationGuard { inc_drop_calculation: Some(DROP_CALCULATION.first.clone()), } } - pub(crate) fn retry(&self, causing_task_kind: Option) -> OngoingCalculationGuard { - let task_kind_label: &'static str = - causing_task_kind.map(|k| k.into()).unwrap_or_default(); - self.0.with_label_values(&["retry", task_kind_label]); + pub(crate) fn retry(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard { + let circumstances_label: &'static str = circumstances.into(); + self.0.with_label_values(&["retry", circumstances_label]).inc(); OngoingCalculationGuard { inc_drop_calculation: Some(DROP_CALCULATION.retry.clone()), } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index bc8779b26f..81c001ffde 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -672,7 +672,7 @@ impl Tenant { // as we are no longer loading, signal completion by dropping // the completion while we resume deletion drop(_completion); - // do not 
hold to initial_logical_size_attempt as it will prevent loading from proceeding without timeout +// do not hold to initial_logical_size_attempt as it will prevent loading from proceeding without timeout let _ = init_order .as_mut() .and_then(|x| x.initial_logical_size_attempt.take()); @@ -689,7 +689,7 @@ impl Tenant { &tenant_clone, preload, tenants, - init_order, +init_order, &ctx, ) .await diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 860bb255ca..f572ef1e60 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -44,6 +44,7 @@ pub(crate) enum BackgroundLoopKind { Eviction, ConsumptionMetricsCollectMetrics, ConsumptionMetricsSyntheticSizeWorker, + InitialLogicalSizeCalculation, } impl BackgroundLoopKind { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index f178bf5766..b673e4fd20 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -16,23 +16,27 @@ use itertools::Itertools; use pageserver_api::models::{ DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, LayerMapInfo, TimelineState, }; +use rand::Rng; use serde_with::serde_as; use storage_broker::BrokerClientChannel; use tokio::{ runtime::Handle, - sync::{oneshot, watch, TryAcquireError}, + sync::{oneshot, watch}, }; use tokio_util::sync::CancellationToken; use tracing::*; use utils::{id::TenantTimelineId, sync::gate::Gate}; -use std::cmp::{max, min, Ordering}; use std::collections::{BinaryHeap, HashMap, HashSet}; use std::ops::{Deref, Range}; use std::pin::pin; use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::{Arc, Mutex, RwLock, Weak}; use std::time::{Duration, Instant, SystemTime}; +use std::{ + cmp::{max, min, Ordering}, + ops::ControlFlow, +}; use crate::context::{ AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder, @@ -449,6 +453,11 @@ pub enum LogicalSizeCalculationCause { TenantSizeHandler, } +pub enum 
GetLogicalSizePriority { + User, + Background, +} + #[derive(enumset::EnumSetType)] pub(crate) enum CompactFlags { ForceRepartition, @@ -845,28 +854,6 @@ impl Timeline { } } - /// Retrieve current logical size of the timeline. - /// - /// The size could be lagging behind the actual number, in case - /// the initial size calculation has not been run (gets triggered on the first size access). - /// - /// return size and boolean flag that shows if the size is exact - pub(crate) fn get_current_logical_size( - self: &Arc, - ctx: &RequestContext, - ) -> logical_size::CurrentLogicalSize { - let current_size = self.current_logical_size.current_size(); - debug!("Current size: {current_size:?}"); - - if let (CurrentLogicalSize::Approximate(_), Some(initial_part_end)) = - (current_size, self.current_logical_size.initial_part_end) - { - self.try_spawn_size_init_task(initial_part_end, ctx); - } - - current_size - } - /// Check if more than 'checkpoint_distance' of WAL has been accumulated in /// the in-memory layer, and initiate flushing it if so. /// @@ -919,6 +906,7 @@ impl Timeline { self.launch_wal_receiver(ctx, broker_client); self.set_state(TimelineState::Active); self.launch_eviction_task(background_jobs_can_start); + self.spawn_initial_logical_size_computation_task(ctx); } /// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then @@ -1759,39 +1747,60 @@ impl Timeline { Ok(()) } - fn try_spawn_size_init_task(self: &Arc, lsn: Lsn, ctx: &RequestContext) { - let state = self.current_state(); - if matches!( - state, - TimelineState::Broken { .. } | TimelineState::Stopping - ) { - // Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken). - return; + /// Retrieve current logical size of the timeline. + /// + /// The size could be lagging behind the actual number, in case + /// the initial size calculation has not finished yet (it runs in a background task that is spawned when the timeline is set to Active). 
+ /// + /// Returns a [`logical_size::CurrentLogicalSize`], i.e. the size tagged as exact or approximate. + pub(crate) fn get_current_logical_size( + self: &Arc, + priority: GetLogicalSizePriority, + _ctx: &RequestContext, + ) -> logical_size::CurrentLogicalSize { + let current_size = self.current_logical_size.current_size(); + debug!("Current size: {current_size:?}"); + + match (current_size.accuracy(), priority) { + (logical_size::Accuracy::Exact, _) => (), // nothing to do + (logical_size::Accuracy::Approximate, GetLogicalSizePriority::Background) => { + // background task will eventually deliver an exact value, we're in no rush + } + (logical_size::Accuracy::Approximate, GetLogicalSizePriority::User) => { + // background task is not ready, but user is asking for it now; + // => make the background task skip the line + // (The alternative would be to calculate the size here, but, + // it can actually take a long time if the user has a lot of rels. + // And we'll inevitably need it again; so, let the background task do the work.) 
+ match self + .current_logical_size + .cancel_wait_for_background_loop_concurrency_limit_semaphore + .get() + { + Some(cancel) => cancel.cancel(), + None => { + warn!("unexpected: priority_tx not set, logical size calculation will not be prioritized"); + } + }; + } } - let permit = match Arc::clone(&self.current_logical_size.initial_size_computation) - .try_acquire_owned() - { - Ok(permit) => permit, - Err(TryAcquireError::NoPermits) => { - // computation already ongoing or finished with success - return; - } - Err(TryAcquireError::Closed) => unreachable!("we never call close"), - }; - debug_assert!(self - .current_logical_size - .initial_logical_size - .get() - .is_none()); + current_size + } + + fn spawn_initial_logical_size_computation_task(self: &Arc, ctx: &RequestContext) { + let Some(initial_part_end) = self.current_logical_size.initial_part_end else { + // nothing to do for freshly created timelines; + assert_eq!( + self.current_logical_size.current_size().accuracy(), + logical_size::Accuracy::Exact, + ); + return; + }; + + let cancel_wait_for_background_loop_concurrency_limit_semaphore = CancellationToken::new(); + self.current_logical_size.cancel_wait_for_background_loop_concurrency_limit_semaphore.set(cancel_wait_for_background_loop_concurrency_limit_semaphore.clone()).expect("initial logical size calculation task must be spawned exactly once per Timeline object"); - info!( - "spawning logical size computation from context of task kind {:?}", - ctx.task_kind() - ); - let causing_task_kind = ctx.task_kind(); - // We need to start the computation task. - // It gets a separate context since it will outlive the request that called this function. let self_clone = Arc::clone(self); let background_ctx = ctx.detached_child( TaskKind::InitialLogicalSizeCalculation, @@ -1806,96 +1815,151 @@ impl Timeline { false, // NB: don't log errors here, task_mgr will do that. 
async move { - let cancel = task_mgr::shutdown_token(); + self_clone + .initial_logical_size_calculation_task( + initial_part_end, + cancel_wait_for_background_loop_concurrency_limit_semaphore, + cancel, + background_ctx, + ) + .await; + Ok(()) + } + .in_current_span(), + ); + } - // in case we were created during pageserver initialization, wait for - // initialization to complete before proceeding. startup time init runs on the same - // runtime. - tokio::select! { - _ = cancel.cancelled() => { return Ok(()); }, - _ = completion::Barrier::maybe_wait(self_clone.initial_logical_size_can_start.clone()) => {} - }; + async fn initial_logical_size_calculation_task( + self: Arc, + initial_part_end: Lsn, + skip_concurrency_limiter: CancellationToken, + cancel: CancellationToken, + background_ctx: RequestContext, + ) { + enum BackgroundCalculationError { + Cancelled, + Other(anyhow::Error), + } + let retrying = { + let self_clone = self.clone(); + async move { + let mut attempt = 0; + loop { + attempt += 1; + let try_once_res = async { + let cancel = task_mgr::shutdown_token(); + let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit( + BackgroundLoopKind::InitialLogicalSizeCalculation, + &background_ctx, + &cancel, + ); + use crate::metrics::initial_logical_size::StartCircumstances; + let (_maybe_permit, circumstances) = tokio::select! 
{ + res = wait_for_permit => { + match res { + Ok(permit) => (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit), + Err(RateLimitError::Cancelled) => { + return Err(BackgroundCalculationError::Cancelled); + } + } + } + () = skip_concurrency_limiter.cancelled() => { + // Some action that is part of a end user interaction requested logical size + // => break out of the rate limit + // TODO: ideally we'd not run on BackgroundRuntime but the requester's runtime; + // but then again what happens if they cancel; also, we should just be using + // one runtime across the entire process, so, let's leave this for now. + (None, StartCircumstances::SkippedConcurrencyLimiter) + } + }; - // hold off background tasks from starting until all timelines get to try at least - // once initial logical size calculation; though retry will rarely be useful. - // holding off is done because heavier tasks execute blockingly on the same - // runtime. - // - // dropping this at every outcome is probably better than trying to cling on to it, - // delay will be terminated by a timeout regardless. - let completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() }; + let metrics_guard = if attempt == 1 { + crate::metrics::initial_logical_size::START_CALCULATION.first(circumstances) + } else { + crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances) + }; - let metrics_guard = match &completion { - Some(_) => crate::metrics::initial_logical_size::START_CALCULATION.first(Some(causing_task_kind)), - None => crate::metrics::initial_logical_size::START_CALCULATION.retry(Some(causing_task_kind)), - }; - - let calculated_size = match self_clone - .logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx) - .await - { - Ok(s) => s, - Err(CalculateLogicalSizeError::Cancelled) => { - // Don't make noise, this is a common task. 
- // In the unlikely case that there is another call to this function, we'll retry - // because initial_logical_size is still None. - info!("initial size calculation cancelled, likely timeline delete / tenant detach"); - return Ok(()); - } - Err(CalculateLogicalSizeError::Other(err)) => { - if let Some(e @ PageReconstructError::AncestorStopping(_)) = - err.root_cause().downcast_ref() + match self_clone + .logical_size_calculation_task( + initial_part_end, + LogicalSizeCalculationCause::Initial, + &background_ctx, + ) + .await { - // This can happen if the timeline parent timeline switches to - // Stopping state while we're still calculating the initial - // timeline size for the child, for example if the tenant is - // being detached or the pageserver is shut down. Like with - // CalculateLogicalSizeError::Cancelled, don't make noise. - info!("initial size calculation failed because the timeline or its ancestor is Stopping, likely because the tenant is being detached: {e:#}"); - return Ok(()); + Ok(calculated_size) => Ok((calculated_size, metrics_guard)), + Err(CalculateLogicalSizeError::Cancelled) => { + Err(BackgroundCalculationError::Cancelled) + } + Err(CalculateLogicalSizeError::Other(err)) => { + if let Some(PageReconstructError::AncestorStopping(_)) = + err.root_cause().downcast_ref() + { + Err(BackgroundCalculationError::Cancelled) + } else { + Err(BackgroundCalculationError::Other(err)) + } + } } - return Err(err.context("Failed to calculate logical size")); } - }; + .await; - // we cannot query current_logical_size.current_size() to know the current - // *negative* value, only truncated to u64. - let added = self_clone - .current_logical_size - .size_added_after_initial - .load(AtomicOrdering::Relaxed); - - let sum = calculated_size.saturating_add_signed(added); - - // set the gauge value before it can be set in `update_current_logical_size`. 
- self_clone.metrics.current_logical_size_gauge.set(sum); - - match self_clone - .current_logical_size - .initial_logical_size - .set((calculated_size, metrics_guard.calculation_result_saved())) - { - Ok(()) => (), - Err(_what_we_just_attempted_to_set) => { - let (existing_size, _) = self_clone - .current_logical_size - .initial_logical_size - .get() - .expect("once_cell set was lost, then get failed, impossible."); - // This shouldn't happen because the semaphore is initialized with 1. - // But if it happens, just complain & report success so there are no further retries. - error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing") + match try_once_res { + Ok(res) => return ControlFlow::Continue(res), + Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()), + Err(BackgroundCalculationError::Other(e)) => { + warn!(attempt, "initial size calculation failed: {e:?}"); + // exponential back-off doesn't make sense at these long intervals; + // use fixed retry interval with generous jitter instead + let sleep_duration = Duration::from_secs( + u64::try_from( + // 1hour base + (60_i64 * 60_i64) + // 10min jitter + + rand::thread_rng().gen_range(-10 * 60..10 * 60), + ) + .expect("10min < 1hour"), + ); + tokio::time::sleep(sleep_duration).await; + } } } - // now that `initial_logical_size.is_some()`, reduce permit count to 0 - // so that we prevent future callers from spawning this task - permit.forget(); - Ok(()) - }.in_current_span(), - ); + } + }; + + let (calculated_size, metrics_guard) = tokio::select! { + res = retrying => { + match res { + ControlFlow::Continue(calculated_size) => calculated_size, + ControlFlow::Break(()) => return, + } + } + _ = cancel.cancelled() => { + return; + } + }; + + // we cannot query current_logical_size.current_size() to know the current + // *negative* value, only truncated to u64. 
+ let added = self + .current_logical_size + .size_added_after_initial + .load(AtomicOrdering::Relaxed); + + let sum = calculated_size.saturating_add_signed(added); + + // set the gauge value before it can be set in `update_current_logical_size`. + // TODO: shouldn't this simply be `.add(calculated_size)`? + self.metrics.current_logical_size_gauge.set(sum); + + self.current_logical_size + .initial_logical_size + .set((calculated_size, metrics_guard.calculation_result_saved())) + .ok() + .expect("only this task sets it"); } pub fn spawn_ondemand_logical_size_calculation( @@ -1933,6 +1997,7 @@ impl Timeline { receiver } + /// TODO: must be cancellation safe, I think it is (?) #[instrument(skip_all)] async fn logical_size_calculation_task( self: &Arc, diff --git a/pageserver/src/tenant/timeline/logical_size.rs b/pageserver/src/tenant/timeline/logical_size.rs index 1f103051ef..e71a936300 100644 --- a/pageserver/src/tenant/timeline/logical_size.rs +++ b/pageserver/src/tenant/timeline/logical_size.rs @@ -1,11 +1,10 @@ use anyhow::Context; -use once_cell::sync::OnceCell; -use tokio::sync::Semaphore; +use once_cell::sync::OnceCell; +use tokio_util::sync::CancellationToken; use utils::lsn::Lsn; use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering}; -use std::sync::Arc; /// Internal structure to hold all data needed for logical size calculation. /// @@ -27,9 +26,7 @@ pub(super) struct LogicalSize { u64, crate::metrics::initial_logical_size::FinishedCalculationGuard, )>, - - /// Semaphore to track ongoing calculation of `initial_logical_size`. - pub initial_size_computation: Arc, + pub cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell, /// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines. 
pub initial_part_end: Option, @@ -69,7 +66,7 @@ pub(crate) enum CurrentLogicalSize { Exact(Exact), } -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub(crate) enum Accuracy { Approximate, Exact, @@ -112,11 +109,10 @@ impl LogicalSize { Self { initial_logical_size: OnceCell::with_value((0, { crate::metrics::initial_logical_size::START_CALCULATION - .first(None) + .first(crate::metrics::initial_logical_size::StartCircumstances::EmptyInitial) .calculation_result_saved() })), - // initial_logical_size already computed, so, don't admit any calculations - initial_size_computation: Arc::new(Semaphore::new(0)), + cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell::new(), initial_part_end: None, size_added_after_initial: AtomicI64::new(0), } @@ -125,7 +121,7 @@ impl LogicalSize { pub(super) fn deferred_initial(compute_to: Lsn) -> Self { Self { initial_logical_size: OnceCell::new(), - initial_size_computation: Arc::new(Semaphore::new(1)), + cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell::new(), initial_part_end: Some(compute_to), size_added_after_initial: AtomicI64::new(0), } diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 167fd150f8..094f831738 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -397,7 +397,10 @@ pub(super) async fn handle_walreceiver_connection( // Send the replication feedback message. // Regular standby_status_update fields are put into this message. let current_timeline_size = timeline - .get_current_logical_size(&ctx) + .get_current_logical_size( + crate::tenant::timeline::GetLogicalSizePriority::User, + &ctx, + ) // FIXME: https://github.com/neondatabase/neon/issues/5963 .size_dont_care_about_accuracy(); let status_update = PageserverFeedback {