From ddb9ae1214a1ab19300514e5487569471b8a551a Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Fri, 24 Jan 2025 10:47:28 +0100
Subject: [PATCH] pageserver: add compaction backpressure for layer flushes
 (#10405)

## Problem

There is no direct backpressure for compaction and L0 read amplification. This allows a large buildup of compaction debt and read amplification.

Resolves #5415.
Requires #10402.

## Summary of changes

Delay layer flushes based on the number of level 0 delta layers:

* `l0_flush_delay_threshold`: delay flushes such that they take 2x as long (default `2 * compaction_threshold`).
* `l0_flush_stall_threshold`: stall flushes until the level 0 delta layer count drops below the threshold (default `4 * compaction_threshold`).

If either threshold is reached, ephemeral layer rolls also synchronously wait for layer flushes, to propagate this backpressure up into WAL ingestion. Once backpressure kicks in, this bounds the number of frozen layers to 1, since all other frozen layers must flush before the rolled layer. A sketch of the threshold resolution follows the analysis below.

## Analysis

This will significantly change the compute backpressure characteristics. Recall the three compute backpressure knobs:

* `max_replication_write_lag`: 500 MB (based on Pageserver `last_received_lsn`).
* `max_replication_flush_lag`: 10 GB (based on Pageserver `disk_consistent_lsn`).
* `max_replication_apply_lag`: disabled (based on Pageserver `remote_consistent_lsn`).

Previously, the Pageserver would keep ingesting WAL and build up ephemeral layers and L0 layers until the compute hit `max_replication_flush_lag` at 10 GB and began backpressuring. Now, once we delay/stall WAL ingestion, the compute will begin backpressuring after `max_replication_write_lag`, i.e. 500 MB. This is probably a good thing (we're not building up a ton of compaction debt), but we should consider tuning these settings. `max_replication_flush_lag` probably doesn't serve a purpose anymore, and we should consider removing it.

Furthermore, the removal of the upload barrier in #10402 means that we no longer backpressure flushes based on S3 uploads, since `max_replication_apply_lag` is disabled. We should consider enabling it as well.

### When and what do we compact?

Default compaction settings:

* `compaction_threshold`: 10 L0 delta layers.
* `compaction_period`: 20 seconds (between each compaction loop check).
* `checkpoint_distance`: 256 MB (size of L0 delta layers).
* `l0_flush_delay_threshold`: 20 L0 delta layers.
* `l0_flush_stall_threshold`: 40 L0 delta layers.

Compaction characteristics:

* Minimum compaction volume: 10 layers * 256 MB = 2.5 GB.
* Additional compaction volume (assuming 128 MB/s WAL): 128 MB/s * 20 seconds = 2.5 GB (10 L0 layers).
* Required compaction bandwidth: 5.0 GB / 20 seconds = 256 MB/s.

### When do we hit `max_replication_write_lag`?

Depending on how fast compaction and flushes happen, the compute will begin backpressuring somewhere between `l0_flush_delay_threshold` and `l0_flush_stall_threshold`, plus `max_replication_write_lag`:

* Minimum compute backpressure lag: 20 layers * 256 MB + 500 MB = 5.6 GB.
* Maximum compute backpressure lag: 40 layers * 256 MB + 500 MB = 10.7 GB.

This seems like a reasonable range to me.
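To make the threshold semantics concrete, here is a minimal sketch of the defaulting and clamping rules described above. This is illustrative only: `effective_threshold` is an invented name, and the real getters additionally disable backpressure when `compaction_period` is zero (and stalls when the last compaction failed).

```rust
/// Resolves an L0 flush backpressure threshold, mirroring the rules described
/// above. `configured` is the tenant/global setting; `factor` is 2 for delays
/// and 4 for stalls. Returns None when backpressure is disabled.
fn effective_threshold(
    configured: Option<usize>,
    factor: usize,
    compaction_threshold: usize,
) -> Option<usize> {
    // If unset, default to a multiple of the compaction threshold.
    let threshold = configured.unwrap_or(factor * compaction_threshold);
    // An explicit 0 disables backpressure entirely.
    if threshold == 0 {
        return None;
    }
    // Never backpressure below the compaction threshold itself: compaction
    // hasn't even been asked to run yet at that point.
    Some(threshold.max(compaction_threshold))
}

fn main() {
    // With the default compaction_threshold = 10:
    assert_eq!(effective_threshold(None, 2, 10), Some(20)); // delay default
    assert_eq!(effective_threshold(None, 4, 10), Some(40)); // stall default
    assert_eq!(effective_threshold(Some(0), 2, 10), None); // disabled
    assert_eq!(effective_threshold(Some(5), 2, 10), Some(10)); // clamped up
}
```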
---
 control_plane/src/pageserver.rs               |  10 +
 libs/pageserver_api/src/config.rs             |  13 +-
 libs/pageserver_api/src/models.rs             |  16 ++
 pageserver/src/metrics.rs                     |  12 +-
 pageserver/src/tenant.rs                      |   2 +
 pageserver/src/tenant/config.rs               |  26 ++
 pageserver/src/tenant/timeline.rs             | 257 +++++++++++++++---
 .../fixtures/pageserver/allowed_errors.py     |   7 +-
 test_runner/performance/test_layer_map.py     |   2 +
 .../regress/test_attach_tenant_config.py      |   2 +
 test_runner/regress/test_branch_and_gc.py     |   2 +
 test_runner/regress/test_compatibility.py     |   2 +-
 test_runner/regress/test_recovery.py          |   5 +-
 test_runner/regress/test_remote_storage.py    |   2 +
 test_runner/regress/test_timeline_size.py     |   2 +-
 test_runner/regress/test_vm_bits.py           |   3 +
 16 files changed, 311 insertions(+), 52 deletions(-)

diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs
index b33b2877b3..967810ee06 100644
--- a/control_plane/src/pageserver.rs
+++ b/control_plane/src/pageserver.rs
@@ -352,6 +352,16 @@ impl PageServerNode {
                 .map(serde_json::from_str)
                 .transpose()
                 .context("Failed to parse 'compaction_algorithm' json")?,
+            l0_flush_delay_threshold: settings
+                .remove("l0_flush_delay_threshold")
+                .map(|x| x.parse::<usize>())
+                .transpose()
+                .context("Failed to parse 'l0_flush_delay_threshold' as an integer")?,
+            l0_flush_stall_threshold: settings
+                .remove("l0_flush_stall_threshold")
+                .map(|x| x.parse::<usize>())
+                .transpose()
+                .context("Failed to parse 'l0_flush_stall_threshold' as an integer")?,
             gc_horizon: settings
                 .remove("gc_horizon")
                 .map(|x| x.parse::<u64>())
diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs
index 4982c6233d..5866145690 100644
--- a/libs/pageserver_api/src/config.rs
+++ b/libs/pageserver_api/src/config.rs
@@ -254,9 +254,18 @@ pub struct TenantConfigToml {
     // Duration::ZERO means automatic compaction is disabled.
     #[serde(with = "humantime_serde")]
     pub compaction_period: Duration,
-    // Level0 delta layer threshold for compaction.
+    /// Level0 delta layer threshold for compaction.
     pub compaction_threshold: usize,
     pub compaction_algorithm: crate::models::CompactionAlgorithmSettings,
+    /// Level0 delta layer threshold at which to delay layer flushes for compaction backpressure,
+    /// such that they take 2x as long, and start waiting for layer flushes during ephemeral layer
+    /// rolls. This helps compaction keep up with WAL ingestion, and avoids read amplification
+    /// blowing up. Should be >compaction_threshold. If None, defaults to 2 * compaction_threshold.
+    /// 0 to disable.
+    pub l0_flush_delay_threshold: Option<usize>,
+    /// Level0 delta layer threshold at which to stall layer flushes. 0 to disable. If None,
+    /// defaults to 4 * compaction_threshold. Must be >compaction_threshold to avoid deadlock.
+    pub l0_flush_stall_threshold: Option<usize>,
     // Determines how much history is retained, to allow
     // branching and read replicas at an older point in time.
     // The unit is #of bytes of WAL.
@@ -552,6 +561,8 @@ impl Default for TenantConfigToml {
             compaction_algorithm: crate::models::CompactionAlgorithmSettings {
                 kind: DEFAULT_COMPACTION_ALGORITHM,
             },
+            l0_flush_delay_threshold: None,
+            l0_flush_stall_threshold: None,
             gc_horizon: DEFAULT_GC_HORIZON,
             gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
                 .expect("cannot parse default gc period"),
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index fd4879087f..16473415b4 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -462,6 +462,10 @@ pub struct TenantConfigPatch {
     #[serde(skip_serializing_if = "FieldPatch::is_noop")]
     pub compaction_algorithm: FieldPatch<CompactionAlgorithmSettings>,
     #[serde(skip_serializing_if = "FieldPatch::is_noop")]
+    pub l0_flush_delay_threshold: FieldPatch<usize>,
+    #[serde(skip_serializing_if = "FieldPatch::is_noop")]
+    pub l0_flush_stall_threshold: FieldPatch<usize>,
+    #[serde(skip_serializing_if = "FieldPatch::is_noop")]
     pub gc_horizon: FieldPatch<u64>,
     #[serde(skip_serializing_if = "FieldPatch::is_noop")]
     pub gc_period: FieldPatch<String>,
@@ -518,6 +522,8 @@ pub struct TenantConfig {
     pub compaction_threshold: Option<usize>,
     // defer parsing compaction_algorithm, like eviction_policy
     pub compaction_algorithm: Option<CompactionAlgorithmSettings>,
+    pub l0_flush_delay_threshold: Option<usize>,
+    pub l0_flush_stall_threshold: Option<usize>,
     pub gc_horizon: Option<u64>,
     pub gc_period: Option<String>,
     pub image_creation_threshold: Option<usize>,
@@ -551,6 +557,8 @@ impl TenantConfig {
             mut compaction_period,
             mut compaction_threshold,
             mut compaction_algorithm,
+            mut l0_flush_delay_threshold,
+            mut l0_flush_stall_threshold,
             mut gc_horizon,
             mut gc_period,
             mut image_creation_threshold,
@@ -583,6 +591,12 @@ impl TenantConfig {
         patch.compaction_period.apply(&mut compaction_period);
         patch.compaction_threshold.apply(&mut compaction_threshold);
         patch.compaction_algorithm.apply(&mut compaction_algorithm);
+        patch
+            .l0_flush_delay_threshold
+            .apply(&mut l0_flush_delay_threshold);
+        patch
+            .l0_flush_stall_threshold
+            .apply(&mut l0_flush_stall_threshold);
         patch.gc_horizon.apply(&mut gc_horizon);
         patch.gc_period.apply(&mut gc_period);
         patch
@@ -635,6 +649,8 @@ impl TenantConfig {
             compaction_period,
             compaction_threshold,
             compaction_algorithm,
+            l0_flush_delay_threshold,
+            l0_flush_stall_threshold,
             gc_horizon,
             gc_period,
             image_creation_threshold,
diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index 985614b6cf..5247a4a2ac 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -38,6 +38,9 @@ pub(crate) enum StorageTimeOperation {
     #[strum(serialize = "layer flush")]
     LayerFlush,
 
+    #[strum(serialize = "layer flush delay")]
+    LayerFlushDelay,
+
     #[strum(serialize = "compact")]
     Compact,
 
@@ -2508,7 +2511,6 @@ impl Drop for AlwaysRecordingStorageTimeMetricsTimer {
 
 impl AlwaysRecordingStorageTimeMetricsTimer {
     /// Returns the elapsed duration of the timer.
-    #[allow(unused)]
     pub fn elapsed(&self) -> Duration {
         self.0.as_ref().expect("not dropped yet").elapsed()
     }
@@ -2566,6 +2568,7 @@ pub(crate) struct TimelineMetrics {
     shard_id: String,
     timeline_id: String,
     pub flush_time_histo: StorageTimeMetrics,
+    pub flush_delay_histo: StorageTimeMetrics,
     pub compact_time_histo: StorageTimeMetrics,
     pub create_images_time_histo: StorageTimeMetrics,
     pub logical_size_histo: StorageTimeMetrics,
@@ -2611,6 +2614,12 @@ impl TimelineMetrics {
             &shard_id,
             &timeline_id,
         );
+        let flush_delay_histo = StorageTimeMetrics::new(
+            StorageTimeOperation::LayerFlushDelay,
+            &tenant_id,
+            &shard_id,
+            &timeline_id,
+        );
         let compact_time_histo = StorageTimeMetrics::new(
             StorageTimeOperation::Compact,
             &tenant_id,
@@ -2756,6 +2765,7 @@ impl TimelineMetrics {
             shard_id,
             timeline_id,
             flush_time_histo,
+            flush_delay_histo,
             compact_time_histo,
             create_images_time_histo,
             logical_size_histo,
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index a273ef5d01..efe89cb982 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -5453,6 +5453,8 @@ pub(crate) mod harness {
             compaction_period: Some(tenant_conf.compaction_period),
             compaction_threshold: Some(tenant_conf.compaction_threshold),
             compaction_algorithm: Some(tenant_conf.compaction_algorithm),
+            l0_flush_delay_threshold: tenant_conf.l0_flush_delay_threshold,
+            l0_flush_stall_threshold: tenant_conf.l0_flush_stall_threshold,
             gc_horizon: Some(tenant_conf.gc_horizon),
             gc_period: Some(tenant_conf.gc_period),
             image_creation_threshold: Some(tenant_conf.image_creation_threshold),
diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs
index 3db1445f6e..c870ca97b8 100644
--- a/pageserver/src/tenant/config.rs
+++ b/pageserver/src/tenant/config.rs
@@ -281,6 +281,14 @@ pub struct TenantConfOpt {
     #[serde(default)]
     pub compaction_algorithm: Option<CompactionAlgorithmSettings>,
 
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(default)]
+    pub l0_flush_delay_threshold: Option<usize>,
+
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(default)]
+    pub l0_flush_stall_threshold: Option<usize>,
+
     #[serde(skip_serializing_if = "Option::is_none")]
     #[serde(default)]
     pub gc_horizon: Option<u64>,
@@ -394,6 +402,12 @@ impl TenantConfOpt {
                 .as_ref()
                 .unwrap_or(&global_conf.compaction_algorithm)
                 .clone(),
+            l0_flush_delay_threshold: self
+                .l0_flush_delay_threshold
+                .or(global_conf.l0_flush_delay_threshold),
+            l0_flush_stall_threshold: self
+                .l0_flush_stall_threshold
+                .or(global_conf.l0_flush_stall_threshold),
             gc_horizon: self.gc_horizon.unwrap_or(global_conf.gc_horizon),
             gc_period: self.gc_period.unwrap_or(global_conf.gc_period),
             image_creation_threshold: self
@@ -458,6 +472,8 @@ impl TenantConfOpt {
             mut compaction_period,
             mut compaction_threshold,
             mut compaction_algorithm,
+            mut l0_flush_delay_threshold,
+            mut l0_flush_stall_threshold,
             mut gc_horizon,
             mut gc_period,
             mut image_creation_threshold,
@@ -496,6 +512,12 @@ impl TenantConfOpt {
             .apply(&mut compaction_period);
         patch.compaction_threshold.apply(&mut compaction_threshold);
         patch.compaction_algorithm.apply(&mut compaction_algorithm);
+        patch
+            .l0_flush_delay_threshold
+            .apply(&mut l0_flush_delay_threshold);
+        patch
+            .l0_flush_stall_threshold
+            .apply(&mut l0_flush_stall_threshold);
         patch.gc_horizon.apply(&mut gc_horizon);
         patch
             .gc_period
@@ -566,6 +588,8 @@ impl TenantConfOpt {
             compaction_period,
             compaction_threshold,
             compaction_algorithm,
+            l0_flush_delay_threshold,
+            l0_flush_stall_threshold,
             gc_horizon,
             gc_period,
             image_creation_threshold,
@@ -623,6 +647,8 @@ impl From<TenantConfOpt> for models::TenantConfig {
             compaction_target_size: value.compaction_target_size,
             compaction_period: value.compaction_period.map(humantime),
             compaction_threshold: value.compaction_threshold,
+            l0_flush_delay_threshold: value.l0_flush_delay_threshold,
+            l0_flush_stall_threshold: value.l0_flush_stall_threshold,
             gc_horizon: value.gc_horizon,
             gc_period: value.gc_period.map(humantime),
             image_creation_threshold: value.image_creation_threshold,
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 56f61abc45..fffa2c8e2b 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -22,11 +22,11 @@ use enumset::EnumSet;
 use fail::fail_point;
 use futures::{stream::FuturesUnordered, StreamExt};
 use handle::ShardTimelineId;
+use layer_manager::Shutdown;
 use offload::OffloadError;
 use once_cell::sync::Lazy;
 use pageserver_api::models::PageTraceEvent;
 use pageserver_api::{
-    config::tenant_conf_defaults::DEFAULT_COMPACTION_THRESHOLD,
     key::{
         KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, NON_INHERITED_RANGE,
         SPARSE_RANGE,
     },
@@ -60,20 +60,14 @@ use utils::{
 };
 use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
 
-use std::sync::atomic::Ordering as AtomicOrdering;
-use std::sync::OnceLock;
-use std::sync::{Arc, Mutex, RwLock, Weak};
+use std::array;
+use std::cmp::{max, min};
+use std::collections::btree_map::Entry;
+use std::collections::{BTreeMap, HashMap, HashSet};
+use std::ops::{ControlFlow, Deref, Range};
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
+use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
 use std::time::{Duration, Instant, SystemTime};
-use std::{
-    array,
-    collections::{BTreeMap, HashMap, HashSet},
-    sync::atomic::AtomicU64,
-};
-use std::{cmp::min, ops::ControlFlow};
-use std::{
-    collections::btree_map::Entry,
-    ops::{Deref, Range},
-};
 
 use crate::l0_flush::{self, L0FlushGlobalState};
 use crate::{
@@ -404,6 +398,9 @@ pub struct Timeline {
     /// Timeline deletion will acquire both compaction and gc locks in whatever order.
     compaction_lock: tokio::sync::Mutex<()>,
 
+    /// If true, the last compaction failed.
+    compaction_failed: AtomicBool,
+
     /// Make sure we only have one running gc at a time.
     ///
     /// Must only be taken in two places:
@@ -1698,13 +1695,25 @@ impl Timeline {
             return Ok(false);
         }
 
-        match self.get_compaction_algorithm_settings().kind {
+        let result = match self.get_compaction_algorithm_settings().kind {
             CompactionAlgorithm::Tiered => {
                 self.compact_tiered(cancel, ctx).await?;
                 Ok(false)
             }
             CompactionAlgorithm::Legacy => self.compact_legacy(cancel, options, ctx).await,
-        }
+        };
+
+        // Signal compaction failure to avoid L0 flush stalls when it's broken.
+        let compaction_failed = match result {
+            Ok(_) => false,
+            Err(CompactionError::Offload(_)) => false, // doesn't halt compaction
+            Err(CompactionError::ShuttingDown) => false, // not a failure
+            Err(CompactionError::Other(_)) => true,
+        };
+        self.compaction_failed
+            .store(compaction_failed, AtomicOrdering::Relaxed);
+
+        result
     }
 
     /// Mutate the timeline with a [`TimelineWriter`].
@@ -2133,6 +2142,13 @@ impl Timeline {
             .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
     }
 
+    fn get_compaction_period(&self) -> Duration {
+        let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
+        tenant_conf
+            .compaction_period
+            .unwrap_or(self.conf.default_tenant_conf.compaction_period)
+    }
+
     fn get_compaction_target_size(&self) -> u64 {
         let tenant_conf = self.tenant_conf.load();
         tenant_conf
@@ -2149,6 +2165,84 @@ impl Timeline {
             .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
     }
 
+    fn get_l0_flush_delay_threshold(&self) -> Option<usize> {
+        // Default to delay L0 flushes at 2x compaction threshold.
+        const DEFAULT_L0_FLUSH_DELAY_FACTOR: usize = 2;
+
+        // If compaction is disabled, don't delay.
+        if self.get_compaction_period() == Duration::ZERO {
+            return None;
+        }
+
+        let compaction_threshold = self.get_compaction_threshold();
+        let tenant_conf = self.tenant_conf.load();
+        let l0_flush_delay_threshold = tenant_conf
+            .tenant_conf
+            .l0_flush_delay_threshold
+            .or(self.conf.default_tenant_conf.l0_flush_delay_threshold)
+            .unwrap_or(DEFAULT_L0_FLUSH_DELAY_FACTOR * compaction_threshold);
+
+        // 0 disables backpressure.
+        if l0_flush_delay_threshold == 0 {
+            return None;
+        }
+
+        // Clamp the flush delay threshold to the compaction threshold; it doesn't make sense to
+        // backpressure flushes below this.
+        // TODO: the tenant config should have validation to prevent this instead.
+        debug_assert!(l0_flush_delay_threshold >= compaction_threshold);
+        Some(max(l0_flush_delay_threshold, compaction_threshold))
+    }
+
+    fn get_l0_flush_stall_threshold(&self) -> Option<usize> {
+        // Default to stall L0 flushes at 4x compaction threshold.
+        const DEFAULT_L0_FLUSH_STALL_FACTOR: usize = 4;
+
+        // If compaction is disabled, don't stall.
+        if self.get_compaction_period() == Duration::ZERO {
+            return None;
+        }
+
+        // If compaction is failing, don't stall and try to keep the tenant alive. This may not be a
+        // good idea: read amp can grow unbounded, leading to terrible performance, and we may take
+        // on unbounded compaction debt that can take a long time to fix once compaction comes back
+        // online. At least we'll delay flushes, slowing down the growth and buying some time.
+        if self.compaction_failed.load(AtomicOrdering::Relaxed) {
+            return None;
+        }
+
+        let compaction_threshold = self.get_compaction_threshold();
+        let tenant_conf = self.tenant_conf.load();
+        let l0_flush_stall_threshold = tenant_conf
+            .tenant_conf
+            .l0_flush_stall_threshold
+            .or(self.conf.default_tenant_conf.l0_flush_stall_threshold);
+
+        // Tests sometimes set compaction_threshold=1 to generate lots of layer files, and don't
+        // handle the 20-second compaction delay. Some (e.g. `test_backward_compatibility`) can't
+        // easily adjust the L0 backpressure settings, so just disable stalls in this case.
+        if cfg!(feature = "testing")
+            && compaction_threshold == 1
+            && l0_flush_stall_threshold.is_none()
+        {
+            return None;
+        }
+
+        let l0_flush_stall_threshold = l0_flush_stall_threshold
+            .unwrap_or(DEFAULT_L0_FLUSH_STALL_FACTOR * compaction_threshold);
+
+        // 0 disables backpressure.
+        if l0_flush_stall_threshold == 0 {
+            return None;
+        }
+
+        // Clamp the flush stall threshold to the compaction threshold; it doesn't make sense to
+        // backpressure flushes below this.
+        // TODO: the tenant config should have validation to prevent this instead.
+        debug_assert!(l0_flush_stall_threshold >= compaction_threshold);
+        Some(max(l0_flush_stall_threshold, compaction_threshold))
+    }
+
     fn get_image_creation_threshold(&self) -> usize {
         let tenant_conf = self.tenant_conf.load();
         tenant_conf
@@ -2385,6 +2479,7 @@ impl Timeline {
             gate: Gate::default(),
 
             compaction_lock: tokio::sync::Mutex::default(),
+            compaction_failed: AtomicBool::default(),
             gc_lock: tokio::sync::Mutex::default(),
 
             standby_horizon: AtomicLsn::new(0),
@@ -3600,6 +3695,12 @@ impl Timeline {
         mut layer_flush_start_rx: tokio::sync::watch::Receiver<(u64, Lsn)>,
         ctx: &RequestContext,
     ) {
+        // Subscribe to L0 delta layer updates, for compaction backpressure.
+        let mut watch_l0 = match self.layers.read().await.layer_map() {
+            Ok(lm) => lm.watch_level0_deltas(),
+            Err(Shutdown) => return,
+        };
+
         info!("started flush loop");
         loop {
             tokio::select! {
@@ -3630,43 +3731,62 @@
                     break Ok(());
                 }
 
-                let timer = self.metrics.flush_time_histo.start_timer();
-
-                let num_frozen_layers;
-                let frozen_layer_total_size;
-                let layer_to_flush = {
-                    let guard = self.layers.read().await;
-                    let Ok(lm) = guard.layer_map() else {
+                // Fetch the next layer to flush, if any.
+                let (layer, l0_count, frozen_count, frozen_size) = {
+                    let layers = self.layers.read().await;
+                    let Ok(lm) = layers.layer_map() else {
                         info!("dropping out of flush loop for timeline shutdown");
                         return;
                     };
-                    num_frozen_layers = lm.frozen_layers.len();
-                    frozen_layer_total_size = lm
+                    let l0_count = lm.level0_deltas().len();
+                    let frozen_count = lm.frozen_layers.len();
+                    let frozen_size: u64 = lm
                         .frozen_layers
                         .iter()
                         .map(|l| l.estimated_in_mem_size())
-                        .sum::<u64>();
-                    lm.frozen_layers.front().cloned()
-                    // drop 'layers' lock to allow concurrent reads and writes
+                        .sum();
+                    let layer = lm.frozen_layers.front().cloned();
+                    (layer, l0_count, frozen_count, frozen_size)
+                    // drop 'layers' lock
                 };
-                let Some(layer_to_flush) = layer_to_flush else {
+
+                let Some(layer) = layer else {
                     break Ok(());
                 };
-                if num_frozen_layers
-                    > std::cmp::max(
-                        self.get_compaction_threshold(),
-                        DEFAULT_COMPACTION_THRESHOLD,
-                    )
-                    && frozen_layer_total_size >= /* 128 MB */ 128000000
-                {
-                    tracing::warn!(
-                        "too many frozen layers: {num_frozen_layers} layers with estimated in-mem size of {frozen_layer_total_size} bytes",
-                    );
-                }
-                match self.flush_frozen_layer(layer_to_flush, ctx).await {
-                    Ok(this_layer_to_lsn) => {
-                        flushed_to_lsn = std::cmp::max(flushed_to_lsn, this_layer_to_lsn);
+
+                // Stall flushes to backpressure if compaction can't keep up. This is propagated up
+                // to WAL ingestion by having ephemeral layer rolls wait for flushes.
+                //
+                // NB: the compaction loop only checks `compaction_threshold` every 20 seconds, so
+                // we can end up stalling before compaction even starts. Consider making it more
+                // responsive (e.g. via `watch_level0_deltas`).
+                if let Some(stall_threshold) = self.get_l0_flush_stall_threshold() {
+                    if l0_count >= stall_threshold {
+                        warn!(
+                            "stalling layer flushes for compaction backpressure at {l0_count} \
+                            L0 layers ({frozen_count} frozen layers with {frozen_size} bytes)"
+                        );
+                        let stall_timer = self
+                            .metrics
+                            .flush_delay_histo
+                            .start_timer()
+                            .record_on_drop();
+                        tokio::select! {
+                            result = watch_l0.wait_for(|l0| *l0 < stall_threshold) => {
+                                if let Ok(l0) = result.as_deref() {
+                                    let delay = stall_timer.elapsed().as_secs_f64();
+                                    info!("resuming layer flushes at {l0} L0 layers after {delay:.3}s");
+                                }
+                            },
+                            _ = self.cancel.cancelled() => {},
+                        }
+                        continue; // check again
                     }
+                }
+
+                // Flush the layer.
+                let flush_timer = self.metrics.flush_time_histo.start_timer();
+                match self.flush_frozen_layer(layer, ctx).await {
+                    Ok(layer_lsn) => flushed_to_lsn = max(flushed_to_lsn, layer_lsn),
                     Err(FlushLayerError::Cancelled) => {
                         info!("dropping out of flush loop for timeline shutdown");
                         return;
@@ -3680,7 +3800,30 @@
                         break err.map(|_| ());
                     }
                 }
-                timer.stop_and_record();
+                let flush_duration = flush_timer.stop_and_record();
+
+                // Delay the next flush to backpressure if compaction can't keep up. We delay by the
+                // flush duration such that the flush takes 2x as long. This is propagated up to WAL
+                // ingestion by having ephemeral layer rolls wait for flushes.
+                if let Some(delay_threshold) = self.get_l0_flush_delay_threshold() {
+                    if l0_count >= delay_threshold {
+                        let delay = flush_duration.as_secs_f64();
+                        info!(
+                            "delaying layer flush by {delay:.3}s for compaction backpressure at \
+                            {l0_count} L0 layers ({frozen_count} frozen layers with {frozen_size} bytes)"
+                        );
+                        let _delay_timer = self
+                            .metrics
+                            .flush_delay_histo
+                            .start_timer()
+                            .record_on_drop();
+                        tokio::select! {
+                            _ = tokio::time::sleep(flush_duration) => {},
+                            _ = watch_l0.wait_for(|l0| *l0 < delay_threshold) => {},
+                            _ = self.cancel.cancelled() => {},
+                        }
+                    }
+                }
             };
 
             // Unsharded tenants should never advance their LSN beyond the end of the
@@ -5910,13 +6053,37 @@ impl TimelineWriter<'_> {
     async fn roll_layer(&mut self, freeze_at: Lsn) -> Result<(), FlushLayerError> {
         let current_size = self.write_guard.as_ref().unwrap().current_size;
 
+        // If layer flushes are backpressured due to compaction not keeping up, wait for the flush
+        // to propagate the backpressure up into WAL ingestion.
+        let l0_count = self
+            .tl
+            .layers
+            .read()
+            .await
+            .layer_map()?
+            .level0_deltas()
+            .len();
+        let wait_thresholds = [
+            self.get_l0_flush_delay_threshold(),
+            self.get_l0_flush_stall_threshold(),
+        ];
+        let wait_threshold = wait_thresholds.into_iter().flatten().min();
+
         // self.write_guard will be taken by the freezing
-        self.tl
+        let flush_id = self
+            .tl
             .freeze_inmem_layer_at(freeze_at, &mut self.write_guard)
             .await?;
 
         assert!(self.write_guard.is_none());
 
+        if let Some(wait_threshold) = wait_threshold {
+            if l0_count >= wait_threshold {
+                info!("layer roll waiting for flush due to compaction backpressure at {l0_count} L0 layers");
+                self.tl.wait_flush_completion(flush_id).await?;
+            }
+        }
+
         if current_size >= self.get_checkpoint_distance() * 2 {
             warn!("Flushed oversized open layer with size {}", current_size)
         }
diff --git a/test_runner/fixtures/pageserver/allowed_errors.py b/test_runner/fixtures/pageserver/allowed_errors.py
index 5059039678..748ac0d569 100755
--- a/test_runner/fixtures/pageserver/allowed_errors.py
+++ b/test_runner/fixtures/pageserver/allowed_errors.py
@@ -99,8 +99,11 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
     ".*WARN.*path=/v1/utilization .*request was dropped before completing",
     # Can happen during shutdown
     ".*scheduling deletion on drop failed: queue is in state Stopped.*",
-    # Too many frozen layers error is normal during intensive benchmarks
-    ".*too many frozen layers.*",
+    # L0 flush backpressure delays are expected under heavy ingest load. We want to exercise
+    # this backpressure in tests.
+    ".*delaying layer flush by \\S+ for compaction backpressure.*",
+    ".*stalling layer flushes for compaction backpressure.*",
+    ".*layer roll waiting for flush due to compaction backpressure.*",
 )
diff --git a/test_runner/performance/test_layer_map.py b/test_runner/performance/test_layer_map.py
index 8a4ad2d399..9b159c5fcf 100644
--- a/test_runner/performance/test_layer_map.py
+++ b/test_runner/performance/test_layer_map.py
@@ -23,6 +23,8 @@ def test_layer_map(neon_env_builder: NeonEnvBuilder, zenbenchmark):
             "checkpoint_distance": "16384",
             "compaction_period": "1 s",
             "compaction_threshold": "1",
+            "l0_flush_delay_threshold": "0",
+            "l0_flush_stall_threshold": "0",
             "compaction_target_size": "16384",
         }
     )
diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py
index b8d47346a3..1fdba223ad 100644
--- a/test_runner/regress/test_attach_tenant_config.py
+++ b/test_runner/regress/test_attach_tenant_config.py
@@ -139,6 +139,8 @@ def test_fully_custom_config(positive_env: NeonEnv):
     fully_custom_config = {
         "compaction_period": "1h",
         "compaction_threshold": 13,
+        "l0_flush_delay_threshold": 25,
+        "l0_flush_stall_threshold": 42,
        "compaction_target_size": 1048576,
        "checkpoint_distance": 10000,
        "checkpoint_timeout": "13m",
diff --git a/test_runner/regress/test_branch_and_gc.py b/test_runner/regress/test_branch_and_gc.py
index fccfbc7f09..0e28231a86 100644
--- a/test_runner/regress/test_branch_and_gc.py
+++ b/test_runner/regress/test_branch_and_gc.py
@@ -64,6 +64,8 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
             # tweak the default settings to allow quickly create image layers and L1 layers
             "compaction_period": "1 s",
             "compaction_threshold": "2",
+            "l0_flush_delay_threshold": "20",
+            "l0_flush_stall_threshold": "40",
             "image_creation_threshold": "1",
             # Disable PITR, this test will set an explicit space-based GC limit
             "pitr_interval": "0 s",
diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py
index ac44630d30..cdc6c0053d 100644
--- a/test_runner/regress/test_compatibility.py
+++ b/test_runner/regress/test_compatibility.py
@@ -143,7 +143,7 @@ def test_create_snapshot(
     env = neon_env_builder.init_start(
         initial_tenant_conf={
-            # Miniature layers to enable generating non-trivial layer map without writing lots of data
+            # Miniature layers to enable generating non-trivial layer map without writing lots of data.
             "checkpoint_distance": f"{128 * 1024}",
             "compaction_threshold": "1",
             "compaction_target_size": f"{128 * 1024}",
diff --git a/test_runner/regress/test_recovery.py b/test_runner/regress/test_recovery.py
index b43a443149..dab01fcd1a 100644
--- a/test_runner/regress/test_recovery.py
+++ b/test_runner/regress/test_recovery.py
@@ -11,10 +11,13 @@ from fixtures.neon_fixtures import NeonEnvBuilder
 #
 # Test pageserver recovery after crash
 #
 def test_pageserver_recovery(neon_env_builder: NeonEnvBuilder):
-    # Override default checkpointer settings to run it more often
+    # Override default checkpointer settings to run it more often.
+    # This also creates a bunch more L0 layers, so disable backpressure.
     env = neon_env_builder.init_start(
         initial_tenant_conf={
             "checkpoint_distance": "1048576",
+            "l0_flush_delay_threshold": "0",
+            "l0_flush_stall_threshold": "0",
         }
     )
     env.pageserver.is_testing_enabled_or_skip()
diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py
index 52b6b254aa..f6bc6f6f41 100644
--- a/test_runner/regress/test_remote_storage.py
+++ b/test_runner/regress/test_remote_storage.py
@@ -539,6 +539,8 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
             # small checkpointing and compaction targets to ensure we generate many operations
             "checkpoint_distance": f"{64 * 1024}",
             "compaction_threshold": "1",
+            "l0_flush_delay_threshold": "0",
+            "l0_flush_stall_threshold": "0",
             "compaction_target_size": f"{64 * 1024}",
             # large horizon to avoid automatic GC (our assert on gc_result below relies on that)
             "gc_horizon": f"{1024 ** 4}",
diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py
index 95bf9106cd..e2fdacdbfc 100644
--- a/test_runner/regress/test_timeline_size.py
+++ b/test_runner/regress/test_timeline_size.py
@@ -440,7 +440,7 @@ def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder
     env = neon_env_builder.init_start(
         initial_tenant_conf={
             "checkpoint_distance": "100000",
-            "compaction_period": "10m",
+            "compaction_period": "0s",
         }
     )
     pageserver_http = env.pageserver.http_client()
diff --git a/test_runner/regress/test_vm_bits.py b/test_runner/regress/test_vm_bits.py
index d9e59c71f4..4865178ca8 100644
--- a/test_runner/regress/test_vm_bits.py
+++ b/test_runner/regress/test_vm_bits.py
@@ -203,6 +203,9 @@ def test_vm_bit_clear_on_heap_lock_blackbox(neon_env_builder: NeonEnvBuilder):
         "checkpoint_distance": f"{128 * 1024}",
         "compaction_target_size": f"{128 * 1024}",
         "compaction_threshold": "1",
+        # disable L0 backpressure
+        "l0_flush_delay_threshold": "0",
+        "l0_flush_stall_threshold": "0",
         # create image layers eagerly, so that GC can remove some layers
         "image_creation_threshold": "1",
         # set PITR interval to be small, so we can do GC