diff --git a/Cargo.lock b/Cargo.lock
index 05a70bfe55..506e5f6c7c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2545,6 +2545,7 @@ dependencies = [
"metrics",
"nix",
"num-traits",
+ "num_cpus",
"once_cell",
"pageserver_api",
"pin-project-lite",
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index 27e90ea97d..d709fac5cb 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -35,6 +35,8 @@ humantime-serde.workspace = true
hyper.workspace = true
itertools.workspace = true
nix.workspace = true
+# hack to get the number of worker threads tokio uses
+num_cpus = { version = "1.15" }
num-traits.workspace = true
once_cell.workspace = true
pin-project-lite.workspace = true
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 08fb917fb6..c10223beed 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -994,31 +994,29 @@ async fn timeline_gc_handler(
// Run compaction immediately on given timeline.
async fn timeline_compact_handler(
    request: Request<Body>,
-    _cancel: CancellationToken,
+    cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
- let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
- let result_receiver = mgr::immediate_compact(tenant_id, timeline_id, &ctx)
- .await
- .context("spawn compaction task")
- .map_err(ApiError::InternalServerError)?;
-
- let result: anyhow::Result<()> = result_receiver
- .await
- .context("receive compaction result")
- .map_err(ApiError::InternalServerError)?;
- result.map_err(ApiError::InternalServerError)?;
-
- json_response(StatusCode::OK, ())
+ async {
+ let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
+ let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
+ timeline
+ .compact(&cancel, &ctx)
+ .await
+ .map_err(ApiError::InternalServerError)?;
+ json_response(StatusCode::OK, ())
+ }
+ .instrument(info_span!("manual_compaction", %tenant_id, %timeline_id))
+ .await
}
// Run checkpoint immediately on given timeline.
async fn timeline_checkpoint_handler(
    request: Request<Body>,
- _cancel: CancellationToken,
+ cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
@@ -1031,13 +1029,13 @@ async fn timeline_checkpoint_handler(
.await
.map_err(ApiError::InternalServerError)?;
timeline
- .compact(&ctx)
+ .compact(&cancel, &ctx)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
- .instrument(info_span!("manual_checkpoint", tenant_id = %tenant_id, timeline_id = %timeline_id))
+ .instrument(info_span!("manual_checkpoint", %tenant_id, %timeline_id))
.await
}
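The handler above now builds its response inside an `async` block and attaches a request-scoped span to the whole block, so every await point inside logs with the same `tenant_id`/`timeline_id` fields. A minimal standalone sketch of that pattern, with illustrative names rather than the pageserver's own types:

```rust
use tracing::{info_span, Instrument};

// Sketch only: wrap the whole body in one async block, then instrument the
// block so all awaits inside inherit the span's fields.
async fn handle_compact(tenant_id: u64, timeline_id: u64) -> Result<(), String> {
    async {
        // ... look up the timeline and run the compaction here ...
        Ok(())
    }
    .instrument(info_span!("manual_compaction", %tenant_id, %timeline_id))
    .await
}
```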
diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs
index f3a4ce6db7..3c7a1115df 100644
--- a/pageserver/src/task_mgr.rs
+++ b/pageserver/src/task_mgr.rs
@@ -130,11 +130,25 @@ pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("background op worker")
+        // if you change the number of worker threads, please change the constant below
.enable_all()
.build()
.expect("Failed to create background op runtime")
});
+pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
+    // force init now, so any panic from building the runtime happens here
+    let _ = BACKGROUND_RUNTIME.handle();
+    // replicates tokio-1.28.1::loom::sys::num_cpus, which is not available publicly
+    // tokio would have already panicked on parse errors or NotUnicode
+    //
+    // this will be wrong if any of the runtimes has its worker thread count configured to
+    // something else, but that has not been needed in a long time.
+    std::env::var("TOKIO_WORKER_THREADS")
+        .map(|s| s.parse::<usize>().unwrap())
+        .unwrap_or_else(|_e| usize::max(1, num_cpus::get()))
+});
+
#[derive(Debug, Clone, Copy)]
pub struct PageserverTaskId(u64);
@@ -545,7 +559,7 @@ pub fn current_task_id() -> Option<PageserverTaskId> {
pub async fn shutdown_watcher() {
let token = SHUTDOWN_TOKEN
.try_with(|t| t.clone())
- .expect("shutdown_requested() called in an unexpected task or thread");
+ .expect("shutdown_watcher() called in an unexpected task or thread");
token.cancelled().await;
}
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 81dce84b04..3bf88f2729 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -20,6 +20,7 @@ use storage_broker::BrokerClientChannel;
use tokio::sync::watch;
use tokio::sync::OwnedMutexGuard;
use tokio::task::JoinSet;
+use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::completion;
use utils::crashsafe::path_with_suffix_extension;
@@ -1335,7 +1336,11 @@ impl Tenant {
/// This function is periodically called by compactor task.
/// Also it can be explicitly requested per timeline through page server
/// api's 'compact' command.
- pub async fn compaction_iteration(&self, ctx: &RequestContext) -> anyhow::Result<()> {
+ pub async fn compaction_iteration(
+ &self,
+ cancel: &CancellationToken,
+ ctx: &RequestContext,
+ ) -> anyhow::Result<()> {
anyhow::ensure!(
self.is_active(),
"Cannot run compaction iteration on inactive tenant"
@@ -1363,7 +1368,7 @@ impl Tenant {
for (timeline_id, timeline) in &timelines_to_compact {
timeline
- .compact(ctx)
+ .compact(cancel, ctx)
.instrument(info_span!("compact_timeline", %timeline_id))
.await?;
}
@@ -3449,6 +3454,7 @@ mod tests {
use hex_literal::hex;
use once_cell::sync::Lazy;
use rand::{thread_rng, Rng};
+ use tokio_util::sync::CancellationToken;
    static TEST_KEY: Lazy<Key> =
Lazy::new(|| Key::from_slice(&hex!("112222222233333333444444445500000001")));
@@ -3970,7 +3976,7 @@ mod tests {
drop(writer);
tline.freeze_and_flush().await?;
- tline.compact(&ctx).await?;
+ tline.compact(&CancellationToken::new(), &ctx).await?;
let writer = tline.writer().await;
writer
@@ -3980,7 +3986,7 @@ mod tests {
drop(writer);
tline.freeze_and_flush().await?;
- tline.compact(&ctx).await?;
+ tline.compact(&CancellationToken::new(), &ctx).await?;
let writer = tline.writer().await;
writer
@@ -3990,7 +3996,7 @@ mod tests {
drop(writer);
tline.freeze_and_flush().await?;
- tline.compact(&ctx).await?;
+ tline.compact(&CancellationToken::new(), &ctx).await?;
let writer = tline.writer().await;
writer
@@ -4000,7 +4006,7 @@ mod tests {
drop(writer);
tline.freeze_and_flush().await?;
- tline.compact(&ctx).await?;
+ tline.compact(&CancellationToken::new(), &ctx).await?;
assert_eq!(
tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
@@ -4069,7 +4075,7 @@ mod tests {
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
.await?;
tline.freeze_and_flush().await?;
- tline.compact(&ctx).await?;
+ tline.compact(&CancellationToken::new(), &ctx).await?;
tline.gc().await?;
}
@@ -4146,7 +4152,7 @@ mod tests {
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
.await?;
tline.freeze_and_flush().await?;
- tline.compact(&ctx).await?;
+ tline.compact(&CancellationToken::new(), &ctx).await?;
tline.gc().await?;
}
@@ -4234,7 +4240,7 @@ mod tests {
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
.await?;
tline.freeze_and_flush().await?;
- tline.compact(&ctx).await?;
+ tline.compact(&CancellationToken::new(), &ctx).await?;
tline.gc().await?;
}
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 4b97871f35..eeb84caf13 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -768,55 +768,6 @@ pub async fn immediate_gc(
Ok(wait_task_done)
}
-pub async fn immediate_compact(
- tenant_id: TenantId,
- timeline_id: TimelineId,
- ctx: &RequestContext,
-) -> Result<tokio::sync::oneshot::Receiver<anyhow::Result<()>>, ApiError> {
- let guard = TENANTS.read().await;
-
- let tenant = guard
- .get(&tenant_id)
- .map(Arc::clone)
- .with_context(|| format!("tenant {tenant_id}"))
- .map_err(|e| ApiError::NotFound(e.into()))?;
-
- let timeline = tenant
- .get_timeline(timeline_id, true)
- .map_err(|e| ApiError::NotFound(e.into()))?;
-
- // Run in task_mgr to avoid race with tenant_detach operation
- let ctx = ctx.detached_child(TaskKind::Compaction, DownloadBehavior::Download);
- let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
- task_mgr::spawn(
- &tokio::runtime::Handle::current(),
- TaskKind::Compaction,
- Some(tenant_id),
- Some(timeline_id),
- &format!(
- "timeline_compact_handler compaction run for tenant {tenant_id} timeline {timeline_id}"
- ),
- false,
- async move {
- let result = timeline
- .compact(&ctx)
- .instrument(info_span!("manual_compact", %tenant_id, %timeline_id))
- .await;
-
- match task_done.send(result) {
- Ok(_) => (),
- Err(result) => error!("failed to send compaction result: {result:?}"),
- }
- Ok(())
- },
- );
-
- // drop the guard until after we've spawned the task so that timeline shutdown will wait for the task
- drop(guard);
-
- Ok(wait_task_done)
-}
-
#[cfg(test)]
mod tests {
use std::collections::HashMap;
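The removed `immediate_compact` followed a spawn-plus-oneshot shape: run the compaction on a managed task, hand the result back over a oneshot channel, and let the HTTP handler await the receiver. A stripped-down sketch of that shape (hypothetical names, plain `tokio::spawn` in place of `task_mgr::spawn`) shows the two hops the inlined handler in `routes.rs` now avoids:

```rust
use tokio::sync::oneshot;

// Hypothetical sketch of the removed pattern: spawn the work, send its result
// over a oneshot channel, and let the caller await the receiver.
async fn spawn_and_wait<F>(work: F) -> anyhow::Result<()>
where
    F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    tokio::spawn(async move {
        // ignore send errors: the caller may have given up waiting
        let _ = tx.send(work.await);
    });
    // `?` surfaces a dropped sender; the work's own result is passed through
    rx.await?
}
```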
diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs
index 622ae371a4..a758d4da23 100644
--- a/pageserver/src/tenant/tasks.rs
+++ b/pageserver/src/tenant/tasks.rs
@@ -111,7 +111,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
Duration::from_secs(10)
} else {
// Run compaction
- if let Err(e) = tenant.compaction_iteration(&ctx).await {
+ if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
error!("Compaction failed, retrying in {:?}: {e:?}", wait_duration);
wait_duration
} else {
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 3413282c5d..a7daf09021 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -611,9 +611,46 @@ impl Timeline {
}
/// Outermost timeline compaction operation; downloads needed layers.
-    pub async fn compact(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
+    pub async fn compact(
+        self: &Arc<Self>,
+ cancel: &CancellationToken,
+ ctx: &RequestContext,
+ ) -> anyhow::Result<()> {
const ROUNDS: usize = 2;
+        static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
+ once_cell::sync::Lazy::new(|| {
+ let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
+ let permits = usize::max(
+ 1,
+                // while a lot of the work is done on spawn_blocking, we still do
+                // repartitioning in the async context. this should leave some workers
+                // free for other work, hopefully easing any externally visible
+                // effects of restarts.
+                //
+                // 3/4 (6 of the default 8 workers) is a guess; previously compactions ran
+                // unlimited, on all 8 workers and more via spawn_blocking.
+ (total_threads * 3).checked_div(4).unwrap_or(0),
+ );
+ assert_ne!(permits, 0, "we will not be adding in permits later");
+ assert!(
+ permits < total_threads,
+ "need threads avail for shorter work"
+ );
+ tokio::sync::Semaphore::new(permits)
+ });
+
+        // this wait probably never needs any "long time spent" logging, because we already nag
+        // if the compaction task goes over its period (20s), which happens quite often in production.
+ let _permit = tokio::select! {
+ permit = CONCURRENT_COMPACTIONS.acquire() => {
+ permit
+ },
+ _ = cancel.cancelled() => {
+ return Ok(());
+ }
+ };
+
let last_record_lsn = self.get_last_record_lsn();
// Last record Lsn could be zero in case the timeline was just created
@@ -671,11 +708,9 @@ impl Timeline {
let mut failed = 0;
- let mut cancelled = pin!(task_mgr::shutdown_watcher());
-
loop {
tokio::select! {
- _ = &mut cancelled => anyhow::bail!("Cancelled while downloading remote layers"),
+ _ = cancel.cancelled() => anyhow::bail!("Cancelled while downloading remote layers"),
res = downloads.next() => {
match res {
Some(Ok(())) => {},
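The new prologue in `compact` pairs a shared semaphore with the cancellation token: wait in line for a permit, but return early and cleanly if shutdown is requested first. A self-contained sketch of that permit-or-cancel shape, with illustrative names rather than the pageserver's:

```rust
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;

// Sketch: acquire a slot on a shared limiter, bailing out if cancellation
// fires while we are still waiting.
async fn run_limited(limiter: &Semaphore, cancel: &CancellationToken) -> anyhow::Result<()> {
    let _permit = tokio::select! {
        permit = limiter.acquire() => permit?,
        _ = cancel.cancelled() => return Ok(()),
    };
    // the heavy work runs here while `_permit` is held; dropping it at the
    // end of the function frees the slot for the next waiter
    Ok(())
}
```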