diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index b80a498c82..cb1b2b8011 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -561,9 +561,14 @@ pub async fn shutdown_watcher() { /// cancelled. It can however be moved to other tasks, such as `tokio::task::spawn_blocking` or /// `tokio::task::JoinSet::spawn`. pub fn shutdown_token() -> CancellationToken { - SHUTDOWN_TOKEN - .try_with(|t| t.clone()) - .expect("shutdown_token() called in an unexpected task or thread") + let res = SHUTDOWN_TOKEN.try_with(|t| t.clone()); + + if cfg!(test) { + // in tests this method is called from non-taskmgr spawned tasks, and that is all ok. + res.unwrap_or_default() + } else { + res.expect("shutdown_token() called in an unexpected task or thread") + } } /// Has the current task been requested to shut down? diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 4b118442f4..7ff1873eda 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -54,29 +54,18 @@ impl BackgroundLoopKind { } } -pub(crate) enum RateLimitError { - Cancelled, -} - -pub(crate) async fn concurrent_background_tasks_rate_limit( +/// Cancellation safe. +pub(crate) async fn concurrent_background_tasks_rate_limit_permit( loop_kind: BackgroundLoopKind, _ctx: &RequestContext, - cancel: &CancellationToken, -) -> Result<impl Drop, RateLimitError> { +) -> impl Drop { let _guard = crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_GAUGE .with_label_values(&[loop_kind.as_static_str()]) .guard(); - tokio::select! 
{ - permit = CONCURRENT_BACKGROUND_TASKS.acquire() => { - match permit { - Ok(permit) => Ok(permit), - Err(_closed) => unreachable!("we never close the semaphore"), - } - }, - _ = cancel.cancelled() => { - Err(RateLimitError::Cancelled) - } + match CONCURRENT_BACKGROUND_TASKS.acquire().await { + Ok(permit) => permit, + Err(_closed) => unreachable!("we never close the semaphore"), } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index ac1922ccad..7438215a68 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -51,7 +51,7 @@ use crate::tenant::storage_layer::{ LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult, ValueReconstructState, }; -use crate::tenant::tasks::{BackgroundLoopKind, RateLimitError}; +use crate::tenant::tasks::BackgroundLoopKind; use crate::tenant::timeline::logical_size::CurrentLogicalSize; use crate::tenant::{ layer_map::{LayerMap, SearchResult}, @@ -715,19 +715,27 @@ impl Timeline { flags: EnumSet, ctx: &RequestContext, ) -> Result<(), CompactionError> { - let _g = self.compaction_lock.lock().await; + // most likely the cancellation token is from background task, but in tests it could be the + // request task as well. + + let prepare = async move { + let guard = self.compaction_lock.lock().await; + + let permit = super::tasks::concurrent_background_tasks_rate_limit_permit( + BackgroundLoopKind::Compaction, + ctx, + ) + .await; + + (guard, permit) + }; // this wait probably never needs any "long time spent" logging, because we already nag if // compaction task goes over it's period (20s) which is quite often in production. - let _permit = match super::tasks::concurrent_background_tasks_rate_limit( - BackgroundLoopKind::Compaction, - ctx, - cancel, - ) - .await - { - Ok(permit) => permit, - Err(RateLimitError::Cancelled) => return Ok(()), + let (_guard, _permit) = tokio::select! 
{ + tuple = prepare => { tuple }, + _ = self.cancel.cancelled() => return Ok(()), + _ = cancel.cancelled() => return Ok(()), }; let last_record_lsn = self.get_last_record_lsn(); @@ -1782,22 +1790,22 @@ impl Timeline { let skip_concurrency_limiter = &skip_concurrency_limiter; async move { let cancel = task_mgr::shutdown_token(); - let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit( + let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit( BackgroundLoopKind::InitialLogicalSizeCalculation, background_ctx, - &cancel, ); use crate::metrics::initial_logical_size::StartCircumstances; let (_maybe_permit, circumstances) = tokio::select! { - res = wait_for_permit => { - match res { - Ok(permit) => (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit), - Err(RateLimitError::Cancelled) => { - return Err(BackgroundCalculationError::Cancelled); - } - } + permit = wait_for_permit => { + (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit) } + _ = self_ref.cancel.cancelled() => { + return Err(BackgroundCalculationError::Cancelled); + } + _ = cancel.cancelled() => { + return Err(BackgroundCalculationError::Cancelled); + }, () = skip_concurrency_limiter.cancelled() => { // Some action that is part of a end user interaction requested logical size // => break out of the rate limit @@ -3852,7 +3860,14 @@ impl Timeline { /// within a layer file. We can only remove the whole file if it's fully /// obsolete. pub(super) async fn gc(&self) -> anyhow::Result<GcResult> { - let _g = self.gc_lock.lock().await; + // this is most likely the background tasks, but it might be the spawned task from + // immediate_gc + let cancel = crate::task_mgr::shutdown_token(); + let _g = tokio::select! 
{ + guard = self.gc_lock.lock() => guard, + _ = self.cancel.cancelled() => return Ok(GcResult::default()), + _ = cancel.cancelled() => return Ok(GcResult::default()), + }; let timer = self.metrics.garbage_collect_histo.start_timer(); fail_point!("before-timeline-gc"); diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 782e8f9e39..ea5f5f5fa7 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -30,7 +30,7 @@ use crate::{ task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}, tenant::{ config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold}, - tasks::{BackgroundLoopKind, RateLimitError}, + tasks::BackgroundLoopKind, timeline::EvictionError, LogicalSizeCalculationCause, Tenant, }, @@ -158,15 +158,15 @@ impl Timeline { ) -> ControlFlow<()> { let now = SystemTime::now(); - let _permit = match crate::tenant::tasks::concurrent_background_tasks_rate_limit( + let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit( BackgroundLoopKind::Eviction, ctx, - cancel, - ) - .await - { - Ok(permit) => permit, - Err(RateLimitError::Cancelled) => return ControlFlow::Break(()), + ); + + let _permit = tokio::select! { + permit = acquire_permit => permit, + _ = cancel.cancelled() => return ControlFlow::Break(()), + _ = self.cancel.cancelled() => return ControlFlow::Break(()), }; // If we evict layers but keep cached values derived from those layers, then