diff --git a/docs/glossary.md b/docs/glossary.md index 74dbf94c65..d8439f764c 100644 --- a/docs/glossary.md +++ b/docs/glossary.md @@ -34,6 +34,9 @@ Configuration parameter `checkpoint_distance` defines the distance from current LSN to perform checkpoint of in-memory layers. Default is `DEFAULT_CHECKPOINT_DISTANCE`. Set this parameter to `0` to force checkpoint of every layer. + +Configuration parameter `checkpoint_period` defines the interval between checkpoint iterations. +Default is `DEFAULT_CHECKPOINT_PERIOD`. ### Compute node Stateless Postgres node that stores data in pageserver. diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 041831f167..ea360a5741 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -28,6 +28,7 @@ struct CfgFileParams { listen_pg_addr: Option, listen_http_addr: Option, checkpoint_distance: Option, + checkpoint_period: Option, gc_horizon: Option, gc_period: Option, pg_distrib_dir: Option, @@ -46,6 +47,7 @@ impl CfgFileParams { listen_pg_addr: get_arg("listen-pg"), listen_http_addr: get_arg("listen-http"), checkpoint_distance: get_arg("checkpoint_distance"), + checkpoint_period: get_arg("checkpoint_period"), gc_horizon: get_arg("gc_horizon"), gc_period: get_arg("gc_period"), pg_distrib_dir: get_arg("postgres-distrib"), @@ -61,6 +63,7 @@ impl CfgFileParams { listen_pg_addr: self.listen_pg_addr.or(other.listen_pg_addr), listen_http_addr: self.listen_http_addr.or(other.listen_http_addr), checkpoint_distance: self.checkpoint_distance.or(other.checkpoint_distance), + checkpoint_period: self.checkpoint_period.or(other.checkpoint_period), gc_horizon: self.gc_horizon.or(other.gc_horizon), gc_period: self.gc_period.or(other.gc_period), pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir), @@ -89,6 +92,10 @@ impl CfgFileParams { Some(checkpoint_distance_str) => checkpoint_distance_str.parse()?, None => DEFAULT_CHECKPOINT_DISTANCE, }; + let checkpoint_period = match self.checkpoint_period.as_ref() { + Some(checkpoint_period_str) => humantime::parse_duration(checkpoint_period_str)?, + None => DEFAULT_CHECKPOINT_PERIOD, + }; let gc_horizon: u64 = match self.gc_horizon.as_ref() { Some(horizon_str) => horizon_str.parse()?, @@ -138,6 +145,7 @@ impl CfgFileParams { listen_pg_addr, listen_http_addr, checkpoint_distance, + checkpoint_period, gc_horizon, gc_period, @@ -190,6 +198,12 @@ fn main() -> Result<()> { .takes_value(true) .help("Distance from current LSN to perform checkpoint of in-memory layers"), ) + .arg( + Arg::with_name("checkpoint_period") + .long("checkpoint_period") + .takes_value(true) + .help("Interval between checkpoint iterations"), + ) .arg( Arg::with_name("gc_horizon") .long("gc_horizon") diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 4e65c2cdd5..c8a078c1c4 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -296,8 +296,7 @@ impl LayeredRepository { /// fn checkpoint_loop(&self, conf: &'static PageServerConf) -> Result<()> { loop { - std::thread::sleep(conf.gc_period); - + std::thread::sleep(conf.checkpoint_period); info!("checkpointer thread for tenant {} waking up", self.tenantid); // checkpoint timelines that have accumulated more than CHECKPOINT_DISTANCE @@ -313,6 +312,29 @@ impl LayeredRepository { } // release lock on 'timelines' } + } + } + + /// + /// Launch the GC thread in given repository. + /// + pub fn launch_gc_thread(conf: &'static PageServerConf, rc: Arc) { + let _thread = std::thread::Builder::new() + .name("GC thread".into()) + .spawn(move || { + // FIXME: relaunch it? Panic is not good. + rc.gc_loop(conf).expect("GC thread died"); + }) + .unwrap(); + } + + /// + /// GC thread's main loop + /// + fn gc_loop(&self, conf: &'static PageServerConf) -> Result<()> { + loop { + std::thread::sleep(conf.gc_period); + info!("gc thread for tenant {} waking up", self.tenantid); // Garbage collect old files that are not needed for PITR anymore if conf.gc_horizon > 0 { diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 4750c49ce4..a1950c567f 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -32,6 +32,7 @@ pub mod defaults { // would be more appropriate. But a low value forces the code to be exercised more, // which is good for now to trigger bugs. pub const DEFAULT_CHECKPOINT_DISTANCE: i128 = 64 * 1024 * 1024; + pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(100); pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100); @@ -59,6 +60,8 @@ pub struct PageServerConf { // This puts a backstop on how much WAL needs to be re-digested if the // page server crashes. pub checkpoint_distance: i128, + pub checkpoint_period: Duration, + pub gc_horizon: u64, pub gc_period: Duration, pub superuser: String, @@ -145,6 +148,7 @@ impl PageServerConf { PageServerConf { daemonize: false, checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE, + checkpoint_period: Duration::from_secs(10), gc_horizon: defaults::DEFAULT_GC_HORIZON, gc_period: Duration::from_secs(10), listen_pg_addr: "127.0.0.1:5430".to_string(), diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index ff56764ceb..920bcaa251 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -37,6 +37,7 @@ pub fn init(conf: &'static PageServerConf) { tenantid, )); LayeredRepository::launch_checkpointer_thread(conf, repo.clone()); + LayeredRepository::launch_gc_thread(conf, repo.clone()); info!("initialized storage for tenant: {}", &tenantid); m.insert(tenantid, repo);