From e219d48bfe9991f40eb0d6c4f8713c9f8dc9eb05 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Mon, 3 Feb 2025 16:56:55 -0500 Subject: [PATCH] refactor(pageserver): clearify compaction return value (#10643) ## Problem ## Summary of changes Make the return value of the set of compaction functions less confusing. Signed-off-by: Alex Chi Z --- pageserver/src/tenant.rs | 35 +++++++------ pageserver/src/tenant/tasks.rs | 5 +- pageserver/src/tenant/timeline.rs | 13 ++--- pageserver/src/tenant/timeline/compaction.rs | 55 +++++++++++++------- 4 files changed, 67 insertions(+), 41 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 80a61eba92..c1b408ed72 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -46,6 +46,7 @@ use std::sync::atomic::AtomicBool; use std::sync::Weak; use std::time::SystemTime; use storage_broker::BrokerClientChannel; +use timeline::compaction::CompactionOutcome; use timeline::compaction::GcCompactionQueue; use timeline::import_pgdata; use timeline::offload::offload_timeline; @@ -2907,10 +2908,10 @@ impl Tenant { self: &Arc, cancel: &CancellationToken, ctx: &RequestContext, - ) -> Result { + ) -> Result { // Don't start doing work during shutdown, or when broken, we do not need those in the logs if !self.is_active() { - return Ok(false); + return Ok(CompactionOutcome::Done); } { @@ -2924,7 +2925,7 @@ impl Tenant { // to AttachedSingle state. if !conf.location.may_upload_layers_hint() { info!("Skipping compaction in location state {:?}", conf.location); - return Ok(false); + return Ok(CompactionOutcome::Done); } } @@ -2967,7 +2968,7 @@ impl Tenant { // Before doing any I/O work, check our circuit breaker if self.compaction_circuit_breaker.lock().unwrap().is_broken() { info!("Skipping compaction due to previous failures"); - return Ok(false); + return Ok(CompactionOutcome::Done); } let mut has_pending_task = false; @@ -2975,10 +2976,10 @@ impl Tenant { for (timeline_id, timeline, (can_compact, can_offload)) in &timelines_to_compact_or_offload { // pending_task_left == None: cannot compact, maybe still pending tasks - // pending_task_left == Some(true): compaction task left - // pending_task_left == Some(false): no compaction task left + // pending_task_left == Some(Pending): compaction task left + // pending_task_left == Some(Done): no compaction task left let pending_task_left = if *can_compact { - let has_pending_l0_compaction_task = timeline + let compaction_outcome = timeline .compact(cancel, EnumSet::empty(), ctx) .instrument(info_span!("compact_timeline", %timeline_id)) .await @@ -2996,27 +2997,27 @@ impl Tenant { .fail(&CIRCUIT_BREAKERS_BROKEN, e); } })?; - if has_pending_l0_compaction_task { - Some(true) + if let CompactionOutcome::Pending = compaction_outcome { + Some(CompactionOutcome::Pending) } else { let queue = { let guard = self.scheduled_compaction_tasks.lock().unwrap(); guard.get(timeline_id).cloned() }; if let Some(queue) = queue { - let has_pending_tasks = queue + let outcome = queue .iteration(cancel, ctx, &self.gc_block, timeline) .await?; - Some(has_pending_tasks) + Some(outcome) } else { - Some(false) + Some(CompactionOutcome::Done) } } } else { None }; - has_pending_task |= pending_task_left.unwrap_or(false); - if pending_task_left == Some(false) && *can_offload { + has_pending_task |= pending_task_left == Some(CompactionOutcome::Pending); + if pending_task_left == Some(CompactionOutcome::Done) && *can_offload { pausable_failpoint!("before-timeline-auto-offload"); match offload_timeline(self, timeline) .instrument(info_span!("offload_timeline", %timeline_id)) @@ -3036,7 +3037,11 @@ impl Tenant { .unwrap() .success(&CIRCUIT_BREAKERS_UNBROKEN); - Ok(has_pending_task) + Ok(if has_pending_task { + CompactionOutcome::Pending + } else { + CompactionOutcome::Done + }) } /// Cancel scheduled compaction tasks diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 3725e2f7fc..b6b64d02dd 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -11,6 +11,7 @@ use crate::metrics::TENANT_TASK_EVENTS; use crate::task_mgr; use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME}; use crate::tenant::throttle::Stats; +use crate::tenant::timeline::compaction::CompactionOutcome; use crate::tenant::timeline::CompactionError; use crate::tenant::{Tenant, TenantState}; use rand::Rng; @@ -206,10 +207,10 @@ async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { .run(tenant.compaction_iteration(&cancel, &ctx)) .await; match output { - Ok(has_pending_task) => { + Ok(outcome) => { error_run_count = 0; // schedule the next compaction immediately in case there is a pending compaction task - sleep_duration = if has_pending_task { + sleep_duration = if let CompactionOutcome::Pending = outcome { Duration::ZERO } else { period diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d65b382e50..11c0bbdfe5 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -18,6 +18,7 @@ use arc_swap::{ArcSwap, ArcSwapOption}; use bytes::Bytes; use camino::Utf8Path; use chrono::{DateTime, Utc}; +use compaction::CompactionOutcome; use enumset::EnumSet; use fail::fail_point; use futures::{stream::FuturesUnordered, StreamExt}; @@ -1679,7 +1680,7 @@ impl Timeline { cancel: &CancellationToken, flags: EnumSet, ctx: &RequestContext, - ) -> Result { + ) -> Result { self.compact_with_options( cancel, CompactOptions { @@ -1701,7 +1702,7 @@ impl Timeline { cancel: &CancellationToken, options: CompactOptions, ctx: &RequestContext, - ) -> Result { + ) -> Result { // most likely the cancellation token is from background task, but in tests it could be the // request task as well. @@ -1721,8 +1722,8 @@ impl Timeline { // compaction task goes over it's period (20s) which is quite often in production. let (_guard, _permit) = tokio::select! { tuple = prepare => { tuple }, - _ = self.cancel.cancelled() => return Ok(false), - _ = cancel.cancelled() => return Ok(false), + _ = self.cancel.cancelled() => return Ok(CompactionOutcome::Done), + _ = cancel.cancelled() => return Ok(CompactionOutcome::Done), }; let last_record_lsn = self.get_last_record_lsn(); @@ -1730,13 +1731,13 @@ impl Timeline { // Last record Lsn could be zero in case the timeline was just created if !last_record_lsn.is_valid() { warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}"); - return Ok(false); + return Ok(CompactionOutcome::Done); } let result = match self.get_compaction_algorithm_settings().kind { CompactionAlgorithm::Tiered => { self.compact_tiered(cancel, ctx).await?; - Ok(false) + Ok(CompactionOutcome::Done) } CompactionAlgorithm::Legacy => self.compact_legacy(cancel, options, ctx).await, }; diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 9bd61bbac5..7dd37d7232 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -262,13 +262,13 @@ impl GcCompactionQueue { ctx: &RequestContext, gc_block: &GcBlock, timeline: &Arc, - ) -> Result { + ) -> Result { let _one_op_at_a_time_guard = self.consumer_lock.lock().await; let has_pending_tasks; let (id, item) = { let mut guard = self.inner.lock().unwrap(); let Some((id, item)) = guard.queued.pop_front() else { - return Ok(false); + return Ok(CompactionOutcome::Done); }; guard.running = Some((id, item.clone())); has_pending_tasks = !guard.queued.is_empty(); @@ -323,7 +323,11 @@ impl GcCompactionQueue { let mut guard = self.inner.lock().unwrap(); guard.running = None; } - Ok(has_pending_tasks) + Ok(if has_pending_tasks { + CompactionOutcome::Pending + } else { + CompactionOutcome::Done + }) } #[allow(clippy::type_complexity)] @@ -589,6 +593,17 @@ impl CompactionStatistics { } } +#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)] +pub enum CompactionOutcome { + #[default] + /// No layers need to be compacted after this round. Compaction doesn't need + /// to be immediately scheduled. + Done, + /// Still has pending layers to be compacted after this round. Ideally, the scheduler + /// should immediately schedule another compaction. + Pending, +} + impl Timeline { /// TODO: cancellation /// @@ -598,7 +613,7 @@ impl Timeline { cancel: &CancellationToken, options: CompactOptions, ctx: &RequestContext, - ) -> Result { + ) -> Result { if options .flags .contains(CompactFlags::EnhancedGcBottomMostCompaction) @@ -606,7 +621,7 @@ impl Timeline { self.compact_with_gc(cancel, options, ctx) .await .map_err(CompactionError::Other)?; - return Ok(false); + return Ok(CompactionOutcome::Done); } if options.flags.contains(CompactFlags::DryRun) { @@ -666,9 +681,9 @@ impl Timeline { // Define partitioning schema if needed // 1. L0 Compact - let fully_compacted = { + let l0_compaction_outcome = { let timer = self.metrics.compact_time_histo.start_timer(); - let fully_compacted = self + let l0_compaction_outcome = self .compact_level0( target_file_size, options.flags.contains(CompactFlags::ForceL0Compaction), @@ -676,15 +691,15 @@ impl Timeline { ) .await?; timer.stop_and_record(); - fully_compacted + l0_compaction_outcome }; - if !fully_compacted { + if let CompactionOutcome::Pending = l0_compaction_outcome { // Yield and do not do any other kind of compaction. True means // that we have pending L0 compaction tasks and the compaction scheduler // will prioritize compacting this tenant/timeline again. info!("skipping image layer generation and shard ancestor compaction due to L0 compaction did not include all layers."); - return Ok(true); + return Ok(CompactionOutcome::Pending); } // 2. Repartition and create image layers if necessary @@ -736,7 +751,7 @@ impl Timeline { if let LastImageLayerCreationStatus::Incomplete = outcome { // Yield and do not do any other kind of compaction. info!("skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction)."); - return Ok(true); + return Ok(CompactionOutcome::Pending); } partitioning.parts.len() } @@ -765,7 +780,7 @@ impl Timeline { self.compact_shard_ancestors(rewrite_max, ctx).await?; } - Ok(false) + Ok(CompactionOutcome::Done) } /// Check for layers that are elegible to be rewritten: @@ -1022,11 +1037,11 @@ impl Timeline { target_file_size: u64, force_compaction_ignore_threshold: bool, ctx: &RequestContext, - ) -> Result { + ) -> Result { let CompactLevel0Phase1Result { new_layers, deltas_to_compact, - fully_compacted, + outcome, } = { let phase1_span = info_span!("compact_level0_phase1"); let ctx = ctx.attached_child(); @@ -1055,12 +1070,12 @@ impl Timeline { if new_layers.is_empty() && deltas_to_compact.is_empty() { // nothing to do - return Ok(true); + return Ok(CompactionOutcome::Done); } self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact) .await?; - Ok(fully_compacted) + Ok(outcome) } /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment. @@ -1602,7 +1617,11 @@ impl Timeline { .into_iter() .map(|x| x.drop_eviction_guard()) .collect::>(), - fully_compacted, + outcome: if fully_compacted { + CompactionOutcome::Done + } else { + CompactionOutcome::Pending + }, }) } } @@ -1613,7 +1632,7 @@ struct CompactLevel0Phase1Result { deltas_to_compact: Vec, // Whether we have included all L0 layers, or selected only part of them due to the // L0 compaction size limit. - fully_compacted: bool, + outcome: CompactionOutcome, } #[derive(Default)]