diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index ca44fbe6ae..738a783813 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -61,6 +61,7 @@ use crate::{ remote_timeline_client::LayerFileMetadata, secondary::SecondaryTenant, storage_layer::{AsLayerDesc, EvictionError, Layer, LayerName, LayerVisibilityHint}, + tasks::sleep_random, }, CancellableTask, DiskUsageEvictionTask, }; @@ -210,14 +211,8 @@ async fn disk_usage_eviction_task( info!("disk usage based eviction task finishing"); }; - use crate::tenant::tasks::random_init_delay; - { - if random_init_delay(task_config.period, &cancel) - .await - .is_err() - { - return; - } + if sleep_random(task_config.period, &cancel).await.is_err() { + return; } let mut iteration_no = 0; diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index d562f7b783..a45eb002bd 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -1,15 +1,17 @@ -//! This module contains functions to serve per-tenant background processes, -//! such as compaction and GC +//! This module contains per-tenant background processes, e.g. compaction and GC. use std::cmp::max; -use std::ops::ControlFlow; +use std::future::Future; +use std::ops::{ControlFlow, RangeInclusive}; +use std::pin::pin; use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use once_cell::sync::Lazy; use rand::Rng; -use tokio::sync::Semaphore; +use scopeguard::defer; +use tokio::sync::{Semaphore, SemaphorePermit}; use tokio_util::sync::CancellationToken; use tracing::*; @@ -20,8 +22,10 @@ use crate::tenant::throttle::Stats; use crate::tenant::timeline::compaction::CompactionOutcome; use crate::tenant::timeline::CompactionError; use crate::tenant::{Tenant, TenantState}; +use pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD; +use utils::completion::Barrier; use utils::rate_limit::RateLimit; -use utils::{backoff, completion, pausable_failpoint}; +use utils::{backoff, pausable_failpoint}; /// Semaphore limiting concurrent background tasks (across all tenants). /// @@ -52,6 +56,10 @@ static CONCURRENT_COMPACTION_TASKS: Lazy = Lazy::new(|| { Semaphore::new(permits) }); +/// Background jobs. +/// +/// NB: not all of these acquire a CONCURRENT_BACKGROUND_TASKS semaphore permit, only the ones that +/// do any significant IO. #[derive( Debug, PartialEq, @@ -76,15 +84,15 @@ pub(crate) enum BackgroundLoopKind { } pub struct BackgroundLoopSemaphorePermit<'a> { - _permit: tokio::sync::SemaphorePermit<'static>, + _permit: SemaphorePermit<'static>, _recorder: BackgroundLoopSemaphoreMetricsRecorder<'a>, } -/// Cancellation safe. -pub(crate) async fn concurrent_background_tasks_rate_limit_permit( - _ctx: &RequestContext, +/// Acquires a semaphore permit, to limit concurrent background jobs. +pub(crate) async fn acquire_concurrency_permit( loop_kind: BackgroundLoopKind, use_compaction_semaphore: bool, + _ctx: &RequestContext, ) -> BackgroundLoopSemaphorePermit<'static> { // TODO: use a lower threshold and remove the pacer once we resolve some blockage. const WARN_THRESHOLD: Duration = Duration::from_secs(600); @@ -121,12 +129,10 @@ pub(crate) async fn concurrent_background_tasks_rate_limit_permit( } } -/// Start per tenant background loops: compaction and gc. -pub fn start_background_loops( - tenant: &Arc, - background_jobs_can_start: Option<&completion::Barrier>, -) { +/// Start per tenant background loops: compaction, GC, and ingest housekeeping. +pub fn start_background_loops(tenant: &Arc, can_start: Option<&Barrier>) { let tenant_shard_id = tenant.tenant_shard_id; + task_mgr::spawn( BACKGROUND_RUNTIME.handle(), TaskKind::Compaction, @@ -135,13 +141,15 @@ pub fn start_background_loops( &format!("compactor for tenant {tenant_shard_id}"), { let tenant = Arc::clone(tenant); - let background_jobs_can_start = background_jobs_can_start.cloned(); + let can_start = can_start.cloned(); async move { - let cancel = task_mgr::shutdown_token(); + let cancel = task_mgr::shutdown_token(); // NB: must be in async context tokio::select! { - _ = cancel.cancelled() => { return Ok(()) }, - _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {} + _ = cancel.cancelled() => return Ok(()), + _ = Barrier::maybe_wait(can_start) => {} }; + TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); + defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc()); compaction_loop(tenant, cancel) // If you rename this span, change the RUST_LOG env variable in test_runner/performance/test_branch_creation.py .instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug())) @@ -150,6 +158,7 @@ pub fn start_background_loops( } }, ); + task_mgr::spawn( BACKGROUND_RUNTIME.handle(), TaskKind::GarbageCollector, @@ -158,13 +167,15 @@ pub fn start_background_loops( &format!("garbage collector for tenant {tenant_shard_id}"), { let tenant = Arc::clone(tenant); - let background_jobs_can_start = background_jobs_can_start.cloned(); + let can_start = can_start.cloned(); async move { - let cancel = task_mgr::shutdown_token(); + let cancel = task_mgr::shutdown_token(); // NB: must be in async context tokio::select! { - _ = cancel.cancelled() => { return Ok(()) }, - _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {} + _ = cancel.cancelled() => return Ok(()), + _ = Barrier::maybe_wait(can_start) => {} }; + TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); + defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc()); gc_loop(tenant, cancel) .instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug())) .await; @@ -181,13 +192,15 @@ pub fn start_background_loops( &format!("ingest housekeeping for tenant {tenant_shard_id}"), { let tenant = Arc::clone(tenant); - let background_jobs_can_start = background_jobs_can_start.cloned(); + let can_start = can_start.cloned(); async move { - let cancel = task_mgr::shutdown_token(); + let cancel = task_mgr::shutdown_token(); // NB: must be in async context tokio::select! { - _ = cancel.cancelled() => { return Ok(()) }, - _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {} + _ = cancel.cancelled() => return Ok(()), + _ = Barrier::maybe_wait(can_start) => {} }; + TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); + defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc()); ingest_housekeeping_loop(tenant, cancel) .instrument(info_span!("ingest_housekeeping_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug())) .await; @@ -197,372 +210,309 @@ pub fn start_background_loops( ); } -/// -/// Compaction task's main loop -/// +/// Compaction task's main loop. async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { const MAX_BACKOFF_SECS: f64 = 300.0; - // How many errors we have seen consequtively - let mut error_run_count = 0; - TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); - async { - let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download); - let mut first = true; - loop { - tokio::select! { - _ = cancel.cancelled() => { - return; - }, - tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result { - ControlFlow::Break(()) => return, - ControlFlow::Continue(()) => (), - }, - } + let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download); + let mut first = true; + let mut error_run = 0; // consecutive errors - let period = tenant.get_compaction_period(); + loop { + if wait_for_active_tenant(&tenant, &cancel).await.is_break() { + return; + } - // TODO: we shouldn't need to await to find tenant and this could be moved outside of - // loop, #3501. There are also additional "allowed_errors" in tests. - if first { - first = false; - if random_init_delay(period, &cancel).await.is_err() { - break; - } - } + let period = tenant.get_compaction_period(); - let sleep_duration; - if period == Duration::ZERO { - #[cfg(not(feature = "testing"))] - info!("automatic compaction is disabled"); - // check again in 10 seconds, in case it's been enabled again. - sleep_duration = Duration::from_secs(10) - } else { - let iteration = Iteration { - started_at: Instant::now(), - period, - kind: BackgroundLoopKind::Compaction, - }; - - // Run compaction - let IterationResult { output, elapsed } = iteration - .run(tenant.compaction_iteration(&cancel, &ctx)) - .await; - match output { - Ok(outcome) => { - error_run_count = 0; - // schedule the next compaction immediately in case there is a pending compaction task - sleep_duration = if let CompactionOutcome::Pending = outcome { - Duration::from_secs(1) - } else { - period - }; - } - Err(e) => { - let wait_duration = backoff::exponential_backoff_duration_seconds( - error_run_count + 1, - 1.0, - MAX_BACKOFF_SECS, - ); - error_run_count += 1; - let wait_duration = Duration::from_secs_f64(wait_duration); - log_compaction_error( - &e, - error_run_count, - &wait_duration, - cancel.is_cancelled(), - ); - sleep_duration = wait_duration; - } - } - - // the duration is recorded by performance tests by enabling debug in this function - tracing::debug!( - elapsed_ms = elapsed.as_millis(), - "compaction iteration complete" - ); - }; - - // Perhaps we did no work and the walredo process has been idle for some time: - // give it a chance to shut down to avoid leaving walredo process running indefinitely. - // TODO: move this to a separate task (housekeeping loop) that isn't affected by the back-off, - // so we get some upper bound guarantee on when walredo quiesce / this throttling reporting here happens. - if let Some(walredo_mgr) = &tenant.walredo_mgr { - walredo_mgr.maybe_quiesce(period * 10); - } - - // Sleep - if tokio::time::timeout(sleep_duration, cancel.cancelled()) - .await - .is_ok() - { + // TODO: we shouldn't need to await to find tenant and this could be moved outside of + // loop, #3501. There are also additional "allowed_errors" in tests. + if first { + first = false; + if sleep_random(period, &cancel).await.is_err() { break; } } + + let sleep_duration; + if period == Duration::ZERO { + #[cfg(not(feature = "testing"))] + info!("automatic compaction is disabled"); + // check again in 10 seconds, in case it's been enabled again. + sleep_duration = Duration::from_secs(10) + } else { + let iteration = Iteration { + started_at: Instant::now(), + period, + kind: BackgroundLoopKind::Compaction, + }; + + // Run compaction + let IterationResult { output, elapsed } = iteration + .run(tenant.compaction_iteration(&cancel, &ctx)) + .await; + match output { + Ok(outcome) => { + error_run = 0; + // schedule the next compaction immediately in case there is a pending compaction task + sleep_duration = if let CompactionOutcome::Pending = outcome { + Duration::from_secs(1) + } else { + period + }; + } + Err(err) => { + let wait_duration = backoff::exponential_backoff_duration_seconds( + error_run + 1, + 1.0, + MAX_BACKOFF_SECS, + ); + error_run += 1; + let wait_duration = Duration::from_secs_f64(wait_duration); + log_compaction_error(&err, error_run, &wait_duration, cancel.is_cancelled()); + sleep_duration = wait_duration; + } + } + + // the duration is recorded by performance tests by enabling debug in this function + debug!( + elapsed_ms = elapsed.as_millis(), + "compaction iteration complete" + ); + }; + + // Perhaps we did no work and the walredo process has been idle for some time: + // give it a chance to shut down to avoid leaving walredo process running indefinitely. + // TODO: move this to a separate task (housekeeping loop) that isn't affected by the back-off, + // so we get some upper bound guarantee on when walredo quiesce / this throttling reporting here happens. + if let Some(walredo_mgr) = &tenant.walredo_mgr { + walredo_mgr.maybe_quiesce(period * 10); + } + + // Sleep + if tokio::time::timeout(sleep_duration, cancel.cancelled()) + .await + .is_ok() + { + break; + } } - .await; - TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc(); } fn log_compaction_error( - e: &CompactionError, - error_run_count: u32, - sleep_duration: &std::time::Duration, + err: &CompactionError, + error_count: u32, + sleep_duration: &Duration, task_cancelled: bool, ) { use crate::tenant::upload_queue::NotInitialized; use crate::tenant::PageReconstructError; use CompactionError::*; - enum LooksLike { - Info, - Error, - } + let level = match err { + ShuttingDown => return, + Offload(_) => Level::ERROR, + _ if task_cancelled => Level::INFO, + Other(err) => { + let root_cause = err.root_cause(); - let decision = match e { - ShuttingDown => None, - Offload(_) => Some(LooksLike::Error), - _ if task_cancelled => Some(LooksLike::Info), - Other(e) => { - let root_cause = e.root_cause(); - - let is_stopping = { - let upload_queue = root_cause - .downcast_ref::() - .is_some_and(|e| e.is_stopping()); - - let timeline = root_cause - .downcast_ref::() - .is_some_and(|e| e.is_stopping()); - - upload_queue || timeline - }; + let upload_queue = root_cause + .downcast_ref::() + .is_some_and(|e| e.is_stopping()); + let timeline = root_cause + .downcast_ref::() + .is_some_and(|e| e.is_stopping()); + let is_stopping = upload_queue || timeline; if is_stopping { - Some(LooksLike::Info) + Level::INFO } else { - Some(LooksLike::Error) + Level::ERROR } } }; - match decision { - Some(LooksLike::Info) => info!( - "Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:#}", - ), - Some(LooksLike::Error) => error!( - "Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:?}", - ), - None => {} + match level { + Level::ERROR => { + error!("Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}") + } + Level::INFO => { + info!("Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}") + } + level => unimplemented!("unexpected level {level:?}"), } } -/// -/// GC task's main loop -/// +/// GC task's main loop. async fn gc_loop(tenant: Arc, cancel: CancellationToken) { const MAX_BACKOFF_SECS: f64 = 300.0; - // How many errors we have seen consequtively - let mut error_run_count = 0; + let mut error_run = 0; // consecutive errors - TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); - async { - // GC might require downloading, to find the cutoff LSN that corresponds to the - // cutoff specified as time. - let ctx = - RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download); + // GC might require downloading, to find the cutoff LSN that corresponds to the + // cutoff specified as time. + let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download); + let mut first = true; - let mut first = true; - loop { - tokio::select! { - _ = cancel.cancelled() => { - return; - }, - tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result { - ControlFlow::Break(()) => return, - ControlFlow::Continue(()) => (), - }, - } + loop { + if wait_for_active_tenant(&tenant, &cancel).await.is_break() { + return; + } - let period = tenant.get_gc_period(); + let period = tenant.get_gc_period(); - if first { - first = false; - - let delays = async { - random_init_delay(period, &cancel).await?; - Ok::<_, Cancelled>(()) - }; - - if delays.await.is_err() { - break; - } - } - - let gc_horizon = tenant.get_gc_horizon(); - let sleep_duration; - if period == Duration::ZERO || gc_horizon == 0 { - #[cfg(not(feature = "testing"))] - info!("automatic GC is disabled"); - // check again in 10 seconds, in case it's been enabled again. - sleep_duration = Duration::from_secs(10); - } else { - let iteration = Iteration { - started_at: Instant::now(), - period, - kind: BackgroundLoopKind::Gc, - }; - // Run gc - let IterationResult { output, elapsed: _ } = - iteration.run(tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &cancel, &ctx)) - .await; - match output { - Ok(_) => { - error_run_count = 0; - sleep_duration = period; - } - Err(crate::tenant::GcError::TenantCancelled) => { - return; - } - Err(e) => { - let wait_duration = backoff::exponential_backoff_duration_seconds( - error_run_count + 1, - 1.0, - MAX_BACKOFF_SECS, - ); - error_run_count += 1; - let wait_duration = Duration::from_secs_f64(wait_duration); - - if matches!(e, crate::tenant::GcError::TimelineCancelled) { - // Timeline was cancelled during gc. We might either be in an event - // that affects the entire tenant (tenant deletion, pageserver shutdown), - // or in one that affects the timeline only (timeline deletion). - // Therefore, don't exit the loop. - info!("Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}"); - } else { - error!("Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}"); - } - - sleep_duration = wait_duration; - } - } - }; - - if tokio::time::timeout(sleep_duration, cancel.cancelled()) - .await - .is_ok() - { + if first { + first = false; + if sleep_random(period, &cancel).await.is_err() { break; } } - } - .await; - TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc(); -} - -async fn ingest_housekeeping_loop(tenant: Arc, cancel: CancellationToken) { - TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); - async { - let mut last_throttle_flag_reset_at = Instant::now(); - loop { - tokio::select! { - _ = cancel.cancelled() => { - return; - }, - tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result { - ControlFlow::Break(()) => return, - ControlFlow::Continue(()) => (), - }, - } - - // We run ingest housekeeping with the same frequency as compaction: it is not worth - // having a distinct setting. But we don't run it in the same task, because compaction - // blocks on acquiring the background job semaphore. - let period = tenant.get_compaction_period(); - - // If compaction period is set to zero (to disable it), then we will use a reasonable default - let period = if period == Duration::ZERO { - humantime::Duration::from_str( - pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD, - ) - .unwrap() - .into() - } else { - period - }; - - // Jitter the period by +/- 5% - let period = - rand::thread_rng().gen_range((period * (95)) / 100..(period * (105)) / 100); - - // Always sleep first: we do not need to do ingest housekeeping early in the lifetime of - // a tenant, since it won't have started writing any ephemeral files yet. - if tokio::time::timeout(period, cancel.cancelled()) - .await - .is_ok() - { - break; - } + let gc_horizon = tenant.get_gc_horizon(); + let sleep_duration; + if period == Duration::ZERO || gc_horizon == 0 { + #[cfg(not(feature = "testing"))] + info!("automatic GC is disabled"); + // check again in 10 seconds, in case it's been enabled again. + sleep_duration = Duration::from_secs(10); + } else { let iteration = Iteration { started_at: Instant::now(), period, - kind: BackgroundLoopKind::IngestHouseKeeping, + kind: BackgroundLoopKind::Gc, }; - iteration.run(tenant.ingest_housekeeping()).await; - - // TODO: rename the background loop kind to something more generic, like, tenant housekeeping. - // Or just spawn another background loop for this throttle, it's not like it's super costly. - info_span!(parent: None, "pagestream_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| { - let now = Instant::now(); - let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now); - let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.pagestream_throttle.reset_stats(); - if count_throttled == 0 { + // Run gc + let IterationResult { output, elapsed: _ } = iteration + .run(tenant.gc_iteration( + None, + gc_horizon, + tenant.get_pitr_interval(), + &cancel, + &ctx, + )) + .await; + match output { + Ok(_) => { + error_run = 0; + sleep_duration = period; + } + Err(crate::tenant::GcError::TenantCancelled) => { return; } - let allowed_rps = tenant.pagestream_throttle.steady_rps(); - let delta = now - prev; - info!( - n_seconds=%format_args!("{:.3}", delta.as_secs_f64()), - count_accounted = count_accounted_finish, // don't break existing log scraping - count_throttled, - sum_throttled_usecs, - count_accounted_start, // log after pre-existing fields to not break existing log scraping - allowed_rps=%format_args!("{allowed_rps:.0}"), - "shard was throttled in the last n_seconds" - ); - }); - } - } - .await; - TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc(); -} + Err(e) => { + let wait_duration = backoff::exponential_backoff_duration_seconds( + error_run + 1, + 1.0, + MAX_BACKOFF_SECS, + ); + error_run += 1; + let wait_duration = Duration::from_secs_f64(wait_duration); -async fn wait_for_active_tenant(tenant: &Arc) -> ControlFlow<()> { - // if the tenant has a proper status already, no need to wait for anything - if tenant.current_state() == TenantState::Active { - ControlFlow::Continue(()) - } else { - let mut tenant_state_updates = tenant.subscribe_for_state_updates(); - loop { - match tenant_state_updates.changed().await { - Ok(()) => { - let new_state = &*tenant_state_updates.borrow(); - match new_state { - TenantState::Active => { - debug!("Tenant state changed to active, continuing the task loop"); - return ControlFlow::Continue(()); - } - state => { - debug!("Not running the task loop, tenant is not active: {state:?}"); - continue; - } + if matches!(e, crate::tenant::GcError::TimelineCancelled) { + // Timeline was cancelled during gc. We might either be in an event + // that affects the entire tenant (tenant deletion, pageserver shutdown), + // or in one that affects the timeline only (timeline deletion). + // Therefore, don't exit the loop. + info!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}"); + } else { + error!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}"); } - } - Err(_sender_dropped_error) => { - return ControlFlow::Break(()); + + sleep_duration = wait_duration; } } + }; + + if tokio::time::timeout(sleep_duration, cancel.cancelled()) + .await + .is_ok() + { + break; + } + } +} + +/// Ingest housekeeping's main loop. +async fn ingest_housekeeping_loop(tenant: Arc, cancel: CancellationToken) { + let mut last_throttle_flag_reset_at = Instant::now(); + loop { + if wait_for_active_tenant(&tenant, &cancel).await.is_break() { + return; + } + + // We run ingest housekeeping with the same frequency as compaction: it is not worth + // having a distinct setting. But we don't run it in the same task, because compaction + // blocks on acquiring the background job semaphore. + let mut period = tenant.get_compaction_period(); + + // If compaction period is set to zero (to disable it), then we will use a reasonable default + if period == Duration::ZERO { + period = humantime::Duration::from_str(DEFAULT_COMPACTION_PERIOD) + .unwrap() + .into() + } + + // Always sleep first: we do not need to do ingest housekeeping early in the lifetime of + // a tenant, since it won't have started writing any ephemeral files yet. Jitter the + // period by ±5%. + let Ok(period) = sleep_jitter(period, period * 5 / 100, &cancel).await else { + break; + }; + + let iteration = Iteration { + started_at: Instant::now(), + period, + kind: BackgroundLoopKind::IngestHouseKeeping, + }; + iteration.run(tenant.ingest_housekeeping()).await; + + // TODO: rename the background loop kind to something more generic, like, tenant housekeeping. + // Or just spawn another background loop for this throttle, it's not like it's super costly. + info_span!(parent: None, "pagestream_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| { + let now = Instant::now(); + let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now); + let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.pagestream_throttle.reset_stats(); + if count_throttled == 0 { + return; + } + let allowed_rps = tenant.pagestream_throttle.steady_rps(); + let delta = now - prev; + info!( + n_seconds=%format_args!("{:.3}", delta.as_secs_f64()), + count_accounted = count_accounted_finish, // don't break existing log scraping + count_throttled, + sum_throttled_usecs, + count_accounted_start, // log after pre-existing fields to not break existing log scraping + allowed_rps=%format_args!("{allowed_rps:.0}"), + "shard was throttled in the last n_seconds" + ); + }); + } +} + +/// Waits until the tenant becomes active, or returns `ControlFlow::Break()` to shut down. +async fn wait_for_active_tenant( + tenant: &Arc, + cancel: &CancellationToken, +) -> ControlFlow<()> { + if tenant.current_state() == TenantState::Active { + return ControlFlow::Continue(()); + } + + let mut update_rx = tenant.subscribe_for_state_updates(); + loop { + tokio::select! { + _ = cancel.cancelled() => return ControlFlow::Break(()), + result = update_rx.changed() => if result.is_err() { + return ControlFlow::Break(()); + } + } + + match &*update_rx.borrow() { + TenantState::Active => { + debug!("Tenant state changed to active, continuing the task loop"); + return ControlFlow::Continue(()); + } + state => debug!("Not running the task loop, tenant is not active: {state:?}"), } } } @@ -571,26 +521,41 @@ async fn wait_for_active_tenant(tenant: &Arc) -> ControlFlow<()> { #[error("cancelled")] pub(crate) struct Cancelled; -/// Provide a random delay for background task initialization. +/// Sleeps for a random interval up to the given max value. /// /// This delay prevents a thundering herd of background tasks and will likely keep them running on /// different periods for more stable load. -pub(crate) async fn random_init_delay( - period: Duration, +pub(crate) async fn sleep_random( + max: Duration, cancel: &CancellationToken, -) -> Result<(), Cancelled> { - if period == Duration::ZERO { - return Ok(()); - } +) -> Result { + sleep_random_range(Duration::ZERO..=max, cancel).await +} - let d = { - let mut rng = rand::thread_rng(); - rng.gen_range(Duration::ZERO..=period) - }; - match tokio::time::timeout(d, cancel.cancelled()).await { - Ok(_) => Err(Cancelled), - Err(_) => Ok(()), +/// Sleeps for a random interval in the given range. Returns the duration. +pub(crate) async fn sleep_random_range( + interval: RangeInclusive, + cancel: &CancellationToken, +) -> Result { + let delay = rand::thread_rng().gen_range(interval); + if delay == Duration::ZERO { + return Ok(delay); } + tokio::select! { + _ = cancel.cancelled() => Err(Cancelled), + _ = tokio::time::sleep(delay) => Ok(delay), + } +} + +/// Sleeps for an interval with a random jitter. +pub(crate) async fn sleep_jitter( + duration: Duration, + jitter: Duration, + cancel: &CancellationToken, +) -> Result { + let from = duration.saturating_sub(jitter); + let to = duration.saturating_add(jitter); + sleep_random_range(from..=to, cancel).await } struct Iteration { @@ -606,42 +571,25 @@ struct IterationResult { impl Iteration { #[instrument(skip_all)] - pub(crate) async fn run(self, fut: Fut) -> IterationResult - where - Fut: std::future::Future, - { - let Self { - started_at, - period, - kind, - } = self; - - let mut fut = std::pin::pin!(fut); + pub(crate) async fn run, O>(self, fut: F) -> IterationResult { + let mut fut = pin!(fut); // Wrap `fut` into a future that logs a message every `period` so that we get a // very obvious breadcrumb in the logs _while_ a slow iteration is happening. - let liveness_logger = async move { - loop { - match tokio::time::timeout(period, &mut fut).await { - Ok(x) => return x, - Err(_) => { - // info level as per the same rationale why warn_when_period_overrun is info - // => https://github.com/neondatabase/neon/pull/5724 - info!("still running"); - } - } + let output = loop { + match tokio::time::timeout(self.period, &mut fut).await { + Ok(r) => break r, + Err(_) => info!("still running"), } }; - - let output = liveness_logger.await; - - let elapsed = started_at.elapsed(); - warn_when_period_overrun(elapsed, period, kind); + let elapsed = self.started_at.elapsed(); + warn_when_period_overrun(elapsed, self.period, self.kind); IterationResult { output, elapsed } } } -/// Attention: the `task` and `period` beocme labels of a pageserver-wide prometheus metric. + +// NB: the `task` and `period` are used for metrics labels. pub(crate) fn warn_when_period_overrun( elapsed: Duration, period: Duration, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 2be6fc1e59..f1843b4e96 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1718,10 +1718,10 @@ impl Timeline { let prepare = async move { let guard = self.compaction_lock.lock().await; - let permit = super::tasks::concurrent_background_tasks_rate_limit_permit( - ctx, + let permit = super::tasks::acquire_concurrency_permit( BackgroundLoopKind::Compaction, self.conf.use_compaction_semaphore, + ctx, ) .await; @@ -3057,10 +3057,10 @@ impl Timeline { let self_ref = &self; let skip_concurrency_limiter = &skip_concurrency_limiter; async move { - let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit( - background_ctx, + let wait_for_permit = super::tasks::acquire_concurrency_permit( BackgroundLoopKind::InitialLogicalSizeCalculation, false, + background_ctx, ); use crate::metrics::initial_logical_size::StartCircumstances; diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 985329136e..42e5f1496d 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -32,7 +32,7 @@ use crate::{ tenant::{ size::CalculateSyntheticSizeError, storage_layer::LayerVisibilityHint, - tasks::{BackgroundLoopKind, BackgroundLoopSemaphorePermit}, + tasks::{sleep_random, BackgroundLoopKind, BackgroundLoopSemaphorePermit}, timeline::EvictionError, LogicalSizeCalculationCause, Tenant, }, @@ -83,8 +83,6 @@ impl Timeline { #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))] async fn eviction_task(self: Arc, tenant: Arc) { - use crate::tenant::tasks::random_init_delay; - // acquire the gate guard only once within a useful span let Ok(guard) = self.gate.enter() else { return; @@ -97,7 +95,7 @@ impl Timeline { EvictionPolicy::OnlyImitiate(lat) => lat.period, EvictionPolicy::NoEviction => Duration::from_secs(10), }; - if random_init_delay(period, &self.cancel).await.is_err() { + if sleep_random(period, &self.cancel).await.is_err() { return; } } @@ -334,10 +332,10 @@ impl Timeline { cancel: &CancellationToken, ctx: &RequestContext, ) -> ControlFlow<(), BackgroundLoopSemaphorePermit<'static>> { - let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit( - ctx, + let acquire_permit = crate::tenant::tasks::acquire_concurrency_permit( BackgroundLoopKind::Eviction, false, + ctx, ); tokio::select! {