diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index dd37bfc407..e15b30236e 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -357,6 +357,11 @@ impl PageServerNode { .map(serde_json::from_str) .transpose() .context("Failed to parse 'compaction_algorithm' json")?, + compaction_l0_first: settings + .remove("compaction_l0_first") + .map(|x| x.parse::()) + .transpose() + .context("Failed to parse 'compaction_l0_first' as a bool")?, l0_flush_delay_threshold: settings .remove("l0_flush_delay_threshold") .map(|x| x.parse::()) diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index a00d7838fd..9bc1b6d359 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -264,6 +264,8 @@ pub struct TenantConfigToml { /// size exceeds `compaction_upper_limit * checkpoint_distance`. pub compaction_upper_limit: usize, pub compaction_algorithm: crate::models::CompactionAlgorithmSettings, + /// If true, compact down L0 across all tenant timelines before doing regular compaction. + pub compaction_l0_first: bool, /// Level0 delta layer threshold at which to delay layer flushes for compaction backpressure, /// such that they take 2x as long, and start waiting for layer flushes during ephemeral layer /// rolls. This helps compaction keep up with WAL ingestion, and avoids read amplification @@ -545,6 +547,7 @@ pub mod tenant_conf_defaults { // most of our pageservers. Compaction ~50 layers requires about 2GB memory (could be reduced later by optimizing L0 hole // calculation to avoid loading all keys into the memory). So with this config, we can get a maximum peak compaction usage of 18GB. pub const DEFAULT_COMPACTION_UPPER_LIMIT: usize = 50; + pub const DEFAULT_COMPACTION_L0_FIRST: bool = false; pub const DEFAULT_COMPACTION_ALGORITHM: crate::models::CompactionAlgorithm = crate::models::CompactionAlgorithm::Legacy; @@ -594,6 +597,7 @@ impl Default for TenantConfigToml { compaction_algorithm: crate::models::CompactionAlgorithmSettings { kind: DEFAULT_COMPACTION_ALGORITHM, }, + compaction_l0_first: DEFAULT_COMPACTION_L0_FIRST, l0_flush_delay_threshold: None, l0_flush_stall_threshold: None, l0_flush_wait_upload: DEFAULT_L0_FLUSH_WAIT_UPLOAD, diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 19beb37ab3..335ac4cec5 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -464,6 +464,8 @@ pub struct TenantConfigPatch { #[serde(skip_serializing_if = "FieldPatch::is_noop")] pub compaction_algorithm: FieldPatch, #[serde(skip_serializing_if = "FieldPatch::is_noop")] + pub compaction_l0_first: FieldPatch, + #[serde(skip_serializing_if = "FieldPatch::is_noop")] pub l0_flush_delay_threshold: FieldPatch, #[serde(skip_serializing_if = "FieldPatch::is_noop")] pub l0_flush_stall_threshold: FieldPatch, @@ -529,6 +531,7 @@ pub struct TenantConfig { pub compaction_upper_limit: Option, // defer parsing compaction_algorithm, like eviction_policy pub compaction_algorithm: Option, + pub compaction_l0_first: Option, pub l0_flush_delay_threshold: Option, pub l0_flush_stall_threshold: Option, pub l0_flush_wait_upload: Option, @@ -567,6 +570,7 @@ impl TenantConfig { mut compaction_threshold, mut compaction_upper_limit, mut compaction_algorithm, + mut compaction_l0_first, mut l0_flush_delay_threshold, mut l0_flush_stall_threshold, mut l0_flush_wait_upload, @@ -606,6 +610,7 @@ impl TenantConfig { .compaction_upper_limit .apply(&mut compaction_upper_limit); patch.compaction_algorithm.apply(&mut compaction_algorithm); + patch.compaction_l0_first.apply(&mut compaction_l0_first); patch .l0_flush_delay_threshold .apply(&mut l0_flush_delay_threshold); @@ -669,6 +674,7 @@ impl TenantConfig { compaction_threshold, compaction_upper_limit, compaction_algorithm, + compaction_l0_first, l0_flush_delay_threshold, l0_flush_stall_threshold, l0_flush_wait_upload, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 91df47b250..5f17e8cb60 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -52,7 +52,9 @@ use timeline::compaction::GcCompactionQueue; use timeline::import_pgdata; use timeline::offload::offload_timeline; use timeline::offload::OffloadError; +use timeline::CompactFlags; use timeline::CompactOptions; +use timeline::CompactionError; use timeline::ShutdownMode; use tokio::io::BufReader; use tokio::sync::watch; @@ -2898,150 +2900,181 @@ impl Tenant { .await } - /// Perform one compaction iteration. - /// This function is periodically called by compactor task. - /// Also it can be explicitly requested per timeline through page server - /// api's 'compact' command. + /// Performs one compaction iteration. Called periodically from the compaction loop. Returns + /// whether another compaction iteration is needed (if we yield), or /// - /// Returns whether we have pending compaction task. + /// Compaction can also be explicitly requested for a timeline via the HTTP API. async fn compaction_iteration( self: &Arc, cancel: &CancellationToken, ctx: &RequestContext, - ) -> Result { - // Don't start doing work during shutdown, or when broken, we do not need those in the logs + ) -> Result { + // Don't compact inactive tenants. if !self.is_active() { return Ok(CompactionOutcome::Done); } - { - let conf = self.tenant_conf.load(); - - // Note that compaction usually requires deletions, but we don't respect - // may_delete_layers_hint here: that is because tenants in AttachedMulti - // should proceed with compaction even if they can't do deletion, to avoid - // accumulating dangerously deep stacks of L0 layers. Deletions will be - // enqueued inside RemoteTimelineClient, and executed layer if/when we transition - // to AttachedSingle state. - if !conf.location.may_upload_layers_hint() { - info!("Skipping compaction in location state {:?}", conf.location); - return Ok(CompactionOutcome::Done); - } - } - - // Scan through the hashmap and collect a list of all the timelines, - // while holding the lock. Then drop the lock and actually perform the - // compactions. We don't want to block everything else while the - // compaction runs. - let timelines_to_compact_or_offload; - { - let timelines = self.timelines.lock().unwrap(); - timelines_to_compact_or_offload = timelines - .iter() - .filter_map(|(timeline_id, timeline)| { - let (is_active, (can_offload, _)) = - (timeline.is_active(), timeline.can_offload()); - let has_no_unoffloaded_children = { - !timelines - .iter() - .any(|(_id, tl)| tl.get_ancestor_timeline_id() == Some(*timeline_id)) - }; - let config_allows_offload = self.conf.timeline_offloading - || self - .tenant_conf - .load() - .tenant_conf - .timeline_offloading - .unwrap_or_default(); - let can_offload = - can_offload && has_no_unoffloaded_children && config_allows_offload; - if (is_active, can_offload) == (false, false) { - None - } else { - Some((*timeline_id, timeline.clone(), (is_active, can_offload))) - } - }) - .collect::>(); - drop(timelines); - } - - // Before doing any I/O work, check our circuit breaker - if self.compaction_circuit_breaker.lock().unwrap().is_broken() { - info!("Skipping compaction due to previous failures"); + // Don't compact tenants that can't upload layers. We don't check `may_delete_layers_hint`, + // since we need to compact L0 even in AttachedMulti to bound read amplification. + let location = self.tenant_conf.load().location; + if !location.may_upload_layers_hint() { + info!("skipping compaction in location state {location:?}"); return Ok(CompactionOutcome::Done); } - let mut has_pending_task = false; + // Don't compact if the circuit breaker is tripped. + if self.compaction_circuit_breaker.lock().unwrap().is_broken() { + info!("skipping compaction due to previous failures"); + return Ok(CompactionOutcome::Done); + } + + // Collect all timelines to compact, along with offload instructions and L0 counts. + let mut compact: Vec> = Vec::new(); + let mut offload: HashSet = HashSet::new(); + let mut l0_counts: HashMap = HashMap::new(); - for (timeline_id, timeline, (can_compact, can_offload)) in &timelines_to_compact_or_offload { - // pending_task_left == None: cannot compact, maybe still pending tasks - // pending_task_left == Some(Pending): compaction task left - // pending_task_left == Some(Done): no compaction task left - let pending_task_left = if *can_compact { - let compaction_outcome = timeline - .compact(cancel, EnumSet::empty(), ctx) - .instrument(info_span!("compact_timeline", %timeline_id)) - .await - .inspect_err(|e| match e { - timeline::CompactionError::ShuttingDown => (), - timeline::CompactionError::Offload(_) => { - // Failures to offload timelines do not trip the circuit breaker, because - // they do not do lots of writes the way compaction itself does: it is cheap - // to retry, and it would be bad to stop all compaction because of an issue with offloading. - } - timeline::CompactionError::Other(e) => { - self.compaction_circuit_breaker - .lock() - .unwrap() - .fail(&CIRCUIT_BREAKERS_BROKEN, e); - } - })?; - if let CompactionOutcome::Pending = compaction_outcome { - Some(CompactionOutcome::Pending) - } else { - let queue = { - let guard = self.scheduled_compaction_tasks.lock().unwrap(); - guard.get(timeline_id).cloned() - }; - if let Some(queue) = queue { - let outcome = queue - .iteration(cancel, ctx, &self.gc_block, timeline) - .await?; - Some(outcome) - } else { - Some(CompactionOutcome::Done) - } + let offload_enabled = self.get_timeline_offloading_enabled(); + let timelines = self.timelines.lock().unwrap(); + for (&timeline_id, timeline) in timelines.iter() { + // Skip inactive timelines. + if !timeline.is_active() { + continue; } - } else { - None - }; - has_pending_task |= pending_task_left == Some(CompactionOutcome::Pending); - if pending_task_left == Some(CompactionOutcome::Done) && *can_offload { - pausable_failpoint!("before-timeline-auto-offload"); - match offload_timeline(self, timeline) - .instrument(info_span!("offload_timeline", %timeline_id)) - .await - { - Err(OffloadError::NotArchived) => { - // Ignore this, we likely raced with unarchival - Ok(()) - } - other => other, - }?; + + // Schedule the timeline for compaction. + compact.push(timeline.clone()); + + // Schedule the timeline for offloading if eligible. + let can_offload = offload_enabled + && timeline.can_offload().0 + && !timelines + .iter() + .any(|(_, tli)| tli.get_ancestor_timeline_id() == Some(timeline_id)); + if can_offload { + offload.insert(timeline_id); + } + } + } // release timelines lock + + for timeline in &compact { + // Collect L0 counts. Can't await while holding lock above. + if let Ok(lm) = timeline.layers.read().await.layer_map() { + l0_counts.insert(timeline.timeline_id, lm.level0_deltas().len()); } } + // Pass 1: L0 compaction across all timelines, in order of L0 count. We prioritize this to + // bound read amplification. + // + // TODO: this may spin on one or more ingest-heavy timelines, starving out image/GC + // compaction and offloading. We leave that as a potential problem to solve later. Consider + // splitting L0 and image/GC compaction to separate background jobs. + if self.get_compaction_l0_first() { + let compaction_threshold = self.get_compaction_threshold(); + let compact_l0 = compact + .iter() + .map(|tli| (tli, l0_counts.get(&tli.timeline_id).copied().unwrap_or(0))) + .filter(|&(_, l0)| l0 >= compaction_threshold) + .sorted_by_key(|&(_, l0)| l0) + .rev() + .map(|(tli, _)| tli.clone()) + .collect_vec(); + + let mut has_pending_l0 = false; + for timeline in compact_l0 { + let outcome = timeline + .compact(cancel, CompactFlags::OnlyL0Compaction.into(), ctx) + .instrument(info_span!("compact_timeline", timeline_id = %timeline.timeline_id)) + .await + .inspect_err(|err| self.maybe_trip_compaction_breaker(err))?; + has_pending_l0 |= outcome == CompactionOutcome::Pending; + } + if has_pending_l0 { + return Ok(CompactionOutcome::Pending); // do another pass + } + } + + // Pass 2: image compaction and timeline offloading. If any timelines have accumulated + // more L0 layers, they may also be compacted here. + // + // NB: image compaction may yield if there is pending L0 compaction. + // + // TODO: it will only yield if there is pending L0 compaction on the same timeline. If a + // different timeline needs compaction, it won't. It should check `l0_compaction_trigger`. + // We leave this for a later PR. + // + // TODO: consider ordering timelines by some priority, e.g. time since last full compaction, + // amount of L1 delta debt or garbage, offload-eligible timelines first, etc. + let mut has_pending = false; + for timeline in compact { + if !timeline.is_active() { + continue; + } + + let mut outcome = timeline + .compact(cancel, EnumSet::default(), ctx) + .instrument(info_span!("compact_timeline", timeline_id = %timeline.timeline_id)) + .await + .inspect_err(|err| self.maybe_trip_compaction_breaker(err))?; + + // If we're done compacting, check the scheduled GC compaction queue for more work. + if outcome == CompactionOutcome::Done { + let queue = self + .scheduled_compaction_tasks + .lock() + .unwrap() + .get(&timeline.timeline_id) + .cloned(); + if let Some(queue) = queue { + outcome = queue + .iteration(cancel, ctx, &self.gc_block, &timeline) + .await?; + } + } + + // If we're done compacting, offload the timeline if requested. + if outcome == CompactionOutcome::Done && offload.contains(&timeline.timeline_id) { + pausable_failpoint!("before-timeline-auto-offload"); + offload_timeline(self, &timeline) + .instrument(info_span!("offload_timeline", timeline_id = %timeline.timeline_id)) + .await + .or_else(|err| match err { + // Ignore this, we likely raced with unarchival. + OffloadError::NotArchived => Ok(()), + err => Err(err), + })?; + } + + has_pending |= outcome == CompactionOutcome::Pending; + } + + // Success! Untrip the breaker if necessary. self.compaction_circuit_breaker .lock() .unwrap() .success(&CIRCUIT_BREAKERS_UNBROKEN); - Ok(if has_pending_task { - CompactionOutcome::Pending - } else { - CompactionOutcome::Done - }) + match has_pending { + true => Ok(CompactionOutcome::Pending), + false => Ok(CompactionOutcome::Done), + } + } + + /// Trips the compaction circuit breaker if appropriate. + pub(crate) fn maybe_trip_compaction_breaker(&self, err: &CompactionError) { + match err { + CompactionError::ShuttingDown => (), + // Offload failures don't trip the circuit breaker, since they're cheap to retry and + // shouldn't block compaction. + CompactionError::Offload(_) => {} + CompactionError::Other(err) => { + self.compaction_circuit_breaker + .lock() + .unwrap() + .fail(&CIRCUIT_BREAKERS_BROKEN, err); + } + } } /// Cancel scheduled compaction tasks @@ -3819,6 +3852,13 @@ impl Tenant { .unwrap_or(self.conf.default_tenant_conf.compaction_upper_limit) } + pub fn get_compaction_l0_first(&self) -> bool { + let tenant_conf = self.tenant_conf.load().tenant_conf.clone(); + tenant_conf + .compaction_l0_first + .unwrap_or(self.conf.default_tenant_conf.compaction_l0_first) + } + pub fn get_gc_horizon(&self) -> u64 { let tenant_conf = self.tenant_conf.load().tenant_conf.clone(); tenant_conf @@ -3873,6 +3913,16 @@ impl Tenant { .unwrap_or(self.conf.default_tenant_conf.lsn_lease_length) } + pub fn get_timeline_offloading_enabled(&self) -> bool { + if self.conf.timeline_offloading { + return true; + } + let tenant_conf = self.tenant_conf.load().tenant_conf.clone(); + tenant_conf + .timeline_offloading + .unwrap_or(self.conf.default_tenant_conf.timeline_offloading) + } + /// Generate an up-to-date TenantManifest based on the state of this Tenant. fn build_tenant_manifest(&self) -> TenantManifest { let timelines_offloaded = self.timelines_offloaded.lock().unwrap(); @@ -5478,6 +5528,7 @@ pub(crate) mod harness { compaction_threshold: Some(tenant_conf.compaction_threshold), compaction_upper_limit: Some(tenant_conf.compaction_upper_limit), compaction_algorithm: Some(tenant_conf.compaction_algorithm), + compaction_l0_first: Some(tenant_conf.compaction_l0_first), l0_flush_delay_threshold: tenant_conf.l0_flush_delay_threshold, l0_flush_stall_threshold: tenant_conf.l0_flush_stall_threshold, l0_flush_wait_upload: Some(tenant_conf.l0_flush_wait_upload), diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index ad13e9e8e4..cff33afffd 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -285,6 +285,10 @@ pub struct TenantConfOpt { #[serde(default)] pub compaction_algorithm: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub compaction_l0_first: Option, + #[serde(skip_serializing_if = "Option::is_none")] #[serde(default)] pub l0_flush_delay_threshold: Option, @@ -416,6 +420,9 @@ impl TenantConfOpt { .as_ref() .unwrap_or(&global_conf.compaction_algorithm) .clone(), + compaction_l0_first: self + .compaction_l0_first + .unwrap_or(global_conf.compaction_l0_first), l0_flush_delay_threshold: self .l0_flush_delay_threshold .or(global_conf.l0_flush_delay_threshold), @@ -493,6 +500,7 @@ impl TenantConfOpt { mut compaction_threshold, mut compaction_upper_limit, mut compaction_algorithm, + mut compaction_l0_first, mut l0_flush_delay_threshold, mut l0_flush_stall_threshold, mut l0_flush_wait_upload, @@ -538,6 +546,7 @@ impl TenantConfOpt { .compaction_upper_limit .apply(&mut compaction_upper_limit); patch.compaction_algorithm.apply(&mut compaction_algorithm); + patch.compaction_l0_first.apply(&mut compaction_l0_first); patch .l0_flush_delay_threshold .apply(&mut l0_flush_delay_threshold); @@ -619,6 +628,7 @@ impl TenantConfOpt { compaction_threshold, compaction_upper_limit, compaction_algorithm, + compaction_l0_first, l0_flush_delay_threshold, l0_flush_stall_threshold, l0_flush_wait_upload, @@ -681,6 +691,7 @@ impl From for models::TenantConfig { compaction_period: value.compaction_period.map(humantime), compaction_threshold: value.compaction_threshold, compaction_upper_limit: value.compaction_upper_limit, + compaction_l0_first: value.compaction_l0_first, l0_flush_delay_threshold: value.l0_flush_delay_threshold, l0_flush_stall_threshold: value.l0_flush_stall_threshold, l0_flush_wait_upload: value.l0_flush_wait_upload, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 1fbcd6bceb..43811b77f8 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -876,6 +876,7 @@ pub(crate) enum CompactFlags { ForceRepartition, ForceImageLayerCreation, ForceL0Compaction, + OnlyL0Compaction, EnhancedGcBottomMostCompaction, DryRun, } diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 4cbc344669..18b5afd04b 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -609,6 +609,8 @@ pub enum CompactionOutcome { /// Still has pending layers to be compacted after this round. Ideally, the scheduler /// should immediately schedule another compaction. Pending, + // TODO: add a skipped variant for cases where we didn't attempt compaction. These currently + // return Done, which can lead the caller to believe there is no compaction debt. } impl Timeline { @@ -715,10 +717,12 @@ impl Timeline { l0_compaction_outcome }; - if let CompactionOutcome::Pending = l0_compaction_outcome { - // Yield and do not do any other kind of compaction. True means - // that we have pending L0 compaction tasks and the compaction scheduler - // will prioritize compacting this tenant/timeline again. + if options.flags.contains(CompactFlags::OnlyL0Compaction) { + return Ok(l0_compaction_outcome); + } + + if l0_compaction_outcome == CompactionOutcome::Pending { + // Yield if we have pending L0 compaction. The scheduler will do another pass. info!("skipping image layer generation and shard ancestor compaction due to L0 compaction did not include all layers."); return Ok(CompactionOutcome::Pending); } diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py index 7acc64377e..34d56c5cb1 100644 --- a/test_runner/regress/test_attach_tenant_config.py +++ b/test_runner/regress/test_attach_tenant_config.py @@ -140,9 +140,10 @@ def test_fully_custom_config(positive_env: NeonEnv): "compaction_period": "1h", "compaction_threshold": 13, "compaction_upper_limit": 100, + "compaction_l0_first": False, "l0_flush_delay_threshold": 25, "l0_flush_stall_threshold": 42, - "l0_flush_wait_upload": True, + "l0_flush_wait_upload": False, "compaction_target_size": 1048576, "checkpoint_distance": 10000, "checkpoint_timeout": "13m",