From c7f1143e570924eadd15053949647707ba042c5b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 4 Dec 2023 18:22:26 +0100 Subject: [PATCH] concurrency-limit low-priority initial logical size calculation [v2] (#6000) Problem ------- Before this PR, there was no concurrency limit on initial logical size computations. While logical size computations are lazy in theory, in practice (production), they happen in a short timeframe after restart. This means that on a PS with 20k tenants, we'd have up to 20k concurrent initial logical size calculation requests. This is self-inflicted needless overload. This hasn't been a problem so far because the `.await` points on the logical size calculation path never return `Pending`, hence we have a natural concurrency limit of the number of executor threads. But, as soon as we return `Pending` somewhere in the logical size calculation path, other concurrent tasks get scheduled by tokio. If these other tasks are also logical size calculations, they eventually pound on the same bottleneck. For example, in #5479, we want to switch the VirtualFile descriptor cache to a `tokio::sync::RwLock`, which makes us return `Pending`, and without measures like this patch, after PS restart, VirtualFile descriptor cache thrashes heavily for 2 hours until all the logical size calculations have been computed and the degree of concurrency / concurrent VirtualFile operations is down to regular levels. See the *Experiment* section below for details. Background ---------- Before this PR, initial logical size calculation was spawned lazily on first call to `Timeline::get_current_logical_size()`. In practice (prod), the lazy calculation is triggered by `WalReceiverConnectionHandler` if the timeline is active according to storage broker, or by the first iteration of consumption metrics worker after restart (`MetricsCollection`). The spawns by walreceiver are high-priority because logical size is needed by Safekeepers (via walreceiver `PageserverFeedback`) to enforce the project logical size limit. The spawns by metrics collection are not on the user-critical path and hence low-priority. [^consumption_metrics_slo] [^consumption_metrics_slo]: We can't delay metrics collection indefintely because there are TBD internal SLOs tied to metrics collection happening in a timeline manner (https://github.com/neondatabase/cloud/issues/7408). But let's ignore that in this issue. The ratio of walreceiver-initiated spawns vs consumption-metrics-initiated spawns can be reconstructed from logs (`spawning logical size computation from context of task kind {:?}"`). PR #5995 and #6018 adds metrics for this. First investigation of the ratio lead to the discovery that walreceiver spawns 75% of init logical size computations. That's because of two bugs: - In Safekeepers: https://github.com/neondatabase/neon/issues/5993 - In interaction between Pageservers and Safekeepers: https://github.com/neondatabase/neon/issues/5962 The safekeeper bug is likely primarily responsible but we don't have the data yet. The metrics will hopefully provide some insights. When assessing production-readiness of this PR, please assume that neither of these bugs are fixed yet. Changes In This PR ------------------ With this PR, initial logical size calculation is reworked as follows: First, all initial logical size calculation task_mgr tasks are started early, as part of timeline activation, and run a retry loop with long back-off until success. This removes the lazy computation; it was needless complexity because in practice, we compute all logical sizes anyways, because consumption metrics collects it. Second, within the initial logical size calculation task, each attempt queues behind the background loop concurrency limiter semaphore. This fixes the performance issue that we pointed out in the "Problem" section earlier. Third, there is a twist to queuing behind the background loop concurrency limiter semaphore. Logical size is needed by Safekeepers (via walreceiver `PageserverFeedback`) to enforce the project logical size limit. However, we currently do open walreceiver connections even before we have an exact logical size. That's bad, and I'll build on top of this PR to fix that (https://github.com/neondatabase/neon/issues/5963). But, for the purposes of this PR, we don't want to introduce a regression, i.e., we don't want to provide an exact value later than before this PR. The solution is to introduce a priority-boosting mechanism (`GetLogicalSizePriority`), allowing callers of `Timeline::get_current_logical_size` to specify how urgently they need an exact value. The effect of specifying high urgency is that the initial logical size calculation task for the timeline will skip the concurrency limiting semaphore. This should yield effectively the same behavior as we had before this PR with lazy spawning. Last, the priority-boosting mechanism obsoletes the `init_order`'s grace period for initial logical size calculations. It's a separate commit to reduce the churn during review. We can drop that commit if people think it's too much churn, and commit it later once we know this PR here worked as intended. Experiment With #5479 --------------------- I validated this PR combined with #5479 to assess whether we're making forward progress towards asyncification. The setup is an `i3en.3xlarge` instance with 20k tenants, each with one timeline that has 9 layers. All tenants are inactive, i.e., not known to SKs nor storage broker. This means all initial logical size calculations are spawned by consumption metrics `MetricsCollection` task kind. The consumption metrics worker starts requesting logical sizes at low priority immediately after restart. This is achieved by deleting the consumption metrics cache file on disk before starting PS.[^consumption_metrics_cache_file] [^consumption_metrics_cache_file] Consumption metrics worker persists its interval across restarts to achieve persistent reporting intervals across PS restarts; delete the state file on disk to get predictable (and I believe worst-case in terms of concurrency during PS restart) behavior. Before this patch, all of these timelines would all do their initial logical size calculation in parallel, leading to extreme thrashing in page cache and virtual file cache. With this patch, the virtual file cache thrashing is reduced significantly (from 80k `open`-system-calls/second to ~500 `open`-system-calls/second during loading). ### Critique The obvious critique with above experiment is that there's no skipping of the semaphore, i.e., the priority-boosting aspect of this PR is not exercised. If even just 1% of our 20k tenants in the setup were active in SK/storage_broker, then 200 logical size calculations would skip the limiting semaphore immediately after restart and run concurrently. Further critique: given the two bugs wrt timeline inactive vs active state that were mentioned in the Background section, we could have 75% of our 20k tenants being (falsely) active on restart. So... (next section) This Doesn't Make Us Ready For Async VirtualFile ------------------------------------------------ This PR is a step towards asynchronous `VirtualFile`, aka, #5479 or even #4744. But it doesn't yet enable us to ship #5479. The reason is that this PR doesn't limit the amount of high-priority logical size computations. If there are many high-priority logical size calculations requested, we'll fall over like we did if #5479 is applied without this PR. And currently, at very least due to the bugs mentioned in the Background section, we run thousands of high-priority logical size calculations on PS startup in prod. So, at a minimum, we need to fix these bugs. Then we can ship #5479 and #4744, and things will likely be fine under normal operation. But in high-traffic situations, overload problems will still be more likely to happen, e.g., VirtualFile cache descriptor thrashing. The solution candidates for that are orthogonal to this PR though: * global concurrency limiting * per-tenant rate limiting => #5899 * load shedding * scaling bottleneck resources (fd cache size (neondatabase/cloud#8351), page cache size(neondatabase/cloud#8351), spread load across more PSes, etc) Conclusion ---------- Even with the remarks from in the previous section, we should merge this PR because: 1. it's an improvement over the status quo (esp. if the aforementioned bugs wrt timeline active / inactive are fixed) 2. it prepares the way for https://github.com/neondatabase/neon/pull/6010 3. it gets us close to shipping #5479 and #4744 --- pageserver/src/bin/pageserver.rs | 29 +- pageserver/src/consumption_metrics/metrics.rs | 7 +- pageserver/src/http/routes.rs | 3 +- pageserver/src/lib.rs | 7 - pageserver/src/metrics.rs | 27 +- pageserver/src/pgdatadir_mapping.rs | 8 + pageserver/src/tenant.rs | 63 +-- pageserver/src/tenant/delete.rs | 7 +- pageserver/src/tenant/storage_layer/layer.rs | 4 + pageserver/src/tenant/tasks.rs | 1 + pageserver/src/tenant/timeline.rs | 407 +++++++++++------- pageserver/src/tenant/timeline/delete.rs | 3 - .../src/tenant/timeline/logical_size.rs | 22 +- .../walreceiver/walreceiver_connection.rs | 5 +- test_runner/regress/test_ondemand_download.py | 2 +- .../regress/test_pageserver_restart.py | 1 - 16 files changed, 306 insertions(+), 290 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 542c1b7b30..43b35c6d08 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -402,15 +402,11 @@ fn start_pageserver( let (init_remote_done_tx, init_remote_done_rx) = utils::completion::channel(); let (init_done_tx, init_done_rx) = utils::completion::channel(); - let (init_logical_size_done_tx, init_logical_size_done_rx) = utils::completion::channel(); - let (background_jobs_can_start, background_jobs_barrier) = utils::completion::channel(); let order = pageserver::InitializationOrder { initial_tenant_load_remote: Some(init_done_tx), initial_tenant_load: Some(init_remote_done_tx), - initial_logical_size_can_start: init_done_rx.clone(), - initial_logical_size_attempt: Some(init_logical_size_done_tx), background_jobs_can_start: background_jobs_barrier.clone(), }; @@ -464,7 +460,7 @@ fn start_pageserver( }); let WaitForPhaseResult { - timeout_remaining: timeout, + timeout_remaining: _timeout, skipped: init_load_skipped, } = wait_for_phase("initial_tenant_load", init_load_done, timeout).await; @@ -472,26 +468,6 @@ fn start_pageserver( scopeguard::ScopeGuard::into_inner(guard); - let guard = scopeguard::guard_on_success((), |_| { - tracing::info!("Cancelled before initial logical sizes completed") - }); - - let logical_sizes_done = std::pin::pin!(async { - init_logical_size_done_rx.wait().await; - startup_checkpoint( - started_startup_at, - "initial_logical_sizes", - "Initial logical sizes completed", - ); - }); - - let WaitForPhaseResult { - timeout_remaining: _, - skipped: logical_sizes_skipped, - } = wait_for_phase("initial_logical_sizes", logical_sizes_done, timeout).await; - - scopeguard::ScopeGuard::into_inner(guard); - // allow background jobs to start: we either completed prior stages, or they reached timeout // and were skipped. It is important that we do not let them block background jobs indefinitely, // because things like consumption metrics for billing are blocked by this barrier. @@ -514,9 +490,6 @@ fn start_pageserver( if let Some(f) = init_load_skipped { f.await; } - if let Some(f) = logical_sizes_skipped { - f.await; - } scopeguard::ScopeGuard::into_inner(guard); startup_checkpoint(started_startup_at, "complete", "Startup complete"); diff --git a/pageserver/src/consumption_metrics/metrics.rs b/pageserver/src/consumption_metrics/metrics.rs index c6ff91e560..d70f1fec4d 100644 --- a/pageserver/src/consumption_metrics/metrics.rs +++ b/pageserver/src/consumption_metrics/metrics.rs @@ -351,7 +351,12 @@ impl TimelineSnapshot { let current_exact_logical_size = { let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id); - let size = span.in_scope(|| t.get_current_logical_size(ctx)); + let size = span.in_scope(|| { + t.get_current_logical_size( + crate::tenant::timeline::GetLogicalSizePriority::Background, + ctx, + ) + }); match size { // Only send timeline logical size when it is fully calculated. CurrentLogicalSize::Exact(ref size) => Some(size.into()), diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 29a1ff52e8..71b7ea05ec 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -338,7 +338,8 @@ async fn build_timeline_info_common( Lsn(0) => None, lsn @ Lsn(_) => Some(lsn), }; - let current_logical_size = timeline.get_current_logical_size(ctx); + let current_logical_size = + timeline.get_current_logical_size(tenant::timeline::GetLogicalSizePriority::User, ctx); let current_physical_size = Some(timeline.layer_size_sum().await); let state = timeline.current_state(); let remote_consistent_lsn_projected = timeline diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 3f74694ef2..0bdf096bfe 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -186,13 +186,6 @@ pub struct InitializationOrder { /// Each initial tenant load task carries this until completion. pub initial_tenant_load: Option, - /// Barrier for when we can start initial logical size calculations. - pub initial_logical_size_can_start: utils::completion::Barrier, - - /// Each timeline owns a clone of this to be consumed on the initial logical size calculation - /// attempt. It is important to drop this once the attempt has completed. - pub initial_logical_size_attempt: Option, - /// Barrier for when we can start any background jobs. /// /// This can be broken up later on, but right now there is just one class of a background job. diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 0cfbfcdf2f..6e311041ba 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -407,16 +407,14 @@ pub(crate) mod initial_logical_size { use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec}; use once_cell::sync::Lazy; - use crate::task_mgr::TaskKind; - pub(crate) struct StartCalculation(IntCounterVec); pub(crate) static START_CALCULATION: Lazy = Lazy::new(|| { StartCalculation( register_int_counter_vec!( "pageserver_initial_logical_size_start_calculation", "Incremented each time we start an initial logical size calculation attempt. \ - The `task_kind` label is for the task kind that caused this attempt.", - &["attempt", "task_kind"] + The `circumstances` label provides some additional details.", + &["attempt", "circumstances"] ) .unwrap(), ) @@ -464,19 +462,24 @@ pub(crate) mod initial_logical_size { inc_drop_calculation: Option, } + #[derive(strum_macros::IntoStaticStr)] + pub(crate) enum StartCircumstances { + EmptyInitial, + SkippedConcurrencyLimiter, + AfterBackgroundTasksRateLimit, + } + impl StartCalculation { - pub(crate) fn first(&self, causing_task_kind: Option) -> OngoingCalculationGuard { - let task_kind_label: &'static str = - causing_task_kind.map(|k| k.into()).unwrap_or_default(); - self.0.with_label_values(&["first", task_kind_label]); + pub(crate) fn first(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard { + let circumstances_label: &'static str = circumstances.into(); + self.0.with_label_values(&["first", circumstances_label]); OngoingCalculationGuard { inc_drop_calculation: Some(DROP_CALCULATION.first.clone()), } } - pub(crate) fn retry(&self, causing_task_kind: Option) -> OngoingCalculationGuard { - let task_kind_label: &'static str = - causing_task_kind.map(|k| k.into()).unwrap_or_default(); - self.0.with_label_values(&["retry", task_kind_label]); + pub(crate) fn retry(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard { + let circumstances_label: &'static str = circumstances.into(); + self.0.with_label_values(&["retry", circumstances_label]); OngoingCalculationGuard { inc_drop_calculation: Some(DROP_CALCULATION.retry.clone()), } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 15d5609ceb..a448142158 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -282,6 +282,10 @@ impl Timeline { } /// Get a list of all existing relations in given tablespace and database. + /// + /// # Cancel-Safety + /// + /// This method is cancellation-safe. pub async fn list_rels( &self, spcnode: Oid, @@ -630,6 +634,10 @@ impl Timeline { /// /// Only relation blocks are counted currently. That excludes metadata, /// SLRUs, twophase files etc. + /// + /// # Cancel-Safety + /// + /// This method is cancellation-safe. pub async fn get_current_logical_size_non_incremental( &self, lsn: Lsn, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index f67a4174af..0b2e48e1ff 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -472,7 +472,6 @@ impl Tenant { index_part: Option, metadata: TimelineMetadata, ancestor: Option>, - init_order: Option<&InitializationOrder>, _ctx: &RequestContext, ) -> anyhow::Result<()> { let tenant_id = self.tenant_shard_id; @@ -482,7 +481,6 @@ impl Tenant { &metadata, ancestor.clone(), resources, - init_order, CreateTimelineCause::Load, )?; let disk_consistent_lsn = timeline.get_disk_consistent_lsn(); @@ -683,10 +681,6 @@ impl Tenant { // as we are no longer loading, signal completion by dropping // the completion while we resume deletion drop(_completion); - // do not hold to initial_logical_size_attempt as it will prevent loading from proceeding without timeout - let _ = init_order - .as_mut() - .and_then(|x| x.initial_logical_size_attempt.take()); let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start); if let Some(background) = background_jobs_can_start { @@ -700,7 +694,6 @@ impl Tenant { &tenant_clone, preload, tenants, - init_order, &ctx, ) .await @@ -713,7 +706,7 @@ impl Tenant { } } - match tenant_clone.attach(init_order, preload, &ctx).await { + match tenant_clone.attach(preload, &ctx).await { Ok(()) => { info!("attach finished, activating"); tenant_clone.activate(broker_client, None, &ctx); @@ -776,7 +769,6 @@ impl Tenant { /// async fn attach( self: &Arc, - init_order: Option, preload: Option, ctx: &RequestContext, ) -> anyhow::Result<()> { @@ -789,7 +781,7 @@ impl Tenant { None => { // Deprecated dev mode: load from local disk state instead of remote storage // https://github.com/neondatabase/neon/issues/5624 - return self.load_local(init_order, ctx).await; + return self.load_local(ctx).await; } }; @@ -884,7 +876,6 @@ impl Tenant { &index_part.metadata, Some(remote_timeline_client), self.deletion_queue_client.clone(), - None, ) .await .context("resume_deletion") @@ -1009,10 +1000,6 @@ impl Tenant { None }; - // we can load remote timelines during init, but they are assumed to be so rare that - // initialization order is not passed to here. - let init_order = None; - // timeline loading after attach expects to find metadata file for each metadata save_metadata( self.conf, @@ -1030,7 +1017,6 @@ impl Tenant { Some(index_part), remote_metadata, ancestor, - init_order, ctx, ) .await @@ -1272,11 +1258,7 @@ impl Tenant { /// files on disk. Used at pageserver startup. /// /// No background tasks are started as part of this routine. - async fn load_local( - self: &Arc, - init_order: Option, - ctx: &RequestContext, - ) -> anyhow::Result<()> { + async fn load_local(self: &Arc, ctx: &RequestContext) -> anyhow::Result<()> { span::debug_assert_current_span_has_tenant_id(); debug!("loading tenant task"); @@ -1302,7 +1284,7 @@ impl Tenant { // Process loadable timelines first for (timeline_id, local_metadata) in scan.sorted_timelines_to_load { if let Err(e) = self - .load_local_timeline(timeline_id, local_metadata, init_order.as_ref(), ctx, false) + .load_local_timeline(timeline_id, local_metadata, ctx, false) .await { match e { @@ -1336,13 +1318,7 @@ impl Tenant { } Some(local_metadata) => { if let Err(e) = self - .load_local_timeline( - timeline_id, - local_metadata, - init_order.as_ref(), - ctx, - true, - ) + .load_local_timeline(timeline_id, local_metadata, ctx, true) .await { match e { @@ -1370,12 +1346,11 @@ impl Tenant { /// Subroutine of `load_tenant`, to load an individual timeline /// /// NB: The parent is assumed to be already loaded! - #[instrument(skip(self, local_metadata, init_order, ctx))] + #[instrument(skip(self, local_metadata, ctx))] async fn load_local_timeline( self: &Arc, timeline_id: TimelineId, local_metadata: TimelineMetadata, - init_order: Option<&InitializationOrder>, ctx: &RequestContext, found_delete_mark: bool, ) -> Result<(), LoadLocalTimelineError> { @@ -1392,7 +1367,6 @@ impl Tenant { &local_metadata, None, self.deletion_queue_client.clone(), - init_order, ) .await .context("resume deletion") @@ -1409,17 +1383,9 @@ impl Tenant { None }; - self.timeline_init_and_sync( - timeline_id, - resources, - None, - local_metadata, - ancestor, - init_order, - ctx, - ) - .await - .map_err(LoadLocalTimelineError::Load) + self.timeline_init_and_sync(timeline_id, resources, None, local_metadata, ancestor, ctx) + .await + .map_err(LoadLocalTimelineError::Load) } pub(crate) fn tenant_id(&self) -> TenantId { @@ -2314,7 +2280,6 @@ impl Tenant { new_metadata: &TimelineMetadata, ancestor: Option>, resources: TimelineResources, - init_order: Option<&InitializationOrder>, cause: CreateTimelineCause, ) -> anyhow::Result> { let state = match cause { @@ -2329,9 +2294,6 @@ impl Tenant { CreateTimelineCause::Delete => TimelineState::Stopping, }; - let initial_logical_size_can_start = init_order.map(|x| &x.initial_logical_size_can_start); - let initial_logical_size_attempt = init_order.map(|x| &x.initial_logical_size_attempt); - let pg_version = new_metadata.pg_version(); let timeline = Timeline::new( @@ -2345,8 +2307,6 @@ impl Tenant { Arc::clone(&self.walredo_mgr), resources, pg_version, - initial_logical_size_can_start.cloned(), - initial_logical_size_attempt.cloned().flatten(), state, self.cancel.child_token(), ); @@ -3168,7 +3128,6 @@ impl Tenant { new_metadata, ancestor, resources, - None, CreateTimelineCause::Load, ) .context("Failed to create timeline data structure")?; @@ -3843,7 +3802,7 @@ pub(crate) mod harness { match mode { LoadMode::Local => { tenant - .load_local(None, ctx) + .load_local(ctx) .instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())) .await?; } @@ -3853,7 +3812,7 @@ pub(crate) mod harness { .instrument(info_span!("try_load_preload", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())) .await?; tenant - .attach(None, Some(preload), ctx) + .attach(Some(preload), ctx) .instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())) .await?; } diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index b7b2ef9c79..548b173c0d 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -15,7 +15,6 @@ use crate::{ context::RequestContext, task_mgr::{self, TaskKind}, tenant::mgr::{TenantSlot, TenantsMapRemoveResult}, - InitializationOrder, }; use super::{ @@ -390,7 +389,6 @@ impl DeleteTenantFlow { tenant: &Arc, preload: Option, tenants: &'static std::sync::RwLock, - init_order: Option, ctx: &RequestContext, ) -> Result<(), DeleteTenantError> { let (_, progress) = completion::channel(); @@ -400,10 +398,7 @@ impl DeleteTenantFlow { .await .expect("cant be stopping or broken"); - tenant - .attach(init_order, preload, ctx) - .await - .context("attach")?; + tenant.attach(preload, ctx).await.context("attach")?; Self::background( guard, diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 3ed4e05bea..e203d9d334 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -230,6 +230,10 @@ impl Layer { /// /// It is up to the caller to collect more data from the previous layer and /// perform WAL redo, if necessary. + /// + /// # Cancellation-Safety + /// + /// This method is cancellation-safe. pub(crate) async fn get_value_reconstruct_data( &self, key: Key, diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 138578ec8a..bc404c41a0 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -44,6 +44,7 @@ pub(crate) enum BackgroundLoopKind { Eviction, ConsumptionMetricsCollectMetrics, ConsumptionMetricsSyntheticSizeWorker, + InitialLogicalSizeCalculation, } impl BackgroundLoopKind { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index e252ee584e..f02fd733b4 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -20,23 +20,27 @@ use pageserver_api::{ }, shard::TenantShardId, }; +use rand::Rng; use serde_with::serde_as; use storage_broker::BrokerClientChannel; use tokio::{ runtime::Handle, - sync::{oneshot, watch, TryAcquireError}, + sync::{oneshot, watch}, }; use tokio_util::sync::CancellationToken; use tracing::*; use utils::{id::TenantTimelineId, sync::gate::Gate}; -use std::cmp::{max, min, Ordering}; use std::collections::{BinaryHeap, HashMap, HashSet}; use std::ops::{Deref, Range}; use std::pin::pin; use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::{Arc, Mutex, RwLock, Weak}; use std::time::{Duration, Instant, SystemTime}; +use std::{ + cmp::{max, min, Ordering}, + ops::ControlFlow, +}; use crate::context::{ AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder, @@ -298,13 +302,6 @@ pub struct Timeline { eviction_task_timeline_state: tokio::sync::Mutex, - /// Barrier to wait before doing initial logical size calculation. Used only during startup. - initial_logical_size_can_start: Option, - - /// Completion shared between all timelines loaded during startup; used to delay heavier - /// background tasks until some logical sizes have been calculated. - initial_logical_size_attempt: Mutex>, - /// Load or creation time information about the disk_consistent_lsn and when the loading /// happened. Used for consumption metrics. pub(crate) loaded_at: (Lsn, SystemTime), @@ -453,6 +450,11 @@ pub enum LogicalSizeCalculationCause { TenantSizeHandler, } +pub enum GetLogicalSizePriority { + User, + Background, +} + #[derive(enumset::EnumSetType)] pub(crate) enum CompactFlags { ForceRepartition, @@ -489,6 +491,9 @@ impl Timeline { /// an ancestor branch, for example, or waste a lot of cycles chasing the /// non-existing key. /// + /// # Cancel-Safety + /// + /// This method is cancellation-safe. pub async fn get( &self, key: Key, @@ -849,46 +854,6 @@ impl Timeline { } } - /// Retrieve current logical size of the timeline. - /// - /// The size could be lagging behind the actual number, in case - /// the initial size calculation has not been run (gets triggered on the first size access). - /// - /// return size and boolean flag that shows if the size is exact - pub(crate) fn get_current_logical_size( - self: &Arc, - ctx: &RequestContext, - ) -> logical_size::CurrentLogicalSize { - let current_size = self.current_logical_size.current_size(); - debug!("Current size: {current_size:?}"); - - if let (CurrentLogicalSize::Approximate(_), Some(initial_part_end)) = - (current_size, self.current_logical_size.initial_part_end) - { - self.try_spawn_size_init_task(initial_part_end, ctx); - } - - if let CurrentLogicalSize::Approximate(_) = ¤t_size { - if ctx.task_kind() == TaskKind::WalReceiverConnectionHandler { - let first = self - .current_logical_size - .did_return_approximate_to_walreceiver - .compare_exchange( - false, - true, - AtomicOrdering::Relaxed, - AtomicOrdering::Relaxed, - ) - .is_ok(); - if first { - crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE.inc(); - } - } - } - - current_size - } - /// Check if more than 'checkpoint_distance' of WAL has been accumulated in /// the in-memory layer, and initiate flushing it if so. /// @@ -938,6 +903,7 @@ impl Timeline { background_jobs_can_start: Option<&completion::Barrier>, ctx: &RequestContext, ) { + self.spawn_initial_logical_size_computation_task(ctx); self.launch_wal_receiver(ctx, broker_client); self.set_state(TimelineState::Active); self.launch_eviction_task(background_jobs_can_start); @@ -1051,17 +1017,6 @@ impl Timeline { error!("Not activating a Stopping timeline"); } (_, new_state) => { - if matches!( - new_state, - TimelineState::Stopping | TimelineState::Broken { .. } - ) { - // drop the completion guard, if any; it might be holding off the completion - // forever needlessly - self.initial_logical_size_attempt - .lock() - .unwrap_or_else(|e| e.into_inner()) - .take(); - } self.state.send_replace(new_state); } } @@ -1383,8 +1338,6 @@ impl Timeline { walredo_mgr: Arc, resources: TimelineResources, pg_version: u32, - initial_logical_size_can_start: Option, - initial_logical_size_attempt: Option, state: TimelineState, cancel: CancellationToken, ) -> Arc { @@ -1484,8 +1437,6 @@ impl Timeline { ), delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())), - initial_logical_size_can_start, - initial_logical_size_attempt: Mutex::new(initial_logical_size_attempt), cancel, gate: Gate::new(format!("Timeline<{tenant_shard_id}/{timeline_id}>")), @@ -1797,39 +1748,91 @@ impl Timeline { Ok(()) } - fn try_spawn_size_init_task(self: &Arc, lsn: Lsn, ctx: &RequestContext) { - let state = self.current_state(); - if matches!( - state, - TimelineState::Broken { .. } | TimelineState::Stopping - ) { - // Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken). - return; + /// Retrieve current logical size of the timeline. + /// + /// The size could be lagging behind the actual number, in case + /// the initial size calculation has not been run (gets triggered on the first size access). + /// + /// return size and boolean flag that shows if the size is exact + pub(crate) fn get_current_logical_size( + self: &Arc, + priority: GetLogicalSizePriority, + ctx: &RequestContext, + ) -> logical_size::CurrentLogicalSize { + let current_size = self.current_logical_size.current_size(); + debug!("Current size: {current_size:?}"); + + match (current_size.accuracy(), priority) { + (logical_size::Accuracy::Exact, _) => (), // nothing to do + (logical_size::Accuracy::Approximate, GetLogicalSizePriority::Background) => { + // background task will eventually deliver an exact value, we're in no rush + } + (logical_size::Accuracy::Approximate, GetLogicalSizePriority::User) => { + // background task is not ready, but user is asking for it now; + // => make the background task skip the line + // (The alternative would be to calculate the size here, but, + // it can actually take a long time if the user has a lot of rels. + // And we'll inevitable need it again; So, let the background task do the work.) + match self + .current_logical_size + .cancel_wait_for_background_loop_concurrency_limit_semaphore + .get() + { + Some(cancel) => cancel.cancel(), + None => { + let state = self.current_state(); + if matches!( + state, + TimelineState::Broken { .. } | TimelineState::Stopping + ) { + + // Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken). + // Don't make noise. + } else { + warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work"); + } + } + }; + } } - let permit = match Arc::clone(&self.current_logical_size.initial_size_computation) - .try_acquire_owned() - { - Ok(permit) => permit, - Err(TryAcquireError::NoPermits) => { - // computation already ongoing or finished with success - return; + if let CurrentLogicalSize::Approximate(_) = ¤t_size { + if ctx.task_kind() == TaskKind::WalReceiverConnectionHandler { + let first = self + .current_logical_size + .did_return_approximate_to_walreceiver + .compare_exchange( + false, + true, + AtomicOrdering::Relaxed, + AtomicOrdering::Relaxed, + ) + .is_ok(); + if first { + crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE.inc(); + } } - Err(TryAcquireError::Closed) => unreachable!("we never call close"), - }; - debug_assert!(self - .current_logical_size - .initial_logical_size - .get() - .is_none()); + } + + current_size + } + + fn spawn_initial_logical_size_computation_task(self: &Arc, ctx: &RequestContext) { + let Some(initial_part_end) = self.current_logical_size.initial_part_end else { + // nothing to do for freshly created timelines; + assert_eq!( + self.current_logical_size.current_size().accuracy(), + logical_size::Accuracy::Exact, + ); + return; + }; + + let cancel_wait_for_background_loop_concurrency_limit_semaphore = CancellationToken::new(); + let token = cancel_wait_for_background_loop_concurrency_limit_semaphore.clone(); + self.current_logical_size + .cancel_wait_for_background_loop_concurrency_limit_semaphore.set(token) + .expect("initial logical size calculation task must be spawned exactly once per Timeline object"); - info!( - "spawning logical size computation from context of task kind {:?}", - ctx.task_kind() - ); - let causing_task_kind = ctx.task_kind(); - // We need to start the computation task. - // It gets a separate context since it will outlive the request that called this function. let self_clone = Arc::clone(self); let background_ctx = ctx.detached_child( TaskKind::InitialLogicalSizeCalculation, @@ -1844,96 +1847,152 @@ impl Timeline { false, // NB: don't log errors here, task_mgr will do that. async move { - let cancel = task_mgr::shutdown_token(); + self_clone + .initial_logical_size_calculation_task( + initial_part_end, + cancel_wait_for_background_loop_concurrency_limit_semaphore, + cancel, + background_ctx, + ) + .await; + Ok(()) + } + .instrument(info_span!(parent: None, "initial_size_calculation", tenant_id=%self.tenant_shard_id.tenant_id, timeline_id=%self.timeline_id)), + ); + } - // in case we were created during pageserver initialization, wait for - // initialization to complete before proceeding. startup time init runs on the same - // runtime. - tokio::select! { - _ = cancel.cancelled() => { return Ok(()); }, - _ = completion::Barrier::maybe_wait(self_clone.initial_logical_size_can_start.clone()) => {} + async fn initial_logical_size_calculation_task( + self: Arc, + initial_part_end: Lsn, + skip_concurrency_limiter: CancellationToken, + cancel: CancellationToken, + background_ctx: RequestContext, + ) { + enum BackgroundCalculationError { + Cancelled, + Other(anyhow::Error), + } + + let try_once = |attempt: usize| { + let background_ctx = &background_ctx; + let self_ref = &self; + let skip_concurrency_limiter = &skip_concurrency_limiter; + async move { + let cancel = task_mgr::shutdown_token(); + let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit( + BackgroundLoopKind::InitialLogicalSizeCalculation, + background_ctx, + &cancel, + ); + + use crate::metrics::initial_logical_size::StartCircumstances; + let (_maybe_permit, circumstances) = tokio::select! { + res = wait_for_permit => { + match res { + Ok(permit) => (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit), + Err(RateLimitError::Cancelled) => { + return Err(BackgroundCalculationError::Cancelled); + } + } + } + () = skip_concurrency_limiter.cancelled() => { + // Some action that is part of a end user interaction requested logical size + // => break out of the rate limit + // TODO: ideally we'd not run on BackgroundRuntime but the requester's runtime; + // but then again what happens if they cancel; also, we should just be using + // one runtime across the entire process, so, let's leave this for now. + (None, StartCircumstances::SkippedConcurrencyLimiter) + } }; - - - // hold off background tasks from starting until all timelines get to try at least - // once initial logical size calculation; though retry will rarely be useful. - // holding off is done because heavier tasks execute blockingly on the same - // runtime. - // - // dropping this at every outcome is probably better than trying to cling on to it, - // delay will be terminated by a timeout regardless. - let completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() }; - - let metrics_guard = match &completion { - Some(_) => crate::metrics::initial_logical_size::START_CALCULATION.first(Some(causing_task_kind)), - None => crate::metrics::initial_logical_size::START_CALCULATION.retry(Some(causing_task_kind)), + let metrics_guard = if attempt == 1 { + crate::metrics::initial_logical_size::START_CALCULATION.first(circumstances) + } else { + crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances) }; - let calculated_size = match self_clone - .logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx) + match self_ref + .logical_size_calculation_task( + initial_part_end, + LogicalSizeCalculationCause::Initial, + background_ctx, + ) .await { - Ok(s) => s, + Ok(calculated_size) => Ok((calculated_size, metrics_guard)), Err(CalculateLogicalSizeError::Cancelled) => { - // Don't make noise, this is a common task. - // In the unlikely case that there is another call to this function, we'll retry - // because initial_logical_size is still None. - info!("initial size calculation cancelled, likely timeline delete / tenant detach"); - return Ok(()); + Err(BackgroundCalculationError::Cancelled) } Err(CalculateLogicalSizeError::Other(err)) => { - if let Some(e @ PageReconstructError::AncestorStopping(_)) = + if let Some(PageReconstructError::AncestorStopping(_)) = err.root_cause().downcast_ref() { - // This can happen if the timeline parent timeline switches to - // Stopping state while we're still calculating the initial - // timeline size for the child, for example if the tenant is - // being detached or the pageserver is shut down. Like with - // CalculateLogicalSizeError::Cancelled, don't make noise. - info!("initial size calculation failed because the timeline or its ancestor is Stopping, likely because the tenant is being detached: {e:#}"); - return Ok(()); + Err(BackgroundCalculationError::Cancelled) + } else { + Err(BackgroundCalculationError::Other(err)) } - return Err(err.context("Failed to calculate logical size")); - } - }; - - // we cannot query current_logical_size.current_size() to know the current - // *negative* value, only truncated to u64. - let added = self_clone - .current_logical_size - .size_added_after_initial - .load(AtomicOrdering::Relaxed); - - let sum = calculated_size.saturating_add_signed(added); - - // set the gauge value before it can be set in `update_current_logical_size`. - self_clone.metrics.current_logical_size_gauge.set(sum); - - match self_clone - .current_logical_size - .initial_logical_size - .set((calculated_size, metrics_guard.calculation_result_saved())) - { - Ok(()) => (), - Err(_what_we_just_attempted_to_set) => { - let (existing_size, _) = self_clone - .current_logical_size - .initial_logical_size - .get() - .expect("once_cell set was lost, then get failed, impossible."); - // This shouldn't happen because the semaphore is initialized with 1. - // But if it happens, just complain & report success so there are no further retries. - error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing") } } - // now that `initial_logical_size.is_some()`, reduce permit count to 0 - // so that we prevent future callers from spawning this task - permit.forget(); - Ok(()) - }.in_current_span(), - ); + } + }; + + let retrying = async { + let mut attempt = 0; + loop { + attempt += 1; + + match try_once(attempt).await { + Ok(res) => return ControlFlow::Continue(res), + Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()), + Err(BackgroundCalculationError::Other(e)) => { + warn!(attempt, "initial size calculation failed: {e:?}"); + // exponential back-off doesn't make sense at these long intervals; + // use fixed retry interval with generous jitter instead + let sleep_duration = Duration::from_secs( + u64::try_from( + // 1hour base + (60_i64 * 60_i64) + // 10min jitter + + rand::thread_rng().gen_range(-10 * 60..10 * 60), + ) + .expect("10min < 1hour"), + ); + tokio::time::sleep(sleep_duration).await; + } + } + } + }; + + let (calculated_size, metrics_guard) = tokio::select! { + res = retrying => { + match res { + ControlFlow::Continue(calculated_size) => calculated_size, + ControlFlow::Break(()) => return, + } + } + _ = cancel.cancelled() => { + return; + } + }; + + // we cannot query current_logical_size.current_size() to know the current + // *negative* value, only truncated to u64. + let added = self + .current_logical_size + .size_added_after_initial + .load(AtomicOrdering::Relaxed); + + let sum = calculated_size.saturating_add_signed(added); + + // set the gauge value before it can be set in `update_current_logical_size`. + self.metrics.current_logical_size_gauge.set(sum); + + self.current_logical_size + .initial_logical_size + .set((calculated_size, metrics_guard.calculation_result_saved())) + .ok() + .expect("only this task sets it"); } pub fn spawn_ondemand_logical_size_calculation( @@ -1971,6 +2030,9 @@ impl Timeline { receiver } + /// # Cancel-Safety + /// + /// This method is cancellation-safe. #[instrument(skip_all)] async fn logical_size_calculation_task( self: &Arc, @@ -2008,6 +2070,10 @@ impl Timeline { /// /// NOTE: counted incrementally, includes ancestors. This can be a slow operation, /// especially if we need to download remote layers. + /// + /// # Cancel-Safety + /// + /// This method is cancellation-safe. pub async fn calculate_logical_size( &self, up_to_lsn: Lsn, @@ -2123,6 +2189,10 @@ impl Timeline { /// /// This function takes the current timeline's locked LayerMap as an argument, /// so callers can avoid potential race conditions. + /// + /// # Cancel-Safety + /// + /// This method is cancellation-safe. async fn get_reconstruct_data( &self, key: Key, @@ -2371,6 +2441,9 @@ impl Timeline { } } + /// # Cancel-safety + /// + /// This method is cancellation-safe. async fn lookup_cached_page( &self, key: &Key, diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index 497796c80a..2a103a7ff4 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -21,7 +21,6 @@ use crate::{ }, CreateTimelineCause, DeleteTimelineError, Tenant, }, - InitializationOrder, }; use super::{Timeline, TimelineResources}; @@ -407,7 +406,6 @@ impl DeleteTimelineFlow { local_metadata: &TimelineMetadata, remote_client: Option, deletion_queue_client: DeletionQueueClient, - init_order: Option<&InitializationOrder>, ) -> anyhow::Result<()> { // Note: here we even skip populating layer map. Timeline is essentially uninitialized. // RemoteTimelineClient is the only functioning part. @@ -420,7 +418,6 @@ impl DeleteTimelineFlow { remote_client, deletion_queue_client, }, - init_order, // Important. We dont pass ancestor above because it can be missing. // Thus we need to skip the validation here. CreateTimelineCause::Delete, diff --git a/pageserver/src/tenant/timeline/logical_size.rs b/pageserver/src/tenant/timeline/logical_size.rs index a33fb28ebd..f2db8c91fc 100644 --- a/pageserver/src/tenant/timeline/logical_size.rs +++ b/pageserver/src/tenant/timeline/logical_size.rs @@ -1,11 +1,10 @@ use anyhow::Context; -use once_cell::sync::OnceCell; -use tokio::sync::Semaphore; +use once_cell::sync::OnceCell; +use tokio_util::sync::CancellationToken; use utils::lsn::Lsn; use std::sync::atomic::{AtomicBool, AtomicI64, Ordering as AtomicOrdering}; -use std::sync::Arc; /// Internal structure to hold all data needed for logical size calculation. /// @@ -28,8 +27,12 @@ pub(super) struct LogicalSize { crate::metrics::initial_logical_size::FinishedCalculationGuard, )>, - /// Semaphore to track ongoing calculation of `initial_logical_size`. - pub initial_size_computation: Arc, + /// Cancellation for the best-effort logical size calculation. + /// + /// The token is kept in a once-cell so that we can error out if a higher priority + /// request comes in *before* we have started the normal logical size calculation. + pub(crate) cancel_wait_for_background_loop_concurrency_limit_semaphore: + OnceCell, /// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines. pub initial_part_end: Option, @@ -72,7 +75,7 @@ pub(crate) enum CurrentLogicalSize { Exact(Exact), } -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub(crate) enum Accuracy { Approximate, Exact, @@ -115,11 +118,10 @@ impl LogicalSize { Self { initial_logical_size: OnceCell::with_value((0, { crate::metrics::initial_logical_size::START_CALCULATION - .first(None) + .first(crate::metrics::initial_logical_size::StartCircumstances::EmptyInitial) .calculation_result_saved() })), - // initial_logical_size already computed, so, don't admit any calculations - initial_size_computation: Arc::new(Semaphore::new(0)), + cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell::new(), initial_part_end: None, size_added_after_initial: AtomicI64::new(0), did_return_approximate_to_walreceiver: AtomicBool::new(false), @@ -129,7 +131,7 @@ impl LogicalSize { pub(super) fn deferred_initial(compute_to: Lsn) -> Self { Self { initial_logical_size: OnceCell::new(), - initial_size_computation: Arc::new(Semaphore::new(1)), + cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell::new(), initial_part_end: Some(compute_to), size_added_after_initial: AtomicI64::new(0), did_return_approximate_to_walreceiver: AtomicBool::new(false), diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 7045658f24..3bcb7ff891 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -397,7 +397,10 @@ pub(super) async fn handle_walreceiver_connection( // Send the replication feedback message. // Regular standby_status_update fields are put into this message. let current_timeline_size = timeline - .get_current_logical_size(&ctx) + .get_current_logical_size( + crate::tenant::timeline::GetLogicalSizePriority::User, + &ctx, + ) // FIXME: https://github.com/neondatabase/neon/issues/5963 .size_dont_care_about_accuracy(); let status_update = PageserverFeedback { diff --git a/test_runner/regress/test_ondemand_download.py b/test_runner/regress/test_ondemand_download.py index a4cd42b6c3..86a749eaf3 100644 --- a/test_runner/regress/test_ondemand_download.py +++ b/test_runner/regress/test_ondemand_download.py @@ -384,7 +384,7 @@ def test_download_remote_layers_api( env.pageserver.allowed_errors.extend( [ ".*download failed: downloading evicted layer file failed.*", - f".*initial size calculation.*{tenant_id}.*{timeline_id}.*Failed to calculate logical size", + f".*initial_size_calculation.*{tenant_id}.*{timeline_id}.*initial size calculation failed: downloading evicted layer file failed", ] ) diff --git a/test_runner/regress/test_pageserver_restart.py b/test_runner/regress/test_pageserver_restart.py index 443b0812fd..3cac32b790 100644 --- a/test_runner/regress/test_pageserver_restart.py +++ b/test_runner/regress/test_pageserver_restart.py @@ -106,7 +106,6 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool) # Initial tenant load should reflect the delay we injected ("initial_tenant_load", lambda t, p: t >= (tenant_load_delay_ms / 1000.0) and t >= p), # Subsequent steps should occur in expected order - ("initial_logical_sizes", lambda t, p: t > 0 and t >= p), ("background_jobs_can_start", lambda t, p: t > 0 and t >= p), ("complete", lambda t, p: t > 0 and t >= p), ]