refactor(pageserver): clearify compaction return value (#10643)

## Problem

## Summary of changes

Make the return value of the set of compaction functions less confusing.

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z.
2025-02-03 16:56:55 -05:00
committed by GitHub
parent c1be84197e
commit e219d48bfe
4 changed files with 67 additions and 41 deletions

View File

@@ -46,6 +46,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::Weak;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
use timeline::compaction::CompactionOutcome;
use timeline::compaction::GcCompactionQueue;
use timeline::import_pgdata;
use timeline::offload::offload_timeline;
@@ -2907,10 +2908,10 @@ impl Tenant {
self: &Arc<Self>,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<bool, timeline::CompactionError> {
) -> Result<CompactionOutcome, timeline::CompactionError> {
// Don't start doing work during shutdown, or when broken, we do not need those in the logs
if !self.is_active() {
return Ok(false);
return Ok(CompactionOutcome::Done);
}
{
@@ -2924,7 +2925,7 @@ impl Tenant {
// to AttachedSingle state.
if !conf.location.may_upload_layers_hint() {
info!("Skipping compaction in location state {:?}", conf.location);
return Ok(false);
return Ok(CompactionOutcome::Done);
}
}
@@ -2967,7 +2968,7 @@ impl Tenant {
// Before doing any I/O work, check our circuit breaker
if self.compaction_circuit_breaker.lock().unwrap().is_broken() {
info!("Skipping compaction due to previous failures");
return Ok(false);
return Ok(CompactionOutcome::Done);
}
let mut has_pending_task = false;
@@ -2975,10 +2976,10 @@ impl Tenant {
for (timeline_id, timeline, (can_compact, can_offload)) in &timelines_to_compact_or_offload
{
// pending_task_left == None: cannot compact, maybe still pending tasks
// pending_task_left == Some(true): compaction task left
// pending_task_left == Some(false): no compaction task left
// pending_task_left == Some(Pending): compaction task left
// pending_task_left == Some(Done): no compaction task left
let pending_task_left = if *can_compact {
let has_pending_l0_compaction_task = timeline
let compaction_outcome = timeline
.compact(cancel, EnumSet::empty(), ctx)
.instrument(info_span!("compact_timeline", %timeline_id))
.await
@@ -2996,27 +2997,27 @@ impl Tenant {
.fail(&CIRCUIT_BREAKERS_BROKEN, e);
}
})?;
if has_pending_l0_compaction_task {
Some(true)
if let CompactionOutcome::Pending = compaction_outcome {
Some(CompactionOutcome::Pending)
} else {
let queue = {
let guard = self.scheduled_compaction_tasks.lock().unwrap();
guard.get(timeline_id).cloned()
};
if let Some(queue) = queue {
let has_pending_tasks = queue
let outcome = queue
.iteration(cancel, ctx, &self.gc_block, timeline)
.await?;
Some(has_pending_tasks)
Some(outcome)
} else {
Some(false)
Some(CompactionOutcome::Done)
}
}
} else {
None
};
has_pending_task |= pending_task_left.unwrap_or(false);
if pending_task_left == Some(false) && *can_offload {
has_pending_task |= pending_task_left == Some(CompactionOutcome::Pending);
if pending_task_left == Some(CompactionOutcome::Done) && *can_offload {
pausable_failpoint!("before-timeline-auto-offload");
match offload_timeline(self, timeline)
.instrument(info_span!("offload_timeline", %timeline_id))
@@ -3036,7 +3037,11 @@ impl Tenant {
.unwrap()
.success(&CIRCUIT_BREAKERS_UNBROKEN);
Ok(has_pending_task)
Ok(if has_pending_task {
CompactionOutcome::Pending
} else {
CompactionOutcome::Done
})
}
/// Cancel scheduled compaction tasks

View File

@@ -11,6 +11,7 @@ use crate::metrics::TENANT_TASK_EVENTS;
use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::throttle::Stats;
use crate::tenant::timeline::compaction::CompactionOutcome;
use crate::tenant::timeline::CompactionError;
use crate::tenant::{Tenant, TenantState};
use rand::Rng;
@@ -206,10 +207,10 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
.run(tenant.compaction_iteration(&cancel, &ctx))
.await;
match output {
Ok(has_pending_task) => {
Ok(outcome) => {
error_run_count = 0;
// schedule the next compaction immediately in case there is a pending compaction task
sleep_duration = if has_pending_task {
sleep_duration = if let CompactionOutcome::Pending = outcome {
Duration::ZERO
} else {
period

View File

@@ -18,6 +18,7 @@ use arc_swap::{ArcSwap, ArcSwapOption};
use bytes::Bytes;
use camino::Utf8Path;
use chrono::{DateTime, Utc};
use compaction::CompactionOutcome;
use enumset::EnumSet;
use fail::fail_point;
use futures::{stream::FuturesUnordered, StreamExt};
@@ -1679,7 +1680,7 @@ impl Timeline {
cancel: &CancellationToken,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> Result<bool, CompactionError> {
) -> Result<CompactionOutcome, CompactionError> {
self.compact_with_options(
cancel,
CompactOptions {
@@ -1701,7 +1702,7 @@ impl Timeline {
cancel: &CancellationToken,
options: CompactOptions,
ctx: &RequestContext,
) -> Result<bool, CompactionError> {
) -> Result<CompactionOutcome, CompactionError> {
// most likely the cancellation token is from background task, but in tests it could be the
// request task as well.
@@ -1721,8 +1722,8 @@ impl Timeline {
// compaction task goes over it's period (20s) which is quite often in production.
let (_guard, _permit) = tokio::select! {
tuple = prepare => { tuple },
_ = self.cancel.cancelled() => return Ok(false),
_ = cancel.cancelled() => return Ok(false),
_ = self.cancel.cancelled() => return Ok(CompactionOutcome::Done),
_ = cancel.cancelled() => return Ok(CompactionOutcome::Done),
};
let last_record_lsn = self.get_last_record_lsn();
@@ -1730,13 +1731,13 @@ impl Timeline {
// Last record Lsn could be zero in case the timeline was just created
if !last_record_lsn.is_valid() {
warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
return Ok(false);
return Ok(CompactionOutcome::Done);
}
let result = match self.get_compaction_algorithm_settings().kind {
CompactionAlgorithm::Tiered => {
self.compact_tiered(cancel, ctx).await?;
Ok(false)
Ok(CompactionOutcome::Done)
}
CompactionAlgorithm::Legacy => self.compact_legacy(cancel, options, ctx).await,
};

View File

@@ -262,13 +262,13 @@ impl GcCompactionQueue {
ctx: &RequestContext,
gc_block: &GcBlock,
timeline: &Arc<Timeline>,
) -> Result<bool, CompactionError> {
) -> Result<CompactionOutcome, CompactionError> {
let _one_op_at_a_time_guard = self.consumer_lock.lock().await;
let has_pending_tasks;
let (id, item) = {
let mut guard = self.inner.lock().unwrap();
let Some((id, item)) = guard.queued.pop_front() else {
return Ok(false);
return Ok(CompactionOutcome::Done);
};
guard.running = Some((id, item.clone()));
has_pending_tasks = !guard.queued.is_empty();
@@ -323,7 +323,11 @@ impl GcCompactionQueue {
let mut guard = self.inner.lock().unwrap();
guard.running = None;
}
Ok(has_pending_tasks)
Ok(if has_pending_tasks {
CompactionOutcome::Pending
} else {
CompactionOutcome::Done
})
}
#[allow(clippy::type_complexity)]
@@ -589,6 +593,17 @@ impl CompactionStatistics {
}
}
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionOutcome {
#[default]
/// No layers need to be compacted after this round. Compaction doesn't need
/// to be immediately scheduled.
Done,
/// Still has pending layers to be compacted after this round. Ideally, the scheduler
/// should immediately schedule another compaction.
Pending,
}
impl Timeline {
/// TODO: cancellation
///
@@ -598,7 +613,7 @@ impl Timeline {
cancel: &CancellationToken,
options: CompactOptions,
ctx: &RequestContext,
) -> Result<bool, CompactionError> {
) -> Result<CompactionOutcome, CompactionError> {
if options
.flags
.contains(CompactFlags::EnhancedGcBottomMostCompaction)
@@ -606,7 +621,7 @@ impl Timeline {
self.compact_with_gc(cancel, options, ctx)
.await
.map_err(CompactionError::Other)?;
return Ok(false);
return Ok(CompactionOutcome::Done);
}
if options.flags.contains(CompactFlags::DryRun) {
@@ -666,9 +681,9 @@ impl Timeline {
// Define partitioning schema if needed
// 1. L0 Compact
let fully_compacted = {
let l0_compaction_outcome = {
let timer = self.metrics.compact_time_histo.start_timer();
let fully_compacted = self
let l0_compaction_outcome = self
.compact_level0(
target_file_size,
options.flags.contains(CompactFlags::ForceL0Compaction),
@@ -676,15 +691,15 @@ impl Timeline {
)
.await?;
timer.stop_and_record();
fully_compacted
l0_compaction_outcome
};
if !fully_compacted {
if let CompactionOutcome::Pending = l0_compaction_outcome {
// Yield and do not do any other kind of compaction. True means
// that we have pending L0 compaction tasks and the compaction scheduler
// will prioritize compacting this tenant/timeline again.
info!("skipping image layer generation and shard ancestor compaction due to L0 compaction did not include all layers.");
return Ok(true);
return Ok(CompactionOutcome::Pending);
}
// 2. Repartition and create image layers if necessary
@@ -736,7 +751,7 @@ impl Timeline {
if let LastImageLayerCreationStatus::Incomplete = outcome {
// Yield and do not do any other kind of compaction.
info!("skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction).");
return Ok(true);
return Ok(CompactionOutcome::Pending);
}
partitioning.parts.len()
}
@@ -765,7 +780,7 @@ impl Timeline {
self.compact_shard_ancestors(rewrite_max, ctx).await?;
}
Ok(false)
Ok(CompactionOutcome::Done)
}
/// Check for layers that are elegible to be rewritten:
@@ -1022,11 +1037,11 @@ impl Timeline {
target_file_size: u64,
force_compaction_ignore_threshold: bool,
ctx: &RequestContext,
) -> Result<bool, CompactionError> {
) -> Result<CompactionOutcome, CompactionError> {
let CompactLevel0Phase1Result {
new_layers,
deltas_to_compact,
fully_compacted,
outcome,
} = {
let phase1_span = info_span!("compact_level0_phase1");
let ctx = ctx.attached_child();
@@ -1055,12 +1070,12 @@ impl Timeline {
if new_layers.is_empty() && deltas_to_compact.is_empty() {
// nothing to do
return Ok(true);
return Ok(CompactionOutcome::Done);
}
self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
.await?;
Ok(fully_compacted)
Ok(outcome)
}
/// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
@@ -1602,7 +1617,11 @@ impl Timeline {
.into_iter()
.map(|x| x.drop_eviction_guard())
.collect::<Vec<_>>(),
fully_compacted,
outcome: if fully_compacted {
CompactionOutcome::Done
} else {
CompactionOutcome::Pending
},
})
}
}
@@ -1613,7 +1632,7 @@ struct CompactLevel0Phase1Result {
deltas_to_compact: Vec<Layer>,
// Whether we have included all L0 layers, or selected only part of them due to the
// L0 compaction size limit.
fully_compacted: bool,
outcome: CompactionOutcome,
}
#[derive(Default)]