diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs
index a0b5feea94..b806bd391c 100644
--- a/libs/pageserver_api/src/config.rs
+++ b/libs/pageserver_api/src/config.rs
@@ -94,6 +94,7 @@ pub struct ConfigToml {
     pub ondemand_download_behavior_treat_error_as_warn: bool,
     #[serde(with = "humantime_serde")]
     pub background_task_maximum_delay: Duration,
+    pub use_compaction_semaphore: bool,
     pub control_plane_api: Option<Url>,
     pub control_plane_api_token: Option<SecretString>,
     pub control_plane_emergency_mode: bool,
@@ -470,6 +471,7 @@ impl Default for ConfigToml {
                 DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY,
             )
             .unwrap()),
+            use_compaction_semaphore: false,
             control_plane_api: (None),
             control_plane_api_token: (None),
diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs
index ce480c70a0..3c86b73933 100644
--- a/pageserver/src/config.rs
+++ b/pageserver/src/config.rs
@@ -140,6 +140,10 @@ pub struct PageServerConf {
     /// not terrible.
     pub background_task_maximum_delay: Duration,

+    /// If true, use a separate semaphore for compaction tasks instead of the common background task
+    /// semaphore. Defaults to false.
+    pub use_compaction_semaphore: bool,
+
     pub control_plane_api: Option<Url>,

     /// JWT token for use with the control plane API.
@@ -332,6 +336,7 @@ impl PageServerConf {
             test_remote_failures,
             ondemand_download_behavior_treat_error_as_warn,
             background_task_maximum_delay,
+            use_compaction_semaphore,
             control_plane_api,
             control_plane_api_token,
             control_plane_emergency_mode,
@@ -385,6 +390,7 @@ impl PageServerConf {
             test_remote_failures,
             ondemand_download_behavior_treat_error_as_warn,
             background_task_maximum_delay,
+            use_compaction_semaphore,
             control_plane_api,
             control_plane_emergency_mode,
             heatmap_upload_concurrency,
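The two hunks above only add the field, its doc comment, and its default. For reference, a minimal sketch of how the key maps from pageserver.toml, assuming the flat top-level key layout that ConfigToml's other scalar fields use; ConfigSubset here is a hypothetical stand-in, not the real struct:

    use serde::Deserialize;

    /// Hypothetical subset of ConfigToml: only the new flag. With serde's
    /// field-level default, an absent key deserializes to false, matching
    /// the Default impl in the diff.
    #[derive(Debug, Deserialize)]
    struct ConfigSubset {
        #[serde(default)]
        use_compaction_semaphore: bool,
    }

    fn main() {
        let enabled: ConfigSubset =
            toml::from_str("use_compaction_semaphore = true").unwrap();
        let absent: ConfigSubset = toml::from_str("").unwrap();
        assert!(enabled.use_compaction_semaphore);
        assert!(!absent.use_compaction_semaphore);
    }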
diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs
index 0f10dd7e10..d562f7b783 100644
--- a/pageserver/src/tenant/tasks.rs
+++ b/pageserver/src/tenant/tasks.rs
@@ -1,47 +1,56 @@
 //! This module contains functions to serve per-tenant background processes,
 //! such as compaction and GC
+use std::cmp::max;
 use std::ops::ControlFlow;
 use std::str::FromStr;
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};
+
+use once_cell::sync::Lazy;
+use rand::Rng;
+use tokio::sync::Semaphore;
+use tokio_util::sync::CancellationToken;
+use tracing::*;
+
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::metrics::{BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
-use crate::task_mgr;
-use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
+use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS};
 use crate::tenant::throttle::Stats;
 use crate::tenant::timeline::compaction::CompactionOutcome;
 use crate::tenant::timeline::CompactionError;
 use crate::tenant::{Tenant, TenantState};
-use once_cell::sync::Lazy;
-use rand::Rng;
-use tokio_util::sync::CancellationToken;
-use tracing::*;
 use utils::rate_limit::RateLimit;
 use utils::{backoff, completion, pausable_failpoint};

-static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
-    once_cell::sync::Lazy::new(|| {
-        let total_threads = task_mgr::TOKIO_WORKER_THREADS.get();
-        let permits = usize::max(
-            1,
-            // while a lot of the work is done on spawn_blocking, we still do
-            // repartitioning in the async context. this should give leave us some workers
-            // unblocked to be blocked on other work, hopefully easing any outside visible
-            // effects of restarts.
-            //
-            // 6/8 is a guess; previously we ran with unlimited 8 and more from
-            // spawn_blocking.
-            (total_threads * 3).checked_div(4).unwrap_or(0),
-        );
-        assert_ne!(permits, 0, "we will not be adding in permits later");
-        assert!(
-            permits < total_threads,
-            "need threads avail for shorter work"
-        );
-        tokio::sync::Semaphore::new(permits)
-    });
+/// Semaphore limiting concurrent background tasks (across all tenants).
+///
+/// We use 3/4 Tokio threads, to avoid blocking all threads in case we do any CPU-heavy work.
+static CONCURRENT_BACKGROUND_TASKS: Lazy<Semaphore> = Lazy::new(|| {
+    let total_threads = TOKIO_WORKER_THREADS.get();
+    let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
+    assert_ne!(permits, 0, "we will not be adding in permits later");
+    assert!(permits < total_threads, "need threads for other work");
+    Semaphore::new(permits)
+});
+
+/// Semaphore limiting concurrent compaction tasks (across all tenants). This is disabled by
+/// default, see `use_compaction_semaphore`.
+///
+/// We use 3/4 Tokio threads, to avoid blocking all threads in case we do any CPU-heavy work.
+///
+/// This is a separate semaphore from background tasks, because L0 compaction needs to be responsive
+/// to avoid high read amp during heavy write workloads.
+///
+/// TODO: split image compaction and L0 compaction, and move image compaction to background tasks.
+/// Only L0 compaction needs to be responsive, and it shouldn't block on image compaction.
+static CONCURRENT_COMPACTION_TASKS: Lazy<Semaphore> = Lazy::new(|| {
+    let total_threads = TOKIO_WORKER_THREADS.get();
+    let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
+    assert_ne!(permits, 0, "we will not be adding in permits later");
+    assert!(permits < total_threads, "need threads for other work");
+    Semaphore::new(permits)
+});

 #[derive(
     Debug,
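Both statics size their permit pool the same way: three quarters of the Tokio worker threads, clamped to at least one, and asserted to be strictly fewer than the total so some workers stay free for short, latency-sensitive work. A self-contained sketch of that sizing pattern; worker_threads and EXAMPLE_TASK_SEMAPHORE are illustrative stand-ins for task_mgr::TOKIO_WORKER_THREADS and the statics above, not pageserver code:

    use std::cmp::max;

    use once_cell::sync::Lazy;
    use tokio::sync::Semaphore;

    /// Illustrative stand-in for task_mgr::TOKIO_WORKER_THREADS.get().
    fn worker_threads() -> usize {
        std::thread::available_parallelism().map_or(8, |n| n.get())
    }

    /// 3/4 of the runtime's workers: never zero, never all of them.
    static EXAMPLE_TASK_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| {
        let total = worker_threads();
        let permits = max(1, total * 3 / 4);
        // Mirrors the diff's assertion: leave headroom for other async work.
        // (Like the original, this would trip on a single-threaded runtime.)
        assert!(permits < total, "need threads for other work");
        Semaphore::new(permits)
    });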
@@ -73,8 +82,9 @@ pub struct BackgroundLoopSemaphorePermit<'a> {
 /// Cancellation safe.
 pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
-    loop_kind: BackgroundLoopKind,
     _ctx: &RequestContext,
+    loop_kind: BackgroundLoopKind,
+    use_compaction_semaphore: bool,
 ) -> BackgroundLoopSemaphorePermit<'static> {
     // TODO: use a lower threshold and remove the pacer once we resolve some blockage.
     const WARN_THRESHOLD: Duration = Duration::from_secs(600);

@@ -88,10 +98,13 @@ pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
     }

     // TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id();
-    let permit = CONCURRENT_BACKGROUND_TASKS
-        .acquire()
-        .await
-        .expect("should never close");
+    let permit = if loop_kind == BackgroundLoopKind::Compaction && use_compaction_semaphore {
+        CONCURRENT_COMPACTION_TASKS.acquire().await
+    } else {
+        assert!(!use_compaction_semaphore);
+        CONCURRENT_BACKGROUND_TASKS.acquire().await
+    }
+    .expect("should never close");

     let waited = recorder.acquired();
     if waited >= WARN_THRESHOLD {
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 908356c459..770ea418d1 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -1719,8 +1719,9 @@ impl Timeline {
         let guard = self.compaction_lock.lock().await;

         let permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
-            BackgroundLoopKind::Compaction,
             ctx,
+            BackgroundLoopKind::Compaction,
+            self.conf.use_compaction_semaphore,
         )
         .await;

@@ -3057,8 +3058,9 @@ impl Timeline {
         let skip_concurrency_limiter = &skip_concurrency_limiter;
         async move {
             let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
-                BackgroundLoopKind::InitialLogicalSizeCalculation,
                 background_ctx,
+                BackgroundLoopKind::InitialLogicalSizeCalculation,
+                false,
             );

             use crate::metrics::initial_logical_size::StartCircumstances;
diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs
index 9836aafecb..985329136e 100644
--- a/pageserver/src/tenant/timeline/eviction_task.rs
+++ b/pageserver/src/tenant/timeline/eviction_task.rs
@@ -335,8 +335,9 @@ impl Timeline {
         ctx: &RequestContext,
     ) -> ControlFlow<(), BackgroundLoopSemaphorePermit<'static>> {
         let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
-            BackgroundLoopKind::Eviction,
             ctx,
+            BackgroundLoopKind::Eviction,
+            false,
         );

         tokio::select! {
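The eviction call site races the permit future against shutdown inside tokio::select!, which is why the function is documented as cancellation safe. A minimal standalone sketch of that pattern using an owned-permit variant; acquire_or_cancel and its signature are illustrative, not the pageserver's API:

    use std::sync::Arc;

    use tokio::sync::{OwnedSemaphorePermit, Semaphore};
    use tokio_util::sync::CancellationToken;

    /// Wait for a permit, but bail out promptly if shutdown is requested.
    /// Semaphore::acquire_owned() is documented as cancel safe, so losing
    /// the race cannot leak a permit.
    async fn acquire_or_cancel(
        semaphore: Arc<Semaphore>,
        cancel: &CancellationToken,
    ) -> Option<OwnedSemaphorePermit> {
        tokio::select! {
            permit = semaphore.acquire_owned() => {
                Some(permit.expect("semaphore is never closed"))
            }
            _ = cancel.cancelled() => None,
        }
    }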