From e593cbaabafafb6f54c9ddcd5d1d9f04d1bd4490 Mon Sep 17 00:00:00 2001
From: Arseny Sher
Date: Mon, 8 Aug 2022 12:47:50 +0300
Subject: [PATCH] Add pageserver checkpoint_timeout option.

Flush the in-memory layer eventually when no new data arrives; this helps
the safekeepers suspend activity (stop pushing to the broker). The default
of 10m should be fine.
---
 control_plane/src/storage.rs                  |  2 ++
 docs/settings.md                              | 12 +++++--
 pageserver/src/config.rs                      |  8 +++++
 pageserver/src/http/models.rs                 |  3 ++
 pageserver/src/http/openapi_spec.yml          |  4 +++
 pageserver/src/http/routes.rs                 |  9 ++++++
 pageserver/src/layered_repository.rs          |  7 +++++
 pageserver/src/layered_repository/timeline.rs | 31 +++++++++++++++----
 pageserver/src/page_service.rs                |  7 +++++
 pageserver/src/repository.rs                  |  1 +
 pageserver/src/tenant_config.rs               | 14 +++++++++
 .../src/walreceiver/walreceiver_connection.rs | 16 ++++------
 safekeeper/src/safekeeper.rs                  |  6 +++-
 safekeeper/src/timeline.rs                    |  2 +-
 test_runner/batch_others/test_wal_acceptor.py |  5 ++-
 15 files changed, 106 insertions(+), 21 deletions(-)

diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs
index c2ed3fc824..d2742e84bb 100644
--- a/control_plane/src/storage.rs
+++ b/control_plane/src/storage.rs
@@ -401,6 +401,7 @@ impl PageServerNode {
                 .get("checkpoint_distance")
                 .map(|x| x.parse::<u64>())
                 .transpose()?,
+            checkpoint_timeout: settings.get("checkpoint_timeout").map(|x| x.to_string()),
             compaction_target_size: settings
                 .get("compaction_target_size")
                 .map(|x| x.parse::<u64>())
@@ -455,6 +456,7 @@ impl PageServerNode {
                 .map(|x| x.parse::<u64>())
                 .transpose()
                 .context("Failed to parse 'checkpoint_distance' as an integer")?,
+            checkpoint_timeout: settings.get("checkpoint_timeout").map(|x| x.to_string()),
             compaction_target_size: settings
                 .get("compaction_target_size")
                 .map(|x| x.parse::<u64>())
diff --git a/docs/settings.md b/docs/settings.md
index f2aaab75a8..5a0e976b47 100644
--- a/docs/settings.md
+++ b/docs/settings.md
@@ -15,7 +15,7 @@ listen_pg_addr = '127.0.0.1:64000'
 listen_http_addr = '127.0.0.1:9898'
 checkpoint_distance = '268435456' # in bytes
-checkpoint_period = '1 s'
+checkpoint_timeout = '10m'
 
 gc_period = '100 s'
 gc_horizon = '67108864'
@@ -46,7 +46,7 @@ Note the `[remote_storage]` section: it's a [table](https://toml.io/en/v1.0.0#table).
 All values can be passed as an argument to the pageserver binary, using the `-c` parameter
 and specified as a valid TOML string. All tables should be passed in the inline form.
 
-Example: `${PAGESERVER_BIN} -c "checkpoint_period = '100 s'" -c "remote_storage={local_path='/some/local/path/'}"`
+Example: `${PAGESERVER_BIN} -c "checkpoint_timeout = '10 m'" -c "remote_storage={local_path='/some/local/path/'}"`
 
 Note that TOML distinguishes between strings and integers, the former require
 single or double quotes around them.
@@ -82,6 +82,14 @@ S3.
 
 The unit is # of bytes.
 
+#### checkpoint_timeout
+
+Apart from `checkpoint_distance`, flushing of the open layer is also triggered
+`checkpoint_timeout` after the last flush. This ensures WAL is eventually
+uploaded to S3 even when write activity stops.
+
+The default is 10m.
+
 #### compaction_period
 
 Every `compaction_period` seconds, the page server checks if
diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs
index 01b626e046..c1c4169e14 100644
--- a/pageserver/src/config.rs
+++ b/pageserver/src/config.rs
@@ -59,6 +59,7 @@ pub mod defaults {
 # [tenant_config]
 #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
+#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
 #compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes
 #compaction_period = '{DEFAULT_COMPACTION_PERIOD}'
 #compaction_threshold = '{DEFAULT_COMPACTION_THRESHOLD}'
@@ -452,6 +453,13 @@ impl PageServerConf {
                 Some(parse_toml_u64("checkpoint_distance", checkpoint_distance)?);
         }
 
+        if let Some(checkpoint_timeout) = item.get("checkpoint_timeout") {
+            t_conf.checkpoint_timeout = Some(parse_toml_duration(
+                "checkpoint_timeout",
+                checkpoint_timeout,
+            )?);
+        }
+
         if let Some(compaction_target_size) = item.get("compaction_target_size") {
             t_conf.compaction_target_size = Some(parse_toml_u64(
                 "compaction_target_size",
diff --git a/pageserver/src/http/models.rs b/pageserver/src/http/models.rs
index aee31f14a7..a4f270580f 100644
--- a/pageserver/src/http/models.rs
+++ b/pageserver/src/http/models.rs
@@ -32,6 +32,7 @@ pub struct TenantCreateRequest {
     #[serde_as(as = "Option<DisplayFromStr>")]
     pub new_tenant_id: Option<ZTenantId>,
     pub checkpoint_distance: Option<u64>,
+    pub checkpoint_timeout: Option<String>,
     pub compaction_target_size: Option<u64>,
     pub compaction_period: Option<String>,
     pub compaction_threshold: Option<usize>,
@@ -70,6 +71,7 @@ pub struct TenantConfigRequest {
     #[serde(default)]
     #[serde_as(as = "Option<DisplayFromStr>")]
     pub checkpoint_distance: Option<u64>,
+    pub checkpoint_timeout: Option<String>,
     pub compaction_target_size: Option<u64>,
     pub compaction_period: Option<String>,
     pub compaction_threshold: Option<usize>,
@@ -87,6 +89,7 @@ impl TenantConfigRequest {
         TenantConfigRequest {
             tenant_id,
             checkpoint_distance: None,
+            checkpoint_timeout: None,
             compaction_target_size: None,
             compaction_period: None,
             compaction_threshold: None,
diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml
index 106c14fbc8..fc3e80ba19 100644
--- a/pageserver/src/http/openapi_spec.yml
+++ b/pageserver/src/http/openapi_spec.yml
@@ -560,6 +560,8 @@ components:
           type: string
         checkpoint_distance:
           type: integer
+        checkpoint_timeout:
+          type: string
         compaction_period:
           type: string
         compaction_threshold:
@@ -578,6 +580,8 @@ components:
           type: string
         checkpoint_distance:
           type: integer
+        checkpoint_timeout:
+          type: string
         compaction_period:
           type: string
         compaction_threshold:
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index fa598de402..1b1b4f99cb 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -623,6 +623,11 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError>
diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
+    pub fn get_checkpoint_timeout(&self) -> Duration {
+        let tenant_conf = self.tenant_conf.read().unwrap();
+        tenant_conf
+            .checkpoint_timeout
+            .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
+    }
+
     pub fn get_compaction_target_size(&self) -> u64 {
         let tenant_conf = self.tenant_conf.read().unwrap();
         tenant_conf
diff --git a/pageserver/src/layered_repository/timeline.rs b/pageserver/src/layered_repository/timeline.rs
index 73877a6656..2d396024a0 100644
--- a/pageserver/src/layered_repository/timeline.rs
+++ b/pageserver/src/layered_repository/timeline.rs
@@ -16,7 +16,7 @@ use std::ops::{Deref, Range};
 use std::path::PathBuf;
 use std::sync::atomic::{self, AtomicBool, AtomicIsize, Ordering as AtomicOrdering};
 use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError};
-use std::time::{Duration, SystemTime};
+use std::time::{Duration, Instant, SystemTime};
 
 use metrics::{
     register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge_vec,
@@ -233,6 +233,8 @@ pub struct LayeredTimeline {
     pub layers: RwLock<LayerMap>,
 
     last_freeze_at: AtomicLsn,
+    // An atomic would be more appropriate here.
+    last_freeze_ts: RwLock<Instant>,
 
     // WAL redo manager
     walredo_mgr: Arc<dyn WalRedoManager + Sync + Send>,
@@ -560,6 +562,13 @@ impl LayeredTimeline {
             .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
     }
 
+    fn get_checkpoint_timeout(&self) -> Duration {
+        let tenant_conf = self.tenant_conf.read().unwrap();
+        tenant_conf
+            .checkpoint_timeout
+            .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
+    }
+
     fn get_compaction_target_size(&self) -> u64 {
         let tenant_conf = self.tenant_conf.read().unwrap();
         tenant_conf
@@ -649,6 +658,7 @@ impl LayeredTimeline {
             disk_consistent_lsn: AtomicLsn::new(metadata.disk_consistent_lsn().0),
             last_freeze_at: AtomicLsn::new(metadata.disk_consistent_lsn().0),
+            last_freeze_ts: RwLock::new(Instant::now()),
 
             ancestor_timeline: ancestor,
             ancestor_lsn: metadata.ancestor_lsn(),
@@ -1094,8 +1104,11 @@ impl LayeredTimeline {
     }
 
     ///
-    /// Check if more than 'checkpoint_distance' of WAL has been accumulated
-    /// in the in-memory layer, and initiate flushing it if so.
+    /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
+    /// the in-memory layer, and initiate flushing it if so.
+    ///
+    /// Also flush after a period of time without new data -- it helps the
+    /// safekeepers regard the pageserver as caught up and suspend activity.
     ///
     pub fn check_checkpoint_distance(self: &Arc<Self>) -> Result<()> {
         let last_lsn = self.get_last_record_lsn();
@@ -1103,21 +1116,27 @@
         if let Some(open_layer) = &layers.open_layer {
             let open_layer_size = open_layer.size()?;
             drop(layers);
-            let distance = last_lsn.widening_sub(self.last_freeze_at.load());
+            let last_freeze_at = self.last_freeze_at.load();
+            let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
+            let distance = last_lsn.widening_sub(last_freeze_at);
 
             // Checkpointing the open layer can be triggered by layer size or LSN range.
             // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
             // we want to stay below that with a big margin. The LSN distance determines how
             // much WAL the safekeepers need to store.
             if distance >= self.get_checkpoint_distance().into()
                 || open_layer_size > self.get_checkpoint_distance()
+                || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
             {
                 info!(
-                    "check_checkpoint_distance {}, layer size {}",
-                    distance, open_layer_size
+                    "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
+                    distance,
+                    open_layer_size,
+                    last_freeze_ts.elapsed()
                 );
 
                 self.freeze_inmem_layer(true);
                 self.last_freeze_at.store(last_lsn);
+                *(self.last_freeze_ts.write().unwrap()) = Instant::now();
 
                 // Launch a thread to flush the frozen layer to disk, unless
                 // a thread was already running. (If the thread was running
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 75df744014..3c5ea5267e 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -1044,6 +1044,7 @@ impl postgres_backend::Handler for PageServerHandler {
         let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
         pgb.write_message_noflush(&BeMessage::RowDescription(&[
             RowDescriptor::int8_col(b"checkpoint_distance"),
+            RowDescriptor::int8_col(b"checkpoint_timeout"),
             RowDescriptor::int8_col(b"compaction_target_size"),
             RowDescriptor::int8_col(b"compaction_period"),
             RowDescriptor::int8_col(b"compaction_threshold"),
@@ -1054,6 +1055,12 @@
         ]))?
         .write_message_noflush(&BeMessage::DataRow(&[
             Some(repo.get_checkpoint_distance().to_string().as_bytes()),
+            Some(
+                repo.get_checkpoint_timeout()
+                    .as_secs()
+                    .to_string()
+                    .as_bytes(),
+            ),
             Some(repo.get_compaction_target_size().to_string().as_bytes()),
             Some(
                 repo.get_compaction_period()
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index 3fae0184f9..a1a08b11d5 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -445,6 +445,7 @@ pub mod repo_harness {
         fn from(tenant_conf: TenantConf) -> Self {
             Self {
                 checkpoint_distance: Some(tenant_conf.checkpoint_distance),
+                checkpoint_timeout: Some(tenant_conf.checkpoint_timeout),
                 compaction_target_size: Some(tenant_conf.compaction_target_size),
                 compaction_period: Some(tenant_conf.compaction_period),
                 compaction_threshold: Some(tenant_conf.compaction_threshold),
diff --git a/pageserver/src/tenant_config.rs b/pageserver/src/tenant_config.rs
index 8811009743..eff5272837 100644
--- a/pageserver/src/tenant_config.rs
+++ b/pageserver/src/tenant_config.rs
@@ -23,6 +23,7 @@ pub mod defaults {
     // which is good for now to trigger bugs.
     // This parameter actually determines L0 layer file size.
     pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
+    pub const DEFAULT_CHECKPOINT_TIMEOUT: &str = "10 m";
 
     // Target file size, when creating image and delta layers.
     // This parameter determines L1 layer file size.
@@ -48,6 +49,9 @@ pub struct TenantConf {
     // page server crashes.
     // This parameter actually determines L0 layer file size.
     pub checkpoint_distance: u64,
+    // The in-memory layer is also flushed at least once per checkpoint_timeout
+    // to eventually upload WAL after activity stops.
+    pub checkpoint_timeout: Duration,
 
     // Target file size, when creating image and delta layers.
     // This parameter determines L1 layer file size.
     pub compaction_target_size: u64,
@@ -90,6 +94,7 @@ pub struct TenantConf {
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
 pub struct TenantConfOpt {
     pub checkpoint_distance: Option<u64>,
+    pub checkpoint_timeout: Option<Duration>,
     pub compaction_target_size: Option<u64>,
     #[serde(with = "humantime_serde")]
     pub compaction_period: Option<Duration>,
@@ -113,6 +118,9 @@ impl TenantConfOpt {
             checkpoint_distance: self
                 .checkpoint_distance
                 .unwrap_or(global_conf.checkpoint_distance),
+            checkpoint_timeout: self
+                .checkpoint_timeout
+                .unwrap_or(global_conf.checkpoint_timeout),
             compaction_target_size: self
                 .compaction_target_size
                 .unwrap_or(global_conf.compaction_target_size),
@@ -142,6 +150,9 @@ impl TenantConfOpt {
         if let Some(checkpoint_distance) = other.checkpoint_distance {
             self.checkpoint_distance = Some(checkpoint_distance);
         }
+        if let Some(checkpoint_timeout) = other.checkpoint_timeout {
+            self.checkpoint_timeout = Some(checkpoint_timeout);
+        }
         if let Some(compaction_target_size) = other.compaction_target_size {
             self.compaction_target_size = Some(compaction_target_size);
         }
@@ -181,6 +192,8 @@ impl TenantConf {
         TenantConf {
             checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE,
+            checkpoint_timeout: humantime::parse_duration(DEFAULT_CHECKPOINT_TIMEOUT)
+                .expect("cannot parse default checkpoint timeout"),
             compaction_target_size: DEFAULT_COMPACTION_TARGET_SIZE,
             compaction_period: humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
                 .expect("cannot parse default compaction period"),
@@ -212,6 +225,7 @@ impl TenantConf {
     pub fn dummy_conf() -> Self {
         TenantConf {
             checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
+            checkpoint_timeout: Duration::from_secs(600),
             compaction_target_size: 4 * 1024 * 1024,
             compaction_period: Duration::from_secs(10),
             compaction_threshold: defaults::DEFAULT_COMPACTION_THRESHOLD,
diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs
index c4e66bdb95..538ebfe30e 100644
--- a/pageserver/src/walreceiver/walreceiver_connection.rs
+++ b/pageserver/src/walreceiver/walreceiver_connection.rs
@@ -178,16 +178,6 @@ pub async fn handle_walreceiver_connection(
                 caught_up = true;
             }
 
-            let timeline_to_check = Arc::clone(&timeline);
-            tokio::task::spawn_blocking(move || timeline_to_check.check_checkpoint_distance())
-                .await
-                .with_context(|| {
-                    format!("Spawned checkpoint check task panicked for timeline {id}")
-                })?
-                .with_context(|| {
-                    format!("Failed to check checkpoint distance for timeline {id}")
-                })?;
-
             Some(endlsn)
         }
 
@@ -208,6 +198,12 @@
             _ => None,
         };
 
+        let timeline_to_check = Arc::clone(&timeline);
+        tokio::task::spawn_blocking(move || timeline_to_check.check_checkpoint_distance())
+            .await
+            .with_context(|| format!("Spawned checkpoint check task panicked for timeline {id}"))?
+            .with_context(|| format!("Failed to check checkpoint distance for timeline {id}"))?;
+
         if let Some(last_lsn) = status_update {
             let remote_index = repo.get_remote_index();
             let timeline_remote_consistent_lsn = remote_index
diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs
index fd4761505d..a9373cb584 100644
--- a/safekeeper/src/safekeeper.rs
+++ b/safekeeper/src/safekeeper.rs
@@ -727,7 +727,7 @@ where
             info!("setting local_start_lsn to {:?}", state.local_start_lsn);
         }
         // Initializing commit_lsn before acking first flushed record is
-        // important to let find_end_of_wal skip the whole in the beginning
+        // important to let find_end_of_wal skip the hole in the beginning
         // of the first segment.
         //
         // NB: on new clusters, this happens at the same time as
@@ -738,6 +738,10 @@ where
         // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
         self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn);
+        // Initializing remote_consistent_lsn records that we have nothing to
+        // stream to pageserver(s) immediately after creation.
+        self.inmem.remote_consistent_lsn =
+            max(self.inmem.remote_consistent_lsn, state.timeline_start_lsn);
 
         state.acceptor_state.term_history = msg.term_history.clone();
         self.persist_control_file(state)?;
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index ee642408f2..161fca3595 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -137,7 +137,7 @@ impl SharedState {
         self.is_wal_backup_required()
             // FIXME: add tracking of relevant pageservers and check them here individually,
             // otherwise migration won't work (we suspend too early).
-            || self.sk.inmem.remote_consistent_lsn <= self.sk.inmem.commit_lsn
+            || self.sk.inmem.remote_consistent_lsn < self.sk.inmem.commit_lsn
     }
 
     /// Mark timeline active/inactive and return whether s3 offloading requires
diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py
index f7aeb0abeb..b55ba84756 100644
--- a/test_runner/batch_others/test_wal_acceptor.py
+++ b/test_runner/batch_others/test_wal_acceptor.py
@@ -284,9 +284,12 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
     env.neon_cli.create_branch('test_safekeepers_wal_removal')
     pg = env.postgres.create_start('test_safekeepers_wal_removal')
 
+    # Note: it is important to insert at least two segments, as currently the
+    # control file is synced roughly once per WAL segment and WAL is not
+    # removed until all horizons are persisted.
     pg.safe_psql_many([
         'CREATE TABLE t(key int primary key, value text)',
-        "INSERT INTO t SELECT generate_series(1,100000), 'payload'",
+        "INSERT INTO t SELECT generate_series(1,200000), 'payload'",
    ])
 
    tenant_id = pg.safe_psql("show neon.tenant_id")[0][0]
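
To illustrate the core of the change (this sketch is not code from the patch): the open
layer is now frozen when any of three conditions holds -- enough LSN distance has
accumulated, the layer itself has grown too large, or checkpoint_timeout has elapsed
since the last freeze and at least some new data arrived. A self-contained Rust sketch
of that predicate, with deliberately simplified stand-ins (u64 instead of Lsn, a bare
struct instead of LayeredTimeline):

use std::sync::RwLock;
use std::time::{Duration, Instant};

// Stand-in for the timeline state the patch touches: the LSN of the last
// freeze and (new in this patch) the wall-clock time of the last freeze.
struct FreezeState {
    last_freeze_at: u64,
    last_freeze_ts: RwLock<Instant>,
}

impl FreezeState {
    // Freeze if enough WAL accumulated, the layer grew too big, or some new
    // data exists and checkpoint_timeout passed since the last freeze.
    fn should_freeze(
        &self,
        last_lsn: u64,
        open_layer_size: u64,
        checkpoint_distance: u64,
        checkpoint_timeout: Duration,
    ) -> bool {
        let distance = last_lsn.saturating_sub(self.last_freeze_at);
        let elapsed = self.last_freeze_ts.read().unwrap().elapsed();
        distance >= checkpoint_distance
            || open_layer_size > checkpoint_distance
            || (distance > 0 && elapsed >= checkpoint_timeout)
    }
}

fn main() {
    let state = FreezeState {
        last_freeze_at: 1_000,
        // Pretend the last freeze happened 700s ago (> the 600s default).
        last_freeze_ts: RwLock::new(Instant::now() - Duration::from_secs(700)),
    };
    // Only 8 bytes of new WAL, but the timeout already elapsed: freeze anyway,
    // so the WAL gets flushed (and eventually uploaded) despite low activity.
    assert!(state.should_freeze(1_008, 8, 256 * 1024 * 1024, Duration::from_secs(600)));
    // No new data at all: never freeze on timeout alone.
    assert!(!state.should_freeze(1_000, 0, 256 * 1024 * 1024, Duration::from_secs(600)));
}

The distance > 0 guard mirrors the patch: a completely idle timeline is not re-flushed
on every timeout tick, so the timer only matters while there is unflushed WAL.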
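
The per-tenant plumbing follows the pattern already used by compaction_period and
friends: the tenant-level value is an Option that falls back to the global default at
read time (the unwrap_or chains in TenantConfOpt above). A reduced sketch of that
override scheme, with hypothetical struct names standing in for TenantConf and
TenantConfOpt:

use std::time::Duration;

#[derive(Clone, Copy)]
struct GlobalConf {
    checkpoint_timeout: Duration,
}

#[derive(Default, Clone, Copy)]
struct TenantOverrides {
    checkpoint_timeout: Option<Duration>,
}

impl TenantOverrides {
    // Per-tenant value wins when set; otherwise fall back to the global default.
    fn effective_checkpoint_timeout(&self, global: GlobalConf) -> Duration {
        self.checkpoint_timeout.unwrap_or(global.checkpoint_timeout)
    }
}

fn main() {
    let global = GlobalConf { checkpoint_timeout: Duration::from_secs(600) };

    // Tenant with no override inherits the global 10m default.
    let plain = TenantOverrides::default();
    assert_eq!(plain.effective_checkpoint_timeout(global), Duration::from_secs(600));

    // Tenant with an explicit override uses its own value.
    let tuned = TenantOverrides { checkpoint_timeout: Some(Duration::from_secs(60)) };
    assert_eq!(tuned.effective_checkpoint_timeout(global), Duration::from_secs(60));
}

A consequence of resolving at read time is that tenants which never set an explicit
value automatically pick up any later change to the pageserver-wide default.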
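
On the safekeeper side, the change from <= to < in timeline.rs is what lets activity
actually stop: with <=, even a fully caught-up timeline (remote_consistent_lsn equal to
commit_lsn) was treated as still having work for the pageserver. A minimal sketch of
the corrected predicate, again with plain u64 LSNs standing in for the real Lsn type:

// The safekeeper should stay active only while the pageserver still has
// WAL left to consume; equality means everything is already consumed.
fn pageserver_needs_wal(remote_consistent_lsn: u64, commit_lsn: u64) -> bool {
    remote_consistent_lsn < commit_lsn
}

fn main() {
    // Fully caught up: nothing to stream, the timeline may suspend.
    assert!(!pageserver_needs_wal(100, 100));
    // Pageserver lags behind: keep the timeline active.
    assert!(pageserver_needs_wal(90, 100));
}

Combined with initializing remote_consistent_lsn to timeline_start_lsn in
safekeeper.rs, a freshly created timeline with nothing to stream can suspend
immediately instead of waiting for the pageserver to report progress.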