From e825974a2d2791611c5278e36e67560f61100ebd Mon Sep 17 00:00:00 2001
From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com>
Date: Thu, 6 Mar 2025 15:30:11 -0500
Subject: [PATCH] feat(pageserver): yield gc-compaction to L0 compaction
 (#11120)

## Problem

Part of https://github.com/neondatabase/neon/issues/9114

## Summary of changes

gc-compaction can take a long time in some cases, for example, when the
job split heuristic is wrong and we select a region too large to compact
within a reasonable amount of time. We now give up such tasks and yield
to L0 compaction. Each gc-compaction sub-compaction job is atomic and
cannot be split further, so we have to give it up entirely (instead of
storing a state and continuing later, as in image creation).
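For context on the mechanism: the preemption check added by this patch is
non-blocking. It polls the timeline's `l0_compaction_trigger` (a
`tokio::sync::Notify`) exactly once via `futures::FutureExt::now_or_never`
rather than awaiting it, so gc-compaction only observes whether the L0 path
has already signaled back-pressure. A minimal self-contained sketch of that
pattern (hypothetical names, not pageserver code):

```rust
use futures::FutureExt; // for `now_or_never()`
use tokio::sync::Notify;

/// A long-running job that polls a `Notify` without blocking and yields
/// once the trigger has fired. Names here are illustrative only.
async fn run_job(trigger: &Notify) {
    for batch in 0..10u32 {
        // ... one unit of compaction work for `batch` would go here ...

        // `notified()` completes immediately only if a permit was stored
        // by an earlier `notify_one()`; `now_or_never()` polls the future
        // exactly once instead of suspending the task.
        if trigger.notified().now_or_never().is_some() {
            println!("yielding after batch {batch}");
            return; // caller would observe a `YieldForL0`-style outcome
        }
    }
}

#[tokio::main]
async fn main() {
    let trigger = Notify::new();
    trigger.notify_one(); // e.g. the L0 path signals back-pressure
    run_job(&trigger).await;
}
```

The `keys_processed % 1000` guard in the main loop below amortizes this
check so it is not performed on every key.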
---------

Signed-off-by: Alex Chi Z
---
 pageserver/src/tenant/timeline/compaction.rs | 62 ++++++++++++++++----
 1 file changed, 50 insertions(+), 12 deletions(-)

diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index 8fa79ddb22..42b36f7252 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -20,6 +20,7 @@ use anyhow::{Context, anyhow};
 use bytes::Bytes;
 use enumset::EnumSet;
 use fail::fail_point;
+use futures::FutureExt;
 use itertools::Itertools;
 use once_cell::sync::Lazy;
 use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
@@ -443,6 +444,7 @@ impl GcCompactionQueue {
             ));
         };
         let has_pending_tasks;
+        let mut yield_for_l0 = false;
         let Some((id, item)) = ({
             let mut guard = self.inner.lock().unwrap();
             if let Some((id, item)) = guard.queued.pop_front() {
@@ -492,13 +494,23 @@ impl GcCompactionQueue {
                         let mut guard = self.inner.lock().unwrap();
                         guard.guards.entry(id).or_default().gc_guard = Some(gc_guard);
                     }
-                    let _ = timeline.compact_with_options(cancel, options, ctx).await?;
+                    let compaction_result =
+                        timeline.compact_with_options(cancel, options, ctx).await?;
                     self.notify_and_unblock(id);
+                    if compaction_result == CompactionOutcome::YieldForL0 {
+                        yield_for_l0 = true;
+                    }
                 }
             }
             GcCompactionQueueItem::SubCompactionJob(options) => {
                 // TODO: error handling, clear the queue if any task fails?
-                let _ = timeline.compact_with_options(cancel, options, ctx).await?;
+                let compaction_result = timeline.compact_with_options(cancel, options, ctx).await?;
+                if compaction_result == CompactionOutcome::YieldForL0 {
+                    // We will permanently give up a task if we yield for L0 compaction: the preempted subcompaction job won't be run
+                    // again. This ensures that we don't keep doing duplicated work within gc-compaction. We don't return directly here
+                    // because we need to clean things up before returning from the function.
+                    yield_for_l0 = true;
+                }
             }
             GcCompactionQueueItem::Notify(id, l2_lsn) => {
                 self.notify_and_unblock(id);
@@ -527,7 +539,10 @@ impl GcCompactionQueue {
             let mut guard = self.inner.lock().unwrap();
             guard.running = None;
         }
-        Ok(if has_pending_tasks {
+        Ok(if yield_for_l0 {
+            tracing::info!("give up gc-compaction: yield for L0 compaction");
+            CompactionOutcome::YieldForL0
+        } else if has_pending_tasks {
             CompactionOutcome::Pending
         } else {
             CompactionOutcome::Done
@@ -2598,7 +2613,7 @@ impl Timeline {
         cancel: &CancellationToken,
         options: CompactOptions,
         ctx: &RequestContext,
-    ) -> Result<(), CompactionError> {
+    ) -> Result<CompactionOutcome, CompactionError> {
         let sub_compaction = options.sub_compaction;
         let job = GcCompactJob::from_compact_options(options.clone());
         if sub_compaction {
@@ -2620,7 +2635,7 @@ impl Timeline {
             if jobs_len == 0 {
                 info!("no jobs to run, skipping gc bottom-most compaction");
             }
-            return Ok(());
+            return Ok(CompactionOutcome::Done);
         }
         self.compact_with_gc_inner(cancel, job, ctx).await
     }
@@ -2630,7 +2645,7 @@ impl Timeline {
         cancel: &CancellationToken,
         job: GcCompactJob,
         ctx: &RequestContext,
-    ) -> Result<(), CompactionError> {
+    ) -> Result<CompactionOutcome, CompactionError> {
         // Block other compaction/GC tasks from running for now. GC-compaction could run along
         // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
         // Note that we already acquired the compaction lock when the outer `compact` function gets called.
@@ -2699,7 +2714,7 @@ impl Timeline {
                 tracing::warn!(
                     "no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction"
                 );
-                return Ok(());
+                return Ok(CompactionOutcome::Skipped);
             }
             real_gc_cutoff
         } else {
@@ -2737,7 +2752,7 @@ impl Timeline {
                 "no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}",
                 gc_cutoff
             );
-            return Ok(());
+            return Ok(CompactionOutcome::Done);
         };
         // Next, if the user specifies compact_lsn_range.start, we need to filter some layers out. All the layers (strictly) below
         // the min_layer_lsn computed as below will be filtered out and the data will be accessed using the normal read path, as if
@@ -2758,7 +2773,7 @@ impl Timeline {
                 "no layers to compact with gc: no historic layers above compact_above_lsn, compact_above_lsn={}",
                 compact_lsn_range.end
            );
-            return Ok(());
+            return Ok(CompactionOutcome::Done);
         };
         // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
         // layers to compact.
@@ -2784,7 +2799,7 @@ impl Timeline {
                 "no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}",
                 gc_cutoff, compact_key_range.start, compact_key_range.end
             );
-            return Ok(());
+            return Ok(CompactionOutcome::Done);
         }
         retain_lsns_below_horizon.sort();
         GcCompactionJobDescription {
@@ -2899,6 +2914,15 @@ impl Timeline {
             if cancel.is_cancelled() {
                 return Err(CompactionError::ShuttingDown);
             }
+            let should_yield = self
+                .l0_compaction_trigger
+                .notified()
+                .now_or_never()
+                .is_some();
+            if should_yield {
+                tracing::info!("preempt gc-compaction when downloading layers: too many L0 layers");
+                return Ok(CompactionOutcome::YieldForL0);
+            }
             let resident_layer = layer
                 .download_and_keep_resident(ctx)
                 .await
@@ -3019,6 +3043,8 @@ impl Timeline {
         // the key and LSN range are determined. However, to keep things simple here, we still
         // create this writer, and discard the writer in the end.
 
+        let mut keys_processed = 0;
+
         while let Some(((key, lsn, val), desc)) = merge_iter
             .next_with_trace()
             .await
@@ -3028,6 +3054,18 @@ impl Timeline {
             if cancel.is_cancelled() {
                 return Err(CompactionError::ShuttingDown);
             }
+            keys_processed += 1;
+            if keys_processed % 1000 == 0 {
+                let should_yield = self
+                    .l0_compaction_trigger
+                    .notified()
+                    .now_or_never()
+                    .is_some();
+                if should_yield {
+                    tracing::info!("preempt gc-compaction in the main loop: too many L0 layers");
+                    return Ok(CompactionOutcome::YieldForL0);
+                }
+            }
             if self.shard_identity.is_key_disposable(&key) {
                 // If this shard does not need to store this key, simply skip it.
                 //
@@ -3360,7 +3398,7 @@ impl Timeline {
         );
 
         if dry_run {
-            return Ok(());
+            return Ok(CompactionOutcome::Done);
         }
 
         info!(
@@ -3479,7 +3517,7 @@ impl Timeline {
 
         drop(gc_lock);
 
-        Ok(())
+        Ok(CompactionOutcome::Done)
     }
 }
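A note on the caller-side contract: `compact_with_options` now reports one of
four outcomes, and `YieldForL0` means the job was permanently abandoned rather
than suspended, so the caller must not expect it to resume. A minimal
illustrative driver loop under that contract (stand-in enum mirroring the
variants in the diff above; the real pageserver scheduler is more involved):

```rust
/// Stand-in mirroring the `CompactionOutcome` variants used in the diff
/// above; the driver loop below is illustrative, not pageserver code.
#[derive(Debug, PartialEq, Eq)]
enum CompactionOutcome {
    Done,
    Pending,
    Skipped,
    YieldForL0,
}

/// Run queued gc-compaction steps until done, handing control to the L0
/// pass whenever a step asks to yield. The yielded step is not retried:
/// per the commit message, a preempted sub-compaction job is given up
/// permanently instead of being resumed later.
fn drive(mut gc_step: impl FnMut() -> CompactionOutcome, mut l0_pass: impl FnMut()) {
    loop {
        match gc_step() {
            CompactionOutcome::Pending => continue, // more sub-jobs queued
            CompactionOutcome::YieldForL0 => {
                l0_pass(); // L0 has back-pressure: run it first
                break;
            }
            CompactionOutcome::Done | CompactionOutcome::Skipped => break,
        }
    }
}

fn main() {
    let mut steps = vec![
        CompactionOutcome::Pending,
        CompactionOutcome::YieldForL0,
    ]
    .into_iter();
    drive(
        move || steps.next().unwrap_or(CompactionOutcome::Done),
        || println!("running L0 compaction pass"),
    );
}
```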