diff --git a/docs/glossary.md b/docs/glossary.md
index 4a64f214c0..74dbf94c65 100644
--- a/docs/glossary.md
+++ b/docs/glossary.md
@@ -30,6 +30,10 @@
 writes out the changes from in-memory layers into new layer files[]. This process
 is called "checkpointing". The page server only creates layer files for
 relations that have been modified since the last checkpoint.
 
+Configuration parameter `checkpoint_distance` defines how far the current LSN
+may advance past the oldest in-memory layer before that layer is checkpointed.
+The default is `DEFAULT_CHECKPOINT_DISTANCE` (64 MiB).
+Set this parameter to `0` to force a checkpoint of every in-memory layer.
 
 ### Compute node
 
 Stateless Postgres node that stores data in pageserver.
diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index 45a697ac5f..041831f167 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -27,6 +27,7 @@ use zenith_utils::http::endpoint;
 struct CfgFileParams {
     listen_pg_addr: Option<String>,
     listen_http_addr: Option<String>,
+    checkpoint_distance: Option<String>,
     gc_horizon: Option<String>,
     gc_period: Option<String>,
     pg_distrib_dir: Option<String>,
@@ -44,6 +45,7 @@ impl CfgFileParams {
         Self {
             listen_pg_addr: get_arg("listen-pg"),
             listen_http_addr: get_arg("listen-http"),
+            checkpoint_distance: get_arg("checkpoint_distance"),
             gc_horizon: get_arg("gc_horizon"),
             gc_period: get_arg("gc_period"),
             pg_distrib_dir: get_arg("postgres-distrib"),
@@ -58,6 +60,7 @@ impl CfgFileParams {
         Self {
             listen_pg_addr: self.listen_pg_addr.or(other.listen_pg_addr),
             listen_http_addr: self.listen_http_addr.or(other.listen_http_addr),
+            checkpoint_distance: self.checkpoint_distance.or(other.checkpoint_distance),
             gc_horizon: self.gc_horizon.or(other.gc_horizon),
             gc_period: self.gc_period.or(other.gc_period),
             pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir),
@@ -82,6 +85,11 @@ impl CfgFileParams {
             None => DEFAULT_HTTP_LISTEN_ADDR.to_owned(),
         };
 
+        let checkpoint_distance: i128 = match self.checkpoint_distance.as_ref() {
+            Some(checkpoint_distance_str) => checkpoint_distance_str.parse()?,
+            None => DEFAULT_CHECKPOINT_DISTANCE,
+        };
+
         let gc_horizon: u64 = match self.gc_horizon.as_ref() {
             Some(horizon_str) => horizon_str.parse()?,
             None => DEFAULT_GC_HORIZON,
@@ -129,6 +137,7 @@ impl CfgFileParams {
 
             listen_pg_addr,
             listen_http_addr,
+            checkpoint_distance,
             gc_horizon,
             gc_period,
 
@@ -175,6 +184,12 @@ fn main() -> Result<()> {
                 .takes_value(false)
                 .help("Initialize pageserver repo"),
         )
+        .arg(
+            Arg::with_name("checkpoint_distance")
+                .long("checkpoint_distance")
+                .takes_value(true)
+                .help("Distance from the current LSN, in bytes of WAL, at which in-memory layers are checkpointed"),
+        )
         .arg(
             Arg::with_name("gc_horizon")
                 .long("gc_horizon")
diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index 8010ceab76..4e65c2cdd5 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -70,15 +70,6 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
 // Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
 static TIMEOUT: Duration = Duration::from_secs(60);
 
-// Flush out an inmemory layer, if it's holding WAL older than this.
-// This puts a backstop on how much WAL needs to be re-digested if the
-// page server crashes.
-//
-// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
-// would be more appropriate. But a low value forces the code to be exercised more,
-// which is good for now to trigger bugs.
-static OLDEST_INMEM_DISTANCE: i128 = 16 * 1024 * 1024;
-
 // Metrics collected on operations on the storage repository.
 lazy_static! {
     static ref STORAGE_TIME: HistogramVec = register_histogram_vec!(
@@ -309,14 +300,16 @@ impl LayeredRepository {
             info!("checkpointer thread for tenant {} waking up", self.tenantid);
 
-            // checkpoint timelines that have accumulated more than CHECKPOINT_INTERVAL
+            // checkpoint timelines that have accumulated more than checkpoint_distance
             // bytes of WAL since last checkpoint.
             {
                 let timelines = self.timelines.lock().unwrap();
                 for (_timelineid, timeline) in timelines.iter() {
                     STORAGE_TIME
                         .with_label_values(&["checkpoint_timed"])
-                        .observe_closure_duration(|| timeline.checkpoint_internal(false))?
+                        .observe_closure_duration(|| {
+                            timeline.checkpoint_internal(conf.checkpoint_distance)
+                        })?
                 }
                 // release lock on 'timelines'
             }
 
@@ -861,7 +854,8 @@ impl Timeline for LayeredTimeline {
     fn checkpoint(&self) -> Result<()> {
         STORAGE_TIME
             .with_label_values(&["checkpoint_force"])
-            .observe_closure_duration(|| self.checkpoint_internal(true))
+            // Pass checkpoint_distance = 0 to force a checkpoint.
+            .observe_closure_duration(|| self.checkpoint_internal(0))
     }
 
     ///
@@ -1215,9 +1209,8 @@ impl LayeredTimeline {
     ///
     /// Flush to disk all data that was written with the put_* functions
     ///
-    /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
-    /// know anything about them here in the repository.
-    fn checkpoint_internal(&self, force: bool) -> Result<()> {
+    /// NOTE: This has nothing to do with checkpoints in PostgreSQL.
+    fn checkpoint_internal(&self, checkpoint_distance: i128) -> Result<()> {
         // Grab lock on the layer map.
         //
         // TODO: We hold it locked throughout the checkpoint operation. That's bad,
@@ -1257,15 +1250,15 @@ impl LayeredTimeline {
 
             // Does this layer need freezing?
             //
-            // Write out all in-memory layers that contain WAL older than OLDEST_INMEM_DISTANCE.
-            // Or if 'force' is true, write out all of them. If we reach a layer with the same
+            // Write out all in-memory layers that contain WAL older than checkpoint_distance.
+            // If we reach a layer with the same
             // generation number, we know that we have cycled through all layers that were open
             // when we started. We don't want to process layers inserted after we started, to
             // avoid getting into an infinite loop trying to process again entries that we
             // inserted ourselves.
             let distance = last_record_lsn.widening_sub(oldest_pending_lsn);
             if distance < 0
-                || (!force && distance < OLDEST_INMEM_DISTANCE)
+                || distance < checkpoint_distance
                 || oldest_generation == current_generation
             {
                 info!(
diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index 3388f32df1..4750c49ce4 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -28,6 +28,11 @@ pub mod defaults {
     pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
     pub const DEFAULT_HTTP_LISTEN_ADDR: &str = "127.0.0.1:9898";
 
+    // FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
+    // would be more appropriate. But a low value forces the code to be exercised more,
+    // which is good for now to trigger bugs.
+    pub const DEFAULT_CHECKPOINT_DISTANCE: i128 = 64 * 1024 * 1024;
+
     pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
     pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
 
@@ -50,6 +55,10 @@ pub struct PageServerConf {
     pub daemonize: bool,
     pub listen_pg_addr: String,
     pub listen_http_addr: String,
+    // Flush out an in-memory layer if it's holding WAL older than this.
+    // This puts a backstop on how much WAL needs to be re-digested if the
+    // page server crashes.
+    pub checkpoint_distance: i128,
     pub gc_horizon: u64,
     pub gc_period: Duration,
     pub superuser: String,
@@ -135,7 +144,8 @@ impl PageServerConf {
     fn dummy_conf(repo_dir: PathBuf) -> Self {
         PageServerConf {
             daemonize: false,
-            gc_horizon: 64 * 1024 * 1024,
+            checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
+            gc_horizon: defaults::DEFAULT_GC_HORIZON,
             gc_period: Duration::from_secs(10),
             listen_pg_addr: "127.0.0.1:5430".to_string(),
             listen_http_addr: "127.0.0.1:9898".to_string(),
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index 13036042b5..40e141360b 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -216,8 +216,6 @@ mod tests {
     use postgres_ffi::xlog_utils::SIZEOF_CHECKPOINT;
     use std::fs;
     use std::str::FromStr;
-    use std::time::Duration;
-    use zenith_utils::postgres_backend::AuthType;
     use zenith_utils::zid::ZTenantId;
 
     /// Arbitrary relation tag, for testing.
@@ -261,18 +259,8 @@ mod tests {
         fs::create_dir_all(&repo_dir)?;
         fs::create_dir_all(&repo_dir.join("timelines"))?;
 
-        let conf = PageServerConf {
-            daemonize: false,
-            gc_horizon: 64 * 1024 * 1024,
-            gc_period: Duration::from_secs(10),
-            listen_pg_addr: "127.0.0.1:5430".to_string(),
-            listen_http_addr: "127.0.0.1:9898".to_string(),
-            superuser: "zenith_admin".to_string(),
-            workdir: repo_dir,
-            pg_distrib_dir: "".into(),
-            auth_type: AuthType::Trust,
-            auth_validation_public_key_path: None,
-        };
+        let conf = PageServerConf::dummy_conf(repo_dir);
+
         // Make a static copy of the config. This can never be free'd, but that's
         // OK in a test.
         let conf: &'static PageServerConf = Box::leak(Box::new(conf));
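
For readers skimming the patch, a small standalone sketch of how the new parameter is plumbed through `CfgFileParams`: both the `--checkpoint_distance` CLI flag and the config file supply the value as an `Option<String>`, the two sources are merged with `Option::or` (the first present value wins), and parsing falls back to `DEFAULT_CHECKPOINT_DISTANCE`. The function `resolve_checkpoint_distance` and the `main` driver below are illustrative only and not part of the patch.

```rust
// Illustrative sketch of the merge-and-parse pattern used for checkpoint_distance.
const DEFAULT_CHECKPOINT_DISTANCE: i128 = 64 * 1024 * 1024;

fn resolve_checkpoint_distance(
    first_source: Option<String>,  // e.g. the --checkpoint_distance argument
    second_source: Option<String>, // e.g. the value from the config file
) -> Result<i128, std::num::ParseIntError> {
    // `or` keeps the first value that is present, mirroring the CfgFileParams merge.
    match first_source.or(second_source).as_ref() {
        Some(s) => s.parse(),
        None => Ok(DEFAULT_CHECKPOINT_DISTANCE),
    }
}

fn main() {
    // An explicitly supplied value is parsed as-is.
    assert_eq!(
        resolve_checkpoint_distance(Some("1048576".to_string()), None).unwrap(),
        1_048_576
    );
    // With neither source set, the default applies.
    assert_eq!(
        resolve_checkpoint_distance(None, None).unwrap(),
        DEFAULT_CHECKPOINT_DISTANCE
    );
}
```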
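
The behavioural core of the change is the per-layer flush test in `checkpoint_internal`. A minimal sketch follows, assuming plain `u64` LSNs and emulating `widening_sub` with a signed subtraction; the helper name `should_flush` is made up for illustration, and the generation-number check that the real loop also performs is omitted.

```rust
// Illustrative sketch of the flush decision that checkpoint_internal() applies
// to each in-memory layer once checkpoint_distance is configurable.
fn should_flush(last_record_lsn: u64, oldest_pending_lsn: u64, checkpoint_distance: i128) -> bool {
    // The real code uses Lsn::widening_sub(); a signed subtraction stands in for it here.
    let distance = last_record_lsn as i128 - oldest_pending_lsn as i128;
    // A negative distance or one below the threshold keeps the layer in memory;
    // a threshold of 0 therefore flushes every layer (forced checkpoint).
    !(distance < 0 || distance < checkpoint_distance)
}

fn main() {
    let default_distance: i128 = 64 * 1024 * 1024; // DEFAULT_CHECKPOINT_DISTANCE
    assert!(!should_flush(1_000, 900, default_distance)); // only 100 bytes of pending WAL
    assert!(should_flush(100_000_000, 1_000, default_distance)); // well past 64 MiB
    assert!(should_flush(1_000, 900, 0)); // distance 0 forces the flush
    println!("flush decision sketch behaves as expected");
}
```

This is also why the forced `checkpoint()` path can simply call `checkpoint_internal(0)`: with a zero threshold, no in-memory layer is recent enough to be skipped.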