mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 06:30:43 +00:00
Limit concurrent compactions (#4777)
Compactions can create a lot of concurrent work right now with #4265. Limit compactions to use at most 6/8 background runtime threads.
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2545,6 +2545,7 @@ dependencies = [
|
||||
"metrics",
|
||||
"nix",
|
||||
"num-traits",
|
||||
"num_cpus",
|
||||
"once_cell",
|
||||
"pageserver_api",
|
||||
"pin-project-lite",
|
||||
|
||||
@@ -35,6 +35,8 @@ humantime-serde.workspace = true
|
||||
hyper.workspace = true
|
||||
itertools.workspace = true
|
||||
nix.workspace = true
|
||||
# hack to get the number of worker threads tokio uses
|
||||
num_cpus = { version = "1.15" }
|
||||
num-traits.workspace = true
|
||||
once_cell.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
|
||||
@@ -994,31 +994,29 @@ async fn timeline_gc_handler(
|
||||
// Run compaction immediately on given timeline.
|
||||
async fn timeline_compact_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let result_receiver = mgr::immediate_compact(tenant_id, timeline_id, &ctx)
|
||||
.await
|
||||
.context("spawn compaction task")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
let result: anyhow::Result<()> = result_receiver
|
||||
.await
|
||||
.context("receive compaction result")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
result.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
async {
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
|
||||
timeline
|
||||
.compact(&cancel, &ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
.instrument(info_span!("manual_compaction", %tenant_id, %timeline_id))
|
||||
.await
|
||||
}
|
||||
|
||||
// Run checkpoint immediately on given timeline.
|
||||
async fn timeline_checkpoint_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
@@ -1031,13 +1029,13 @@ async fn timeline_checkpoint_handler(
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
timeline
|
||||
.compact(&ctx)
|
||||
.compact(&cancel, &ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
.instrument(info_span!("manual_checkpoint", tenant_id = %tenant_id, timeline_id = %timeline_id))
|
||||
.instrument(info_span!("manual_checkpoint", %tenant_id, %timeline_id))
|
||||
.await
|
||||
}
|
||||
|
||||
|
||||
@@ -130,11 +130,25 @@ pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("background op worker")
|
||||
// if you change the number of worker threads please change the constant below
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create background op runtime")
|
||||
});
|
||||
|
||||
pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
|
||||
// force init and thus panics
|
||||
let _ = BACKGROUND_RUNTIME.handle();
|
||||
// replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
|
||||
// tokio would had already panicked for parsing errors or NotUnicode
|
||||
//
|
||||
// this will be wrong if any of the runtimes gets their worker threads configured to something
|
||||
// else, but that has not been needed in a long time.
|
||||
std::env::var("TOKIO_WORKER_THREADS")
|
||||
.map(|s| s.parse::<usize>().unwrap())
|
||||
.unwrap_or_else(|_e| usize::max(1, num_cpus::get()))
|
||||
});
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct PageserverTaskId(u64);
|
||||
|
||||
@@ -545,7 +559,7 @@ pub fn current_task_id() -> Option<PageserverTaskId> {
|
||||
pub async fn shutdown_watcher() {
|
||||
let token = SHUTDOWN_TOKEN
|
||||
.try_with(|t| t.clone())
|
||||
.expect("shutdown_requested() called in an unexpected task or thread");
|
||||
.expect("shutdown_watcher() called in an unexpected task or thread");
|
||||
|
||||
token.cancelled().await;
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ use storage_broker::BrokerClientChannel;
|
||||
use tokio::sync::watch;
|
||||
use tokio::sync::OwnedMutexGuard;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::completion;
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
@@ -1335,7 +1336,11 @@ impl Tenant {
|
||||
/// This function is periodically called by compactor task.
|
||||
/// Also it can be explicitly requested per timeline through page server
|
||||
/// api's 'compact' command.
|
||||
pub async fn compaction_iteration(&self, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
pub async fn compaction_iteration(
|
||||
&self,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
anyhow::ensure!(
|
||||
self.is_active(),
|
||||
"Cannot run compaction iteration on inactive tenant"
|
||||
@@ -1363,7 +1368,7 @@ impl Tenant {
|
||||
|
||||
for (timeline_id, timeline) in &timelines_to_compact {
|
||||
timeline
|
||||
.compact(ctx)
|
||||
.compact(cancel, ctx)
|
||||
.instrument(info_span!("compact_timeline", %timeline_id))
|
||||
.await?;
|
||||
}
|
||||
@@ -3449,6 +3454,7 @@ mod tests {
|
||||
use hex_literal::hex;
|
||||
use once_cell::sync::Lazy;
|
||||
use rand::{thread_rng, Rng};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
static TEST_KEY: Lazy<Key> =
|
||||
Lazy::new(|| Key::from_slice(&hex!("112222222233333333444444445500000001")));
|
||||
@@ -3970,7 +3976,7 @@ mod tests {
|
||||
drop(writer);
|
||||
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&ctx).await?;
|
||||
tline.compact(&CancellationToken::new(), &ctx).await?;
|
||||
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
@@ -3980,7 +3986,7 @@ mod tests {
|
||||
drop(writer);
|
||||
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&ctx).await?;
|
||||
tline.compact(&CancellationToken::new(), &ctx).await?;
|
||||
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
@@ -3990,7 +3996,7 @@ mod tests {
|
||||
drop(writer);
|
||||
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&ctx).await?;
|
||||
tline.compact(&CancellationToken::new(), &ctx).await?;
|
||||
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
@@ -4000,7 +4006,7 @@ mod tests {
|
||||
drop(writer);
|
||||
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&ctx).await?;
|
||||
tline.compact(&CancellationToken::new(), &ctx).await?;
|
||||
|
||||
assert_eq!(
|
||||
tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
|
||||
@@ -4069,7 +4075,7 @@ mod tests {
|
||||
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
|
||||
.await?;
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&ctx).await?;
|
||||
tline.compact(&CancellationToken::new(), &ctx).await?;
|
||||
tline.gc().await?;
|
||||
}
|
||||
|
||||
@@ -4146,7 +4152,7 @@ mod tests {
|
||||
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
|
||||
.await?;
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&ctx).await?;
|
||||
tline.compact(&CancellationToken::new(), &ctx).await?;
|
||||
tline.gc().await?;
|
||||
}
|
||||
|
||||
@@ -4234,7 +4240,7 @@ mod tests {
|
||||
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
|
||||
.await?;
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&ctx).await?;
|
||||
tline.compact(&CancellationToken::new(), &ctx).await?;
|
||||
tline.gc().await?;
|
||||
}
|
||||
|
||||
|
||||
@@ -768,55 +768,6 @@ pub async fn immediate_gc(
|
||||
Ok(wait_task_done)
|
||||
}
|
||||
|
||||
pub async fn immediate_compact(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<tokio::sync::oneshot::Receiver<anyhow::Result<()>>, ApiError> {
|
||||
let guard = TENANTS.read().await;
|
||||
|
||||
let tenant = guard
|
||||
.get(&tenant_id)
|
||||
.map(Arc::clone)
|
||||
.with_context(|| format!("tenant {tenant_id}"))
|
||||
.map_err(|e| ApiError::NotFound(e.into()))?;
|
||||
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.map_err(|e| ApiError::NotFound(e.into()))?;
|
||||
|
||||
// Run in task_mgr to avoid race with tenant_detach operation
|
||||
let ctx = ctx.detached_child(TaskKind::Compaction, DownloadBehavior::Download);
|
||||
let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
|
||||
task_mgr::spawn(
|
||||
&tokio::runtime::Handle::current(),
|
||||
TaskKind::Compaction,
|
||||
Some(tenant_id),
|
||||
Some(timeline_id),
|
||||
&format!(
|
||||
"timeline_compact_handler compaction run for tenant {tenant_id} timeline {timeline_id}"
|
||||
),
|
||||
false,
|
||||
async move {
|
||||
let result = timeline
|
||||
.compact(&ctx)
|
||||
.instrument(info_span!("manual_compact", %tenant_id, %timeline_id))
|
||||
.await;
|
||||
|
||||
match task_done.send(result) {
|
||||
Ok(_) => (),
|
||||
Err(result) => error!("failed to send compaction result: {result:?}"),
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
|
||||
// drop the guard until after we've spawned the task so that timeline shutdown will wait for the task
|
||||
drop(guard);
|
||||
|
||||
Ok(wait_task_done)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
|
||||
@@ -111,7 +111,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
Duration::from_secs(10)
|
||||
} else {
|
||||
// Run compaction
|
||||
if let Err(e) = tenant.compaction_iteration(&ctx).await {
|
||||
if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
|
||||
error!("Compaction failed, retrying in {:?}: {e:?}", wait_duration);
|
||||
wait_duration
|
||||
} else {
|
||||
|
||||
@@ -611,9 +611,46 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Outermost timeline compaction operation; downloads needed layers.
|
||||
pub async fn compact(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
pub async fn compact(
|
||||
self: &Arc<Self>,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
const ROUNDS: usize = 2;
|
||||
|
||||
static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
|
||||
once_cell::sync::Lazy::new(|| {
|
||||
let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
|
||||
let permits = usize::max(
|
||||
1,
|
||||
// while a lot of the work is done on spawn_blocking, we still do
|
||||
// repartitioning in the async context. this should give leave us some workers
|
||||
// unblocked to be blocked on other work, hopefully easing any outside visible
|
||||
// effects of restarts.
|
||||
//
|
||||
// 6/8 is a guess; previously we ran with unlimited 8 and more from
|
||||
// spawn_blocking.
|
||||
(total_threads * 3).checked_div(4).unwrap_or(0),
|
||||
);
|
||||
assert_ne!(permits, 0, "we will not be adding in permits later");
|
||||
assert!(
|
||||
permits < total_threads,
|
||||
"need threads avail for shorter work"
|
||||
);
|
||||
tokio::sync::Semaphore::new(permits)
|
||||
});
|
||||
|
||||
// this wait probably never needs any "long time spent" logging, because we already nag if
|
||||
// compaction task goes over it's period (20s) which is quite often in production.
|
||||
let _permit = tokio::select! {
|
||||
permit = CONCURRENT_COMPACTIONS.acquire() => {
|
||||
permit
|
||||
},
|
||||
_ = cancel.cancelled() => {
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
|
||||
// Last record Lsn could be zero in case the timeline was just created
|
||||
@@ -671,11 +708,9 @@ impl Timeline {
|
||||
|
||||
let mut failed = 0;
|
||||
|
||||
let mut cancelled = pin!(task_mgr::shutdown_watcher());
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = &mut cancelled => anyhow::bail!("Cancelled while downloading remote layers"),
|
||||
_ = cancel.cancelled() => anyhow::bail!("Cancelled while downloading remote layers"),
|
||||
res = downloads.next() => {
|
||||
match res {
|
||||
Some(Ok(())) => {},
|
||||
|
||||
Reference in New Issue
Block a user