feat(pageserver): yield gc-compaction to L0 compaction (#11120)
## Problem

Part of https://github.com/neondatabase/neon/issues/9114

## Summary of changes

gc-compaction can take a long time in some cases, for example, when the job split heuristic is wrong and we select too large a region for compaction that can't be finished within a reasonable amount of time. We now give up such tasks and yield to L0 compaction. Each gc-compaction sub-compaction job is atomic and cannot be split further, so we have to give up entirely (instead of storing a state and continuing later, as image creation does).

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
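The mechanics, in brief: `compact_with_options` now returns a `CompactionOutcome` instead of `()`, and the gc-compaction queue maps a `YieldForL0` result to dropping the current job before reporting the yield upward. Below is a minimal sketch of how a caller can act on the outcome; the `CompactionOutcome` variants match the ones in this diff, but the loop and the `run_gc_compaction_step` / `run_l0_compaction` helpers are illustrative stand-ins, not code from this PR.

```rust
// Hedged sketch: the enum variants mirror those used in this PR;
// everything else here is an illustrative stand-in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionOutcome {
    Done,       // nothing left to do
    Skipped,    // preconditions not met (e.g. no gc_cutoff yet)
    Pending,    // more queued sub-compaction jobs remain
    YieldForL0, // too many L0 layers: give up and let L0 compaction run
}

// Stand-ins for the real compaction entry points.
fn run_gc_compaction_step() -> CompactionOutcome {
    CompactionOutcome::Done
}
fn run_l0_compaction() {}

fn main() {
    loop {
        match run_gc_compaction_step() {
            // The preempted sub-compaction job is dropped for good: jobs are
            // atomic, so there is no partial state to resume from later.
            CompactionOutcome::YieldForL0 => run_l0_compaction(),
            CompactionOutcome::Pending => continue,
            CompactionOutcome::Done | CompactionOutcome::Skipped => break,
        }
    }
}
```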
@@ -20,6 +20,7 @@ use anyhow::{Context, anyhow};
 use bytes::Bytes;
 use enumset::EnumSet;
 use fail::fail_point;
+use futures::FutureExt;
 use itertools::Itertools;
 use once_cell::sync::Lazy;
 use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
@@ -443,6 +444,7 @@ impl GcCompactionQueue {
             ));
         };
         let has_pending_tasks;
+        let mut yield_for_l0 = false;
         let Some((id, item)) = ({
             let mut guard = self.inner.lock().unwrap();
             if let Some((id, item)) = guard.queued.pop_front() {
@@ -492,13 +494,23 @@ impl GcCompactionQueue {
                         let mut guard = self.inner.lock().unwrap();
                         guard.guards.entry(id).or_default().gc_guard = Some(gc_guard);
                     }
-                    let _ = timeline.compact_with_options(cancel, options, ctx).await?;
+                    let compaction_result =
+                        timeline.compact_with_options(cancel, options, ctx).await?;
                     self.notify_and_unblock(id);
+                    if compaction_result == CompactionOutcome::YieldForL0 {
+                        yield_for_l0 = true;
+                    }
                 }
             }
             GcCompactionQueueItem::SubCompactionJob(options) => {
                 // TODO: error handling, clear the queue if any task fails?
-                let _ = timeline.compact_with_options(cancel, options, ctx).await?;
+                let compaction_result = timeline.compact_with_options(cancel, options, ctx).await?;
+                if compaction_result == CompactionOutcome::YieldForL0 {
+                    // We will permanently give up a task if we yield for L0 compaction: the preempted sub-compaction job
+                    // won't be run again. This ensures that we don't keep doing duplicated work within gc-compaction.
+                    // We don't return directly here because we need to clean things up before returning from the function.
+                    yield_for_l0 = true;
+                }
             }
             GcCompactionQueueItem::Notify(id, l2_lsn) => {
                 self.notify_and_unblock(id);
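Giving up rather than checkpointing is the key design choice in this hunk: because each sub-compaction job rewrites its key/LSN region atomically, there is no intermediate state worth persisting, and the commit message notes that permanently dropping the preempted job is what prevents gc-compaction from repeating duplicated work.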
@@ -527,7 +539,10 @@ impl GcCompactionQueue {
             let mut guard = self.inner.lock().unwrap();
             guard.running = None;
         }
-        Ok(if has_pending_tasks {
+        Ok(if yield_for_l0 {
+            tracing::info!("give up gc-compaction: yield for L0 compaction");
+            CompactionOutcome::YieldForL0
+        } else if has_pending_tasks {
             CompactionOutcome::Pending
         } else {
             CompactionOutcome::Done
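Note the precedence in the returned outcome: `yield_for_l0` wins over `has_pending_tasks`, so the queue reports `YieldForL0` even when more sub-compaction jobs remain queued; those jobs then surface as `Pending` on a later invocation, after L0 compaction has had its turn.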
@@ -2598,7 +2613,7 @@ impl Timeline {
         cancel: &CancellationToken,
         options: CompactOptions,
         ctx: &RequestContext,
-    ) -> Result<(), CompactionError> {
+    ) -> Result<CompactionOutcome, CompactionError> {
         let sub_compaction = options.sub_compaction;
         let job = GcCompactJob::from_compact_options(options.clone());
         if sub_compaction {
@@ -2620,7 +2635,7 @@ impl Timeline {
             if jobs_len == 0 {
                 info!("no jobs to run, skipping gc bottom-most compaction");
             }
-            return Ok(());
+            return Ok(CompactionOutcome::Done);
         }
         self.compact_with_gc_inner(cancel, job, ctx).await
     }
@@ -2630,7 +2645,7 @@ impl Timeline {
         cancel: &CancellationToken,
         job: GcCompactJob,
         ctx: &RequestContext,
-    ) -> Result<(), CompactionError> {
+    ) -> Result<CompactionOutcome, CompactionError> {
         // Block other compaction/GC tasks from running for now. GC-compaction could run along
         // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
         // Note that we already acquired the compaction lock when the outer `compact` function gets called.
@@ -2699,7 +2714,7 @@ impl Timeline {
                 tracing::warn!(
                     "no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction"
                 );
-                return Ok(());
+                return Ok(CompactionOutcome::Skipped);
             }
             real_gc_cutoff
         } else {
@@ -2737,7 +2752,7 @@ impl Timeline {
                     "no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}",
                     gc_cutoff
                 );
-                return Ok(());
+                return Ok(CompactionOutcome::Done);
             };
             // Next, if the user specifies compact_lsn_range.start, we need to filter some layers out. All the layers (strictly) below
             // the min_layer_lsn computed as below will be filtered out and the data will be accessed using the normal read path, as if
@@ -2758,7 +2773,7 @@ impl Timeline {
                     "no layers to compact with gc: no historic layers above compact_above_lsn, compact_above_lsn={}",
                     compact_lsn_range.end
                 );
-                return Ok(());
+                return Ok(CompactionOutcome::Done);
             };
             // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
             // layers to compact.
@@ -2784,7 +2799,7 @@ impl Timeline {
                     "no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}",
                     gc_cutoff, compact_key_range.start, compact_key_range.end
                 );
-                return Ok(());
+                return Ok(CompactionOutcome::Done);
             }
             retain_lsns_below_horizon.sort();
             GcCompactionJobDescription {
@@ -2899,6 +2914,15 @@ impl Timeline {
             if cancel.is_cancelled() {
                 return Err(CompactionError::ShuttingDown);
             }
+            let should_yield = self
+                .l0_compaction_trigger
+                .notified()
+                .now_or_never()
+                .is_some();
+            if should_yield {
+                tracing::info!("preempt gc-compaction when downloading layers: too many L0 layers");
+                return Ok(CompactionOutcome::YieldForL0);
+            }
             let resident_layer = layer
                 .download_and_keep_resident(ctx)
                 .await
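The preemption check above (and the one in the merge loop below) is a non-blocking poll: `FutureExt::now_or_never`, hence the new `futures::FutureExt` import, polls the `Notified` future exactly once and returns `Some(())` only if a wakeup has already been stored on the `Notify`. A self-contained sketch of the pattern, assuming a tokio runtime; `trigger` is an illustrative stand-in for `self.l0_compaction_trigger`:

```rust
use futures::FutureExt; // provides `now_or_never`
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let trigger = Notify::new();

    // No wakeup stored yet: the poll comes back empty and work continues.
    assert!(trigger.notified().now_or_never().is_none());

    // The L0 compaction path requests preemption. `notify_one` stores a
    // permit even when no task is currently waiting on the Notify.
    trigger.notify_one();

    // The next poll consumes the stored permit and reports readiness
    // without ever blocking, so the compaction loop knows to yield.
    let should_yield = trigger.notified().now_or_never().is_some();
    assert!(should_yield);
}
```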
@@ -3019,6 +3043,8 @@ impl Timeline {
         // the key and LSN range are determined. However, to keep things simple here, we still
         // create this writer, and discard the writer in the end.
 
+        let mut keys_processed = 0;
+
         while let Some(((key, lsn, val), desc)) = merge_iter
             .next_with_trace()
             .await
@@ -3028,6 +3054,18 @@ impl Timeline {
             if cancel.is_cancelled() {
                 return Err(CompactionError::ShuttingDown);
             }
+            keys_processed += 1;
+            if keys_processed % 1000 == 0 {
+                let should_yield = self
+                    .l0_compaction_trigger
+                    .notified()
+                    .now_or_never()
+                    .is_some();
+                if should_yield {
+                    tracing::info!("preempt gc-compaction in the main loop: too many L0 layers");
+                    return Ok(CompactionOutcome::YieldForL0);
+                }
+            }
             if self.shard_identity.is_key_disposable(&key) {
                 // If this shard does not need to store this key, simply skip it.
                 //
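Polling only every 1000th key keeps the check off the per-key hot path; the cost is bounded staleness, since in the worst case the loop processes up to 999 more keys after the yield was requested before it notices and returns `YieldForL0`.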
@@ -3360,7 +3398,7 @@ impl Timeline {
         );
 
         if dry_run {
-            return Ok(());
+            return Ok(CompactionOutcome::Done);
         }
 
         info!(
@@ -3479,7 +3517,7 @@ impl Timeline {
 
         drop(gc_lock);
 
-        Ok(())
+        Ok(CompactionOutcome::Done)
     }
 }