diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 967810ee06..52527ffa90 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -357,6 +357,11 @@ impl PageServerNode { .map(|x| x.parse::()) .transpose() .context("Failed to parse 'l0_flush_delay_threshold' as an integer")?, + l0_flush_wait_upload: settings + .remove("l0_flush_wait_upload") + .map(|x| x.parse::()) + .transpose() + .context("Failed to parse 'l0_flush_wait_upload' as a boolean")?, l0_flush_stall_threshold: settings .remove("l0_flush_stall_threshold") .map(|x| x.parse::()) diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index cc6e4a3699..40c8837af5 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -265,6 +265,10 @@ pub struct TenantConfigToml { /// Level0 delta layer threshold at which to stall layer flushes. Must be >compaction_threshold /// to avoid deadlock. 0 to disable. Disabled by default. pub l0_flush_stall_threshold: Option, + /// If true, Level0 delta layer flushes will wait for S3 upload before flushing the next + /// layer. This is a temporary backpressure mechanism which should be removed once + /// l0_flush_{delay,stall}_threshold is fully enabled. + pub l0_flush_wait_upload: bool, // Determines how much history is retained, to allow // branching and read replicas at an older point in time. // The unit is #of bytes of WAL. @@ -522,6 +526,8 @@ pub mod tenant_conf_defaults { pub const DEFAULT_COMPACTION_ALGORITHM: crate::models::CompactionAlgorithm = crate::models::CompactionAlgorithm::Legacy; + pub const DEFAULT_L0_FLUSH_WAIT_UPLOAD: bool = true; + pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; // Large DEFAULT_GC_PERIOD is fine as long as PITR_INTERVAL is larger. @@ -562,6 +568,7 @@ impl Default for TenantConfigToml { }, l0_flush_delay_threshold: None, l0_flush_stall_threshold: None, + l0_flush_wait_upload: DEFAULT_L0_FLUSH_WAIT_UPLOAD, gc_horizon: DEFAULT_GC_HORIZON, gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD) .expect("cannot parse default gc period"), diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 16473415b4..dcc233c7c4 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -466,6 +466,8 @@ pub struct TenantConfigPatch { #[serde(skip_serializing_if = "FieldPatch::is_noop")] pub l0_flush_stall_threshold: FieldPatch, #[serde(skip_serializing_if = "FieldPatch::is_noop")] + pub l0_flush_wait_upload: FieldPatch, + #[serde(skip_serializing_if = "FieldPatch::is_noop")] pub gc_horizon: FieldPatch, #[serde(skip_serializing_if = "FieldPatch::is_noop")] pub gc_period: FieldPatch, @@ -524,6 +526,7 @@ pub struct TenantConfig { pub compaction_algorithm: Option, pub l0_flush_delay_threshold: Option, pub l0_flush_stall_threshold: Option, + pub l0_flush_wait_upload: Option, pub gc_horizon: Option, pub gc_period: Option, pub image_creation_threshold: Option, @@ -559,6 +562,7 @@ impl TenantConfig { mut compaction_algorithm, mut l0_flush_delay_threshold, mut l0_flush_stall_threshold, + mut l0_flush_wait_upload, mut gc_horizon, mut gc_period, mut image_creation_threshold, @@ -597,6 +601,7 @@ impl TenantConfig { patch .l0_flush_stall_threshold .apply(&mut l0_flush_stall_threshold); + patch.l0_flush_wait_upload.apply(&mut l0_flush_wait_upload); patch.gc_horizon.apply(&mut gc_horizon); patch.gc_period.apply(&mut gc_period); patch @@ -651,6 +656,7 @@ impl TenantConfig { compaction_algorithm, l0_flush_delay_threshold, l0_flush_stall_threshold, + l0_flush_wait_upload, gc_horizon, gc_period, image_creation_threshold, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index d359270be4..4385fe9a9b 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -5467,6 +5467,7 @@ pub(crate) mod harness { compaction_algorithm: Some(tenant_conf.compaction_algorithm), l0_flush_delay_threshold: tenant_conf.l0_flush_delay_threshold, l0_flush_stall_threshold: tenant_conf.l0_flush_stall_threshold, + l0_flush_wait_upload: Some(tenant_conf.l0_flush_wait_upload), gc_horizon: Some(tenant_conf.gc_horizon), gc_period: Some(tenant_conf.gc_period), image_creation_threshold: Some(tenant_conf.image_creation_threshold), diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index c870ca97b8..50da998c30 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -289,6 +289,10 @@ pub struct TenantConfOpt { #[serde(default)] pub l0_flush_stall_threshold: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub l0_flush_wait_upload: Option, + #[serde(skip_serializing_if = "Option::is_none")] #[serde(default)] pub gc_horizon: Option, @@ -408,6 +412,9 @@ impl TenantConfOpt { l0_flush_stall_threshold: self .l0_flush_stall_threshold .or(global_conf.l0_flush_stall_threshold), + l0_flush_wait_upload: self + .l0_flush_wait_upload + .unwrap_or(global_conf.l0_flush_wait_upload), gc_horizon: self.gc_horizon.unwrap_or(global_conf.gc_horizon), gc_period: self.gc_period.unwrap_or(global_conf.gc_period), image_creation_threshold: self @@ -474,6 +481,7 @@ impl TenantConfOpt { mut compaction_algorithm, mut l0_flush_delay_threshold, mut l0_flush_stall_threshold, + mut l0_flush_wait_upload, mut gc_horizon, mut gc_period, mut image_creation_threshold, @@ -518,6 +526,7 @@ impl TenantConfOpt { patch .l0_flush_stall_threshold .apply(&mut l0_flush_stall_threshold); + patch.l0_flush_wait_upload.apply(&mut l0_flush_wait_upload); patch.gc_horizon.apply(&mut gc_horizon); patch .gc_period @@ -590,6 +599,7 @@ impl TenantConfOpt { compaction_algorithm, l0_flush_delay_threshold, l0_flush_stall_threshold, + l0_flush_wait_upload, gc_horizon, gc_period, image_creation_threshold, @@ -649,6 +659,7 @@ impl From for models::TenantConfig { compaction_threshold: value.compaction_threshold, l0_flush_delay_threshold: value.l0_flush_delay_threshold, l0_flush_stall_threshold: value.l0_flush_stall_threshold, + l0_flush_wait_upload: value.l0_flush_wait_upload, gc_horizon: value.gc_horizon, gc_period: value.gc_period.map(humantime), image_creation_threshold: value.image_creation_threshold, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 280a3baa21..de990a9fe4 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2250,6 +2250,14 @@ impl Timeline { Some(max(l0_flush_stall_threshold, compaction_threshold)) } + fn get_l0_flush_wait_upload(&self) -> bool { + let tenant_conf = self.tenant_conf.load(); + tenant_conf + .tenant_conf + .l0_flush_wait_upload + .unwrap_or(self.conf.default_tenant_conf.l0_flush_wait_upload) + } + fn get_image_creation_threshold(&self) -> usize { let tenant_conf = self.tenant_conf.load(); tenant_conf @@ -4034,21 +4042,24 @@ impl Timeline { // Backpressure mechanism: wait with continuation of the flush loop until we have uploaded all layer files. // This makes us refuse ingest until the new layers have been persisted to the remote - let start = Instant::now(); - self.remote_client - .wait_completion() - .await - .map_err(|e| match e { - WaitCompletionError::UploadQueueShutDownOrStopped - | WaitCompletionError::NotInitialized( - NotInitialized::ShuttingDown | NotInitialized::Stopped, - ) => FlushLayerError::Cancelled, - WaitCompletionError::NotInitialized(NotInitialized::Uninitialized) => { - FlushLayerError::Other(anyhow!(e).into()) - } - })?; - let duration = start.elapsed().as_secs_f64(); - self.metrics.flush_wait_upload_time_gauge_add(duration); + // TODO: remove this, and rely on l0_flush_{delay,stall}_threshold instead. + if self.get_l0_flush_wait_upload() { + let start = Instant::now(); + self.remote_client + .wait_completion() + .await + .map_err(|e| match e { + WaitCompletionError::UploadQueueShutDownOrStopped + | WaitCompletionError::NotInitialized( + NotInitialized::ShuttingDown | NotInitialized::Stopped, + ) => FlushLayerError::Cancelled, + WaitCompletionError::NotInitialized(NotInitialized::Uninitialized) => { + FlushLayerError::Other(anyhow!(e).into()) + } + })?; + let duration = start.elapsed().as_secs_f64(); + self.metrics.flush_wait_upload_time_gauge_add(duration); + } // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`, // a compaction can delete the file and then it won't be available for uploads any more. diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py index 1fdba223ad..8b92e4c442 100644 --- a/test_runner/regress/test_attach_tenant_config.py +++ b/test_runner/regress/test_attach_tenant_config.py @@ -141,6 +141,7 @@ def test_fully_custom_config(positive_env: NeonEnv): "compaction_threshold": 13, "l0_flush_delay_threshold": 25, "l0_flush_stall_threshold": 42, + "l0_flush_wait_upload": True, "compaction_target_size": 1048576, "checkpoint_distance": 10000, "checkpoint_timeout": "13m",