diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 3f66960edd..3673d1f4f2 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -452,6 +452,12 @@ impl PageServerNode { .map(|x| x.parse::()) .transpose() .context("Failed to parse 'image_creation_threshold' as non zero integer")?, + // HADRON + image_layer_force_creation_period: settings + .remove("image_layer_force_creation_period") + .map(humantime::parse_duration) + .transpose() + .context("Failed to parse 'image_layer_force_creation_period' as duration")?, image_layer_creation_check_threshold: settings .remove("image_layer_creation_check_threshold") .map(|x| x.parse::()) diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index 22815955c1..9e9c7a4dcb 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -272,6 +272,8 @@ pub struct ConfigToml { pub timeline_import_config: TimelineImportConfig, #[serde(skip_serializing_if = "Option::is_none")] pub basebackup_cache_config: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub image_layer_generation_large_timeline_threshold: Option, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -561,6 +563,11 @@ pub struct TenantConfigToml { pub gc_period: Duration, // Delta layer churn threshold to create L1 image layers. pub image_creation_threshold: usize, + // HADRON + // When the timeout is reached, PageServer will (1) force compact any remaining L0 deltas and + // (2) create image layers if there are any L1 deltas. + #[serde(with = "humantime_serde")] + pub image_layer_force_creation_period: Option, // Determines how much history is retained, to allow // branching and read replicas at an older point in time. // The unit is time. @@ -823,6 +830,7 @@ impl Default for ConfigToml { }, basebackup_cache_config: None, posthog_config: None, + image_layer_generation_large_timeline_threshold: Some(2 * 1024 * 1024 * 1024), } } } @@ -916,6 +924,7 @@ impl Default for TenantConfigToml { gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD) .expect("cannot parse default gc period"), image_creation_threshold: DEFAULT_IMAGE_CREATION_THRESHOLD, + image_layer_force_creation_period: None, pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL) .expect("cannot parse default PITR interval"), walreceiver_connect_timeout: humantime::parse_duration( diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 6735320484..56dd95eab3 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -597,6 +597,9 @@ pub struct TenantConfigPatch { pub gc_period: FieldPatch, #[serde(skip_serializing_if = "FieldPatch::is_noop")] pub image_creation_threshold: FieldPatch, + // HADRON + #[serde(skip_serializing_if = "FieldPatch::is_noop")] + pub image_layer_force_creation_period: FieldPatch, #[serde(skip_serializing_if = "FieldPatch::is_noop")] pub pitr_interval: FieldPatch, #[serde(skip_serializing_if = "FieldPatch::is_noop")] @@ -700,6 +703,11 @@ pub struct TenantConfig { #[serde(skip_serializing_if = "Option::is_none")] pub image_creation_threshold: Option, + // HADRON + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "humantime_serde")] + pub image_layer_force_creation_period: Option, + #[serde(skip_serializing_if = "Option::is_none")] #[serde(with = "humantime_serde")] pub pitr_interval: Option, @@ -798,6 +806,7 @@ impl TenantConfig { mut gc_horizon, mut gc_period, mut image_creation_threshold, + mut image_layer_force_creation_period, mut pitr_interval, mut walreceiver_connect_timeout, mut lagging_wal_timeout, @@ -861,6 +870,11 @@ impl TenantConfig { patch .image_creation_threshold .apply(&mut image_creation_threshold); + // HADRON + patch + .image_layer_force_creation_period + .map(|v| humantime::parse_duration(&v))? + .apply(&mut image_layer_force_creation_period); patch .pitr_interval .map(|v| humantime::parse_duration(&v))? @@ -942,6 +956,7 @@ impl TenantConfig { gc_horizon, gc_period, image_creation_threshold, + image_layer_force_creation_period, pitr_interval, walreceiver_connect_timeout, lagging_wal_timeout, @@ -1016,6 +1031,9 @@ impl TenantConfig { image_creation_threshold: self .image_creation_threshold .unwrap_or(global_conf.image_creation_threshold), + image_layer_force_creation_period: self + .image_layer_force_creation_period + .or(global_conf.image_layer_force_creation_period), pitr_interval: self.pitr_interval.unwrap_or(global_conf.pitr_interval), walreceiver_connect_timeout: self .walreceiver_connect_timeout diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 15ec31b0a6..f64c5838ff 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -252,6 +252,10 @@ pub struct PageServerConf { pub timeline_import_config: pageserver_api::config::TimelineImportConfig, pub basebackup_cache_config: Option, + + /// Defines what is a big tenant for the purpose of image layer generation. + /// See Timeline::should_check_if_image_layers_required + pub image_layer_generation_large_timeline_threshold: Option, } /// Token for authentication to safekeepers @@ -432,6 +436,7 @@ impl PageServerConf { posthog_config, timeline_import_config, basebackup_cache_config, + image_layer_generation_large_timeline_threshold, } = config_toml; let mut conf = PageServerConf { @@ -490,6 +495,7 @@ impl PageServerConf { dev_mode, timeline_import_config, basebackup_cache_config, + image_layer_generation_large_timeline_threshold, // ------------------------------------------------------------ // fields that require additional validation or custom handling diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 240ba36236..7e2e6d96b8 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -4171,6 +4171,15 @@ impl TenantShard { .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold) } + // HADRON + pub fn get_image_creation_timeout(&self) -> Option { + let tenant_conf = self.tenant_conf.load().tenant_conf.clone(); + tenant_conf.image_layer_force_creation_period.or(self + .conf + .default_tenant_conf + .image_layer_force_creation_period) + } + pub fn get_pitr_interval(&self) -> Duration { let tenant_conf = self.tenant_conf.load().tenant_conf.clone(); tenant_conf diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 0a026d288e..a9bc0a060b 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -351,6 +351,13 @@ pub struct Timeline { last_image_layer_creation_check_at: AtomicLsn, last_image_layer_creation_check_instant: std::sync::Mutex>, + // HADRON + /// If a key range has writes with LSN > force_image_creation_lsn, then we should force image layer creation + /// on this key range. + force_image_creation_lsn: AtomicLsn, + /// The last time instant when force_image_creation_lsn is computed. + force_image_creation_lsn_computed_at: std::sync::Mutex>, + /// Current logical size of the "datadir", at the last LSN. current_logical_size: LogicalSize, @@ -2846,6 +2853,18 @@ impl Timeline { .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold) } + // HADRON + fn get_image_creation_timeout(&self) -> Option { + let tenant_conf = self.tenant_conf.load(); + tenant_conf + .tenant_conf + .image_layer_force_creation_period + .or(self + .conf + .default_tenant_conf + .image_layer_force_creation_period) + } + fn get_compaction_algorithm_settings(&self) -> CompactionAlgorithmSettings { let tenant_conf = &self.tenant_conf.load(); tenant_conf @@ -3115,7 +3134,9 @@ impl Timeline { repartition_threshold: 0, last_image_layer_creation_check_at: AtomicLsn::new(0), last_image_layer_creation_check_instant: Mutex::new(None), - + // HADRON + force_image_creation_lsn: AtomicLsn::new(0), + force_image_creation_lsn_computed_at: std::sync::Mutex::new(None), last_received_wal: Mutex::new(None), rel_size_latest_cache: RwLock::new(HashMap::new()), rel_size_snapshot_cache: Mutex::new(LruCache::new(relsize_snapshot_cache_capacity)), @@ -5036,6 +5057,7 @@ impl Timeline { .create_image_layers( &partitions, self.initdb_lsn, + None, ImageLayerCreationMode::Initial, ctx, LastImageLayerCreationStatus::Initial, @@ -5307,14 +5329,19 @@ impl Timeline { } // Is it time to create a new image layer for the given partition? True if we want to generate. - async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool { + async fn time_for_new_image_layer( + &self, + partition: &KeySpace, + lsn: Lsn, + force_image_creation_lsn: Option, + ) -> bool { let threshold = self.get_image_creation_threshold(); let guard = self.layers.read(LayerManagerLockHolder::Compaction).await; let Ok(layers) = guard.layer_map() else { return false; }; - + let mut min_image_lsn: Lsn = Lsn::MAX; let mut max_deltas = 0; for part_range in &partition.ranges { let image_coverage = layers.image_coverage(part_range, lsn); @@ -5349,9 +5376,22 @@ impl Timeline { return true; } } + min_image_lsn = min(min_image_lsn, img_lsn); } } + // HADRON + if min_image_lsn < force_image_creation_lsn.unwrap_or(Lsn(0)) && max_deltas > 0 { + info!( + "forcing image creation for partitioned range {}-{}. Min image LSN: {}, force image creation LSN: {}", + partition.ranges[0].start, + partition.ranges[0].end, + min_image_lsn, + force_image_creation_lsn.unwrap() + ); + return true; + } + debug!( max_deltas, "none of the partitioned ranges had >= {threshold} deltas" @@ -5577,7 +5617,7 @@ impl Timeline { /// suffer from the lack of image layers /// 2. For small tenants (that can mostly fit in RAM), we use a much longer interval fn should_check_if_image_layers_required(self: &Arc, lsn: Lsn) -> bool { - const LARGE_TENANT_THRESHOLD: u64 = 2 * 1024 * 1024 * 1024; + let large_timeline_threshold = self.conf.image_layer_generation_large_timeline_threshold; let last_checks_at = self.last_image_layer_creation_check_at.load(); let distance = lsn @@ -5591,12 +5631,12 @@ impl Timeline { let mut time_based_decision = false; let mut last_check_instant = self.last_image_layer_creation_check_instant.lock().unwrap(); if let CurrentLogicalSize::Exact(logical_size) = self.current_logical_size.current_size() { - let check_required_after = if Into::::into(&logical_size) >= LARGE_TENANT_THRESHOLD - { - self.get_checkpoint_timeout() - } else { - Duration::from_secs(3600 * 48) - }; + let check_required_after = + if Some(Into::::into(&logical_size)) >= large_timeline_threshold { + self.get_checkpoint_timeout() + } else { + Duration::from_secs(3600 * 48) + }; time_based_decision = match *last_check_instant { Some(last_check) => { @@ -5624,10 +5664,12 @@ impl Timeline { /// true = we have generate all image layers, false = we preempt the process for L0 compaction. /// /// `partition_mode` is only for logging purpose and is not used anywhere in this function. + #[allow(clippy::too_many_arguments)] async fn create_image_layers( self: &Arc, partitioning: &KeyPartitioning, lsn: Lsn, + force_image_creation_lsn: Option, mode: ImageLayerCreationMode, ctx: &RequestContext, last_status: LastImageLayerCreationStatus, @@ -5731,7 +5773,11 @@ impl Timeline { } else if let ImageLayerCreationMode::Try = mode { // check_for_image_layers = false -> skip // check_for_image_layers = true -> check time_for_new_image_layer -> skip/generate - if !check_for_image_layers || !self.time_for_new_image_layer(partition, lsn).await { + if !check_for_image_layers + || !self + .time_for_new_image_layer(partition, lsn, force_image_creation_lsn) + .await + { start = img_range.end; continue; } diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 18a0ca852d..171f9d1284 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -4,10 +4,11 @@ //! //! The old legacy algorithm is implemented directly in `timeline.rs`. +use std::cmp::min; use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque}; use std::ops::{Deref, Range}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::{Duration, Instant, SystemTime}; use super::layer_manager::LayerManagerLockHolder; use super::{ @@ -33,6 +34,7 @@ use pageserver_api::models::{CompactInfoResponse, CompactKeyRange}; use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId}; use pageserver_compaction::helpers::{fully_contains, overlaps_with}; use pageserver_compaction::interface::*; +use postgres_ffi::to_pg_timestamp; use serde::Serialize; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio_util::sync::CancellationToken; @@ -45,6 +47,7 @@ use wal_decoder::models::value::Value; use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder}; use crate::page_cache; +use crate::pgdatadir_mapping::LsnForTimestamp; use crate::statvfs::Statvfs; use crate::tenant::checks::check_valid_layermap; use crate::tenant::gc_block::GcBlock; @@ -1267,6 +1270,12 @@ impl Timeline { // Define partitioning schema if needed + // HADRON + let force_image_creation_lsn = self + .get_or_compute_force_image_creation_lsn(cancel, ctx) + .await + .map_err(CompactionError::Other)?; + // 1. L0 Compact let l0_outcome = { let timer = self.metrics.compact_time_histo.start_timer(); @@ -1274,6 +1283,7 @@ impl Timeline { .compact_level0( target_file_size, options.flags.contains(CompactFlags::ForceL0Compaction), + force_image_creation_lsn, ctx, ) .await?; @@ -1376,6 +1386,7 @@ impl Timeline { .create_image_layers( &partitioning, lsn, + force_image_creation_lsn, mode, &image_ctx, self.last_image_layer_creation_status @@ -1472,6 +1483,63 @@ impl Timeline { Ok(CompactionOutcome::Done) } + /* BEGIN_HADRON */ + // Get the force image creation LSN. Compute it if the last computed LSN is too old. + async fn get_or_compute_force_image_creation_lsn( + self: &Arc, + cancel: &CancellationToken, + ctx: &RequestContext, + ) -> anyhow::Result> { + const FORCE_IMAGE_CREATION_LSN_COMPUTE_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes + let image_layer_force_creation_period = self.get_image_creation_timeout(); + if image_layer_force_creation_period.is_none() { + return Ok(None); + } + + let image_layer_force_creation_period = image_layer_force_creation_period.unwrap(); + let force_image_creation_lsn_computed_at = + *self.force_image_creation_lsn_computed_at.lock().unwrap(); + if force_image_creation_lsn_computed_at.is_none() + || force_image_creation_lsn_computed_at.unwrap().elapsed() + > FORCE_IMAGE_CREATION_LSN_COMPUTE_INTERVAL + { + let now: SystemTime = SystemTime::now(); + let timestamp = now + .checked_sub(image_layer_force_creation_period) + .ok_or_else(|| { + anyhow::anyhow!( + "image creation timeout is too large: {image_layer_force_creation_period:?}" + ) + })?; + let timestamp = to_pg_timestamp(timestamp); + let force_image_creation_lsn = match self + .find_lsn_for_timestamp(timestamp, cancel, ctx) + .await? + { + LsnForTimestamp::Present(lsn) | LsnForTimestamp::Future(lsn) => lsn, + _ => { + let gc_lsn = *self.get_applied_gc_cutoff_lsn(); + tracing::info!( + "no LSN found for timestamp {timestamp:?}, using latest GC cutoff LSN {}", + gc_lsn + ); + gc_lsn + } + }; + self.force_image_creation_lsn + .store(force_image_creation_lsn); + *self.force_image_creation_lsn_computed_at.lock().unwrap() = Some(Instant::now()); + tracing::info!( + "computed force image creation LSN: {}", + force_image_creation_lsn + ); + Ok(Some(force_image_creation_lsn)) + } else { + Ok(Some(self.force_image_creation_lsn.load())) + } + } + /* END_HADRON */ + /// Check for layers that are elegible to be rewritten: /// - Shard splitting: After a shard split, ancestor layers beyond pitr_interval, so that /// we don't indefinitely retain keys in this shard that aren't needed. @@ -1801,6 +1869,7 @@ impl Timeline { self: &Arc, target_file_size: u64, force_compaction_ignore_threshold: bool, + force_compaction_lsn: Option, ctx: &RequestContext, ) -> Result { let CompactLevel0Phase1Result { @@ -1821,6 +1890,7 @@ impl Timeline { stats, target_file_size, force_compaction_ignore_threshold, + force_compaction_lsn, &ctx, ) .instrument(phase1_span) @@ -1843,6 +1913,7 @@ impl Timeline { mut stats: CompactLevel0Phase1StatsBuilder, target_file_size: u64, force_compaction_ignore_threshold: bool, + force_compaction_lsn: Option, ctx: &RequestContext, ) -> Result { let begin = tokio::time::Instant::now(); @@ -1872,11 +1943,28 @@ impl Timeline { return Ok(CompactLevel0Phase1Result::default()); } } else { - debug!( - level0_deltas = level0_deltas.len(), - threshold, "too few deltas to compact" - ); - return Ok(CompactLevel0Phase1Result::default()); + // HADRON + let min_lsn = level0_deltas + .iter() + .map(|a| a.get_lsn_range().start) + .reduce(min); + if force_compaction_lsn.is_some() + && min_lsn.is_some() + && min_lsn.unwrap() < force_compaction_lsn.unwrap() + { + info!( + "forcing L0 compaction of {} L0 deltas. Min lsn: {}, force compaction lsn: {}", + level0_deltas.len(), + min_lsn.unwrap(), + force_compaction_lsn.unwrap() + ); + } else { + debug!( + level0_deltas = level0_deltas.len(), + threshold, "too few deltas to compact" + ); + return Ok(CompactLevel0Phase1Result::default()); + } } } diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py index 7788faceb4..eaaa3014a5 100644 --- a/test_runner/regress/test_attach_tenant_config.py +++ b/test_runner/regress/test_attach_tenant_config.py @@ -165,6 +165,7 @@ def test_fully_custom_config(positive_env: NeonEnv): "gc_horizon": 23 * (1024 * 1024), "gc_period": "2h 13m", "image_creation_threshold": 7, + "image_layer_force_creation_period": "1m", "pitr_interval": "1m", "lagging_wal_timeout": "23m", "lazy_slru_download": True, diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index 1570d40ae9..e67161c6b7 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -944,3 +944,78 @@ def test_image_layer_compression(neon_env_builder: NeonEnvBuilder, enabled: bool f"SELECT count(*) FROM foo WHERE id={v} and val=repeat('abcde{v:0>3}', 500)" ) assert res[0][0] == 1 + + +# BEGIN_HADRON +def get_layer_map(env, tenant_shard_id, timeline_id, ps_id): + client = env.pageservers[ps_id].http_client() + layer_map = client.layer_map_info(tenant_shard_id, timeline_id) + image_layer_count = 0 + delta_layer_count = 0 + for layer in layer_map.historic_layers: + if layer.kind == "Image": + image_layer_count += 1 + elif layer.kind == "Delta": + delta_layer_count += 1 + return image_layer_count, delta_layer_count + + +def test_image_creation_timeout(neon_env_builder: NeonEnvBuilder): + """ + Tests that page server can force creating new images if image creation timeout is enabled + """ + # use large knobs to disable L0 compaction/image creation except for the force image creation + tenant_conf = { + "compaction_threshold": "100", + "image_creation_threshold": "100", + "image_layer_creation_check_threshold": "1", + "checkpoint_distance": 10 * 1024, + "checkpoint_timeout": "1s", + "image_layer_force_creation_period": "1s", + # The lsn for forced image layer creations is calculated once every 10 minutes. + # Hence, drive compaction manually such that the test doesn't compute it at the + # wrong time. + "compaction_period": "0s", + } + + # consider every tenant large to run the image layer generation check more eagerly + neon_env_builder.pageserver_config_override = ( + "image_layer_generation_large_timeline_threshold=0" + ) + + neon_env_builder.num_pageservers = 1 + neon_env_builder.num_safekeepers = 1 + env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf) + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + endpoint = env.endpoints.create_start("main") + endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)") + # Generate some rows. + for v in range(10): + endpoint.safe_psql(f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))") + + # Sleep a bit such that the inserts are considered when calculating the forced image layer creation LSN. + time.sleep(2) + + def check_force_image_creation(): + ps_http = env.pageserver.http_client() + ps_http.timeline_compact(tenant_id, timeline_id) + image, delta = get_layer_map(env, tenant_id, timeline_id, 0) + log.info(f"images: {image}, deltas: {delta}") + assert image > 0 + + env.pageserver.assert_log_contains("forcing L0 compaction of") + env.pageserver.assert_log_contains("forcing image creation for partitioned range") + + wait_until(check_force_image_creation) + + endpoint.stop_and_destroy() + + env.pageserver.allowed_errors.append( + ".*created delta file of size.*larger than double of target.*" + ) + + +# END_HADRON