diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs
index c49d5743a9..694f73c4e0 100644
--- a/control_plane/src/storage.rs
+++ b/control_plane/src/storage.rs
@@ -345,7 +345,7 @@ impl PageServerNode {
     ) -> anyhow::Result<Option<ZTenantId>> {
         let tenant_id_string = self
             .http_request(Method::POST, format!("{}/tenant", self.http_base_url))
-            .json(&TenantCreateRequest { new_tenant_id })
+            .json(&TenantCreateRequest::new(new_tenant_id))
            .send()?
            .error_from_body()?
            .json::<Option<String>>()?;
diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs
index 0d5cac8b4f..6742e0391e 100644
--- a/pageserver/src/config.rs
+++ b/pageserver/src/config.rs
@@ -41,6 +41,7 @@ pub mod defaults {
     pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
     pub const DEFAULT_GC_PERIOD: &str = "100 s";
+    pub const DEFAULT_PITR_INTERVAL: &str = "30 days";
 
     pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
     pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
@@ -68,6 +69,7 @@ pub mod defaults {
 #gc_period = '{DEFAULT_GC_PERIOD}'
 #gc_horizon = {DEFAULT_GC_HORIZON}
+#pitr_interval = '{DEFAULT_PITR_INTERVAL}'
 
 #wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}'
 #wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}'
@@ -109,6 +111,7 @@ pub struct PageServerConf {
     pub gc_horizon: u64,
     pub gc_period: Duration,
+    pub pitr_interval: Duration,
 
     // Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
     pub wait_lsn_timeout: Duration,
@@ -136,6 +139,27 @@ pub struct PageServerConf {
     pub remote_storage_config: Option<RemoteStorageConfig>,
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct TenantConf {
+    pub checkpoint_distance: u64,
+    pub compaction_period: Duration,
+    pub gc_horizon: u64,
+    pub gc_period: Duration,
+    pub pitr_interval: Duration,
+}
+
+impl TenantConf {
+    pub fn from(conf: &PageServerConf) -> TenantConf {
+        TenantConf {
+            gc_period: conf.gc_period,
+            gc_horizon: conf.gc_horizon,
+            pitr_interval: conf.pitr_interval,
+            checkpoint_distance: conf.checkpoint_distance,
+            compaction_period: conf.compaction_period,
+        }
+    }
+}
+
 // use dedicated enum for builder to better indicate the intention
 // and avoid possible confusion with nested options
 pub enum BuilderValue<T> {
@@ -165,6 +189,7 @@ struct PageServerConfigBuilder {
     gc_horizon: BuilderValue<u64>,
     gc_period: BuilderValue<Duration>,
+    pitr_interval: BuilderValue<Duration>,
 
     wait_lsn_timeout: BuilderValue<Duration>,
     wal_redo_timeout: BuilderValue<Duration>,
@@ -201,6 +226,8 @@ impl Default for PageServerConfigBuilder {
             gc_horizon: Set(DEFAULT_GC_HORIZON),
             gc_period: Set(humantime::parse_duration(DEFAULT_GC_PERIOD)
                 .expect("cannot parse default gc period")),
+            pitr_interval: Set(humantime::parse_duration(DEFAULT_PITR_INTERVAL)
+                .expect("cannot parse default PITR interval")),
             wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
                 .expect("cannot parse default wait lsn timeout")),
             wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
@@ -249,6 +276,10 @@ impl PageServerConfigBuilder {
         self.gc_period = BuilderValue::Set(gc_period)
     }
 
+    pub fn pitr_interval(&mut self, pitr_interval: Duration) {
+        self.pitr_interval = BuilderValue::Set(pitr_interval)
+    }
+
     pub fn wait_lsn_timeout(&mut self, wait_lsn_timeout: Duration) {
         self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout)
     }
@@ -317,6 +348,7 @@ impl PageServerConfigBuilder {
                 .gc_horizon
                 .ok_or(anyhow::anyhow!("missing gc_horizon"))?,
             gc_period: self.gc_period.ok_or(anyhow::anyhow!("missing gc_period"))?,
+            pitr_interval: self.pitr_interval.ok_or(anyhow::anyhow!("missing pitr_interval"))?,
             wait_lsn_timeout: self
                 .wait_lsn_timeout
                 .ok_or(anyhow::anyhow!("missing wait_lsn_timeout"))?,
@@ -455,6 +487,7 @@ impl PageServerConf {
                 "compaction_period" => builder.compaction_period(parse_toml_duration(key, item)?),
                 "gc_horizon" => builder.gc_horizon(parse_toml_u64(key, item)?),
                 "gc_period" => builder.gc_period(parse_toml_duration(key, item)?),
+                "pitr_interval" => builder.pitr_interval(parse_toml_duration(key, item)?),
                 "wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?),
                 "wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?),
                 "initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
@@ -592,6 +625,7 @@ impl PageServerConf {
             compaction_period: Duration::from_secs(10),
             gc_horizon: defaults::DEFAULT_GC_HORIZON,
             gc_period: Duration::from_secs(10),
+            pitr_interval: Duration::from_secs(60 * 60),
             wait_lsn_timeout: Duration::from_secs(60),
             wal_redo_timeout: Duration::from_secs(60),
             page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
@@ -666,6 +700,8 @@ compaction_period = '111 s'
 gc_period = '222 s'
 gc_horizon = 222
+pitr_interval = '30 days'
+
 wait_lsn_timeout = '111 s'
 wal_redo_timeout = '111 s'
@@ -702,6 +738,7 @@ id = 10
                 compaction_period: humantime::parse_duration(defaults::DEFAULT_COMPACTION_PERIOD)?,
                 gc_horizon: defaults::DEFAULT_GC_HORIZON,
                 gc_period: humantime::parse_duration(defaults::DEFAULT_GC_PERIOD)?,
+                pitr_interval: humantime::parse_duration(defaults::DEFAULT_PITR_INTERVAL)?,
                 wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?,
                 wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?,
                 superuser: defaults::DEFAULT_SUPERUSER.to_string(),
@@ -749,6 +786,7 @@ id = 10
                 gc_period: Duration::from_secs(222),
                 wait_lsn_timeout: Duration::from_secs(111),
                 wal_redo_timeout: Duration::from_secs(111),
+                pitr_interval: Duration::from_secs(30 * 24 * 60 * 60),
                 superuser: "zzzz".to_string(),
                 page_cache_size: 444,
                 max_file_descriptors: 333,
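Taken together, the config.rs changes give every tenant-tunable knob a parsed-and-validated global default that `TenantConf::from` then snapshots per tenant. The standalone sketch below mirrors that defaulting path with stand-in structs (field names follow the diff; `humantime` is the same crate the config parser uses). It is an illustration, not pageserver code:

```rust
// Minimal sketch of the global-default -> per-tenant snapshot flow,
// assuming the field set shown in the diff above.
use std::time::Duration;

#[derive(Debug, Clone, Copy)]
struct PageServerConf {
    checkpoint_distance: u64,
    compaction_period: Duration,
    gc_horizon: u64,
    gc_period: Duration,
    pitr_interval: Duration,
}

#[derive(Debug, Clone, Copy)]
struct TenantConf {
    checkpoint_distance: u64,
    compaction_period: Duration,
    gc_horizon: u64,
    gc_period: Duration,
    pitr_interval: Duration,
}

impl TenantConf {
    // Snapshot the global defaults for a newly created tenant.
    fn from(conf: &PageServerConf) -> TenantConf {
        TenantConf {
            checkpoint_distance: conf.checkpoint_distance,
            compaction_period: conf.compaction_period,
            gc_horizon: conf.gc_horizon,
            gc_period: conf.gc_period,
            pitr_interval: conf.pitr_interval,
        }
    }
}

fn main() -> Result<(), humantime::DurationError> {
    let global = PageServerConf {
        checkpoint_distance: 256 * 1024 * 1024,
        compaction_period: humantime::parse_duration("1 s")?,
        gc_horizon: 64 * 1024 * 1024,
        gc_period: humantime::parse_duration("100 s")?,
        // "30 days" parses to 30 * 24 * 60 * 60 seconds, which is what the
        // config test in the diff asserts.
        pitr_interval: humantime::parse_duration("30 days")?,
    };
    let tenant = TenantConf::from(&global);
    assert_eq!(tenant.pitr_interval, Duration::from_secs(30 * 24 * 60 * 60));
    println!("{tenant:?}");
    Ok(())
}
```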
diff --git a/pageserver/src/http/models.rs b/pageserver/src/http/models.rs
index d1dfb911ba..12ee30be4d 100644
--- a/pageserver/src/http/models.rs
+++ b/pageserver/src/http/models.rs
@@ -25,6 +25,11 @@ pub struct TenantCreateRequest {
     #[serde(default)]
     #[serde_as(as = "Option<DisplayFromStr>")]
     pub new_tenant_id: Option<ZTenantId>,
+    pub checkpoint_distance: Option<u64>,
+    pub compaction_period: Option<String>,
+    pub gc_horizon: Option<u64>,
+    pub gc_period: Option<String>,
+    pub pitr_interval: Option<String>,
 }
 
 #[serde_as]
@@ -36,3 +41,16 @@ pub struct TenantCreateResponse(#[serde_as(as = "DisplayFromStr")] pub ZTenantId);
 
 pub struct StatusResponse {
     pub id: ZNodeId,
 }
+
+impl TenantCreateRequest {
+    pub fn new(new_tenant_id: Option<ZTenantId>) -> TenantCreateRequest {
+        TenantCreateRequest {
+            new_tenant_id,
+            checkpoint_distance: None,
+            compaction_period: None,
+            gc_horizon: None,
+            gc_period: None,
+            pitr_interval: None,
+        }
+    }
+}
diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml
index b2760efe85..2d59d33717 100644
--- a/pageserver/src/http/openapi_spec.yml
+++ b/pageserver/src/http/openapi_spec.yml
@@ -333,6 +333,16 @@ paths:
                   new_tenant_id:
                     type: string
                     format: hex
+                  gc_period:
+                    type: string
+                  gc_horizon:
+                    type: integer
+                  pitr_interval:
+                    type: string
+                  checkpoint_distance:
+                    type: integer
+                  compaction_period:
+                    type: string
       responses:
         "201":
           description: New tenant created successfully
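All the new request fields are optional, so existing clients that send only `new_tenant_id` keep working. The mirror struct below sketches the wire format this implies; the `skip_serializing_if` choice and the string-typed ids are illustrative assumptions, not the PR's exact serde setup:

```rust
// Sketch of the tenant-creation payload with the new optional overrides.
// Durations travel as humantime strings (e.g. "7 days") and are parsed
// server-side; this mirror struct exists only to show the JSON shape.
use serde::{Deserialize, Serialize};

#[derive(Debug, Default, Serialize, Deserialize)]
struct TenantCreateRequest {
    #[serde(skip_serializing_if = "Option::is_none")]
    new_tenant_id: Option<String>, // hex tenant id, stringly-typed here for brevity
    #[serde(skip_serializing_if = "Option::is_none")]
    checkpoint_distance: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    compaction_period: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    gc_horizon: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    gc_period: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pitr_interval: Option<String>,
}

fn main() -> serde_json::Result<()> {
    // Only the overrides need to be present; everything else falls back
    // to the pageserver defaults.
    let req = TenantCreateRequest {
        gc_horizon: Some(64 * 1024 * 1024),
        pitr_interval: Some("7 days".to_string()),
        ..Default::default()
    };
    println!("{}", serde_json::to_string_pretty(&req)?);
    Ok(())
}
```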
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 207d2420bd..f915e942de 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -24,7 +24,7 @@ use super::models::{
 use crate::remote_storage::{schedule_timeline_download, RemoteIndex};
 use crate::repository::Repository;
 use crate::timelines::{LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo};
-use crate::{config::PageServerConf, tenant_mgr, timelines, ZTenantId};
+use crate::{config::PageServerConf, config::TenantConf, tenant_mgr, timelines, ZTenantId};
 
 struct State {
     conf: &'static PageServerConf,
@@ -290,6 +290,26 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
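The hunk above grows `tenant_create_handler` by twenty lines to turn the optional request fields into a `TenantConf`. A standalone sketch of that merging step, under the assumption that it starts from `TenantConf::from` on the global config and parses duration strings with `humantime`; the helper name and stand-in types here are illustrative, not the PR's literal handler code:

```rust
// Hypothetical helper: fold per-request overrides into the global defaults.
use std::time::Duration;

#[allow(dead_code)]
struct TenantCreateRequest {
    new_tenant_id: Option<u128>,       // stand-in for ZTenantId
    checkpoint_distance: Option<u64>,
    compaction_period: Option<String>, // humantime string, e.g. "20 s"
    gc_horizon: Option<u64>,
    gc_period: Option<String>,
    pitr_interval: Option<String>,     // e.g. "30 days"
}

#[derive(Clone, Copy)]
struct TenantConf {
    checkpoint_distance: u64,
    compaction_period: Duration,
    gc_horizon: u64,
    gc_period: Duration,
    pitr_interval: Duration,
}

fn merge_overrides(defaults: TenantConf, req: &TenantCreateRequest) -> anyhow::Result<TenantConf> {
    let mut conf = defaults;
    if let Some(v) = req.checkpoint_distance {
        conf.checkpoint_distance = v;
    }
    if let Some(v) = &req.compaction_period {
        conf.compaction_period = humantime::parse_duration(v)?;
    }
    if let Some(v) = req.gc_horizon {
        conf.gc_horizon = v;
    }
    if let Some(v) = &req.gc_period {
        conf.gc_period = humantime::parse_duration(v)?;
    }
    if let Some(v) = &req.pitr_interval {
        conf.pitr_interval = humantime::parse_duration(v)?;
    }
    Ok(conf)
}

fn main() -> anyhow::Result<()> {
    let defaults = TenantConf {
        checkpoint_distance: 256 * 1024 * 1024,
        compaction_period: Duration::from_secs(1),
        gc_horizon: 64 * 1024 * 1024,
        gc_period: Duration::from_secs(100),
        pitr_interval: Duration::from_secs(30 * 24 * 60 * 60),
    };
    let req = TenantCreateRequest {
        new_tenant_id: None,
        checkpoint_distance: None,
        compaction_period: None,
        gc_horizon: Some(1024 * 1024),
        gc_period: None,
        pitr_interval: Some("7 days".into()),
    };
    let conf = merge_overrides(defaults, &req)?;
    assert_eq!(conf.gc_horizon, 1024 * 1024);
    assert_eq!(conf.pitr_interval, Duration::from_secs(7 * 24 * 60 * 60));
    Ok(())
}
```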
diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ ... @@ pub struct LayeredRepository {
+    tenant_conf: TenantConf,
     // This mutex prevents creation of new timelines during GC.
     gc_cs: Mutex<()>,
@@ -140,6 +143,10 @@ pub struct LayeredRepository {
 impl Repository for LayeredRepository {
     type Timeline = LayeredTimeline;
 
+    fn get_tenant_conf(&self) -> TenantConf {
+        self.tenant_conf
+    }
+
     fn get_timeline(&self, timelineid: ZTimelineId) -> Option<RepositoryTimeline<Self::Timeline>> {
         let timelines = self.timelines.lock().unwrap();
         self.get_timeline_internal(timelineid, &timelines)
@@ -269,6 +276,7 @@ impl Repository for LayeredRepository {
         &self,
         target_timelineid: Option<ZTimelineId>,
         horizon: u64,
+        pitr: Duration,
         checkpoint_before_gc: bool,
     ) -> Result<GcResult> {
         let timeline_str = target_timelineid
@@ -278,7 +286,7 @@ impl Repository for LayeredRepository {
         STORAGE_TIME
             .with_label_values(&["gc", &self.tenantid.to_string(), &timeline_str])
             .observe_closure_duration(|| {
-                self.gc_iteration_internal(target_timelineid, horizon, checkpoint_before_gc)
+                self.gc_iteration_internal(target_timelineid, horizon, pitr, checkpoint_before_gc)
             })
     }
@@ -540,6 +548,7 @@ impl LayeredRepository {
     pub fn new(
         conf: &'static PageServerConf,
+        tenant_conf: TenantConf,
         walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
         tenantid: ZTenantId,
         remote_index: RemoteIndex,
@@ -548,6 +557,7 @@ impl LayeredRepository {
         LayeredRepository {
             tenantid,
             conf,
+            tenant_conf,
             timelines: Mutex::new(HashMap::new()),
             gc_cs: Mutex::new(()),
             walredo_mgr,
@@ -631,6 +641,7 @@ impl LayeredRepository {
         &self,
         target_timelineid: Option<ZTimelineId>,
         horizon: u64,
+        pitr: Duration,
         checkpoint_before_gc: bool,
     ) -> Result<GcResult> {
         let _span_guard =
@@ -706,7 +717,7 @@ impl LayeredRepository {
                     timeline.checkpoint(CheckpointConfig::Forced)?;
                     info!("timeline {} checkpoint_before_gc done", timelineid);
                 }
-                timeline.update_gc_info(branchpoints, cutoff);
+                timeline.update_gc_info(branchpoints, cutoff, pitr);
                 let result = timeline.gc()?;
                 totals += result;
@@ -1824,7 +1835,7 @@ impl LayeredTimeline {
     /// obsolete.
     ///
     fn gc(&self) -> Result<GcResult> {
-        let now = Instant::now();
+        let now = SystemTime::now();
         let mut result: GcResult = Default::default();
         let disk_consistent_lsn = self.get_disk_consistent_lsn();
@@ -1833,6 +1844,7 @@ impl LayeredTimeline {
         let gc_info = self.gc_info.read().unwrap();
         let retain_lsns = &gc_info.retain_lsns;
         let cutoff = gc_info.cutoff;
+        let pitr = gc_info.pitr;
 
         let _enter = info_span!("garbage collection", timeline = %self.timelineid, tenant = %self.tenantid, cutoff = %cutoff).entered();
@@ -1850,8 +1862,9 @@ impl LayeredTimeline {
         //
         // Garbage collect the layer if all conditions are satisfied:
         // 1. it is older than cutoff LSN;
-        // 2. it doesn't need to be retained for 'retain_lsns';
-        // 3. newer on-disk image layers cover the layer's whole key range
+        // 2. it is older than the PITR interval;
+        // 3. it doesn't need to be retained for 'retain_lsns';
+        // 4. newer on-disk image layers cover the layer's whole key range
         //
         let mut layers = self.layers.lock().unwrap();
         'outer: for l in layers.iter_historic_layers() {
@@ -1877,8 +1890,31 @@ impl LayeredTimeline {
                 result.layers_needed_by_cutoff += 1;
                 continue 'outer;
             }
-
-            // 2. Is it needed by a child branch?
+            // 2. Is it newer than the PITR interval?
+            //
+            // We use the modification time of the layer file to estimate when it was
+            // written. This estimate is not precise, but maintaining an LSN->timestamp
+            // map seems like overkill: users are unlikely to need high precision here.
+            // The estimate is also conservative: a file's modification time is always
+            // later than the actual creation time of the versions it contains, so we
+            // only ever keep layers longer than strictly necessary.
+            //
+            if let Ok(metadata) = fs::metadata(&l.filename()) {
+                let last_modified = metadata.modified()?;
+                if now.duration_since(last_modified)? < pitr {
+                    info!(
+                        "keeping {}-{} because its modification time {:?} is newer than PITR {:?}",
+                        l.get_start_lsn(),
+                        l.get_end_lsn(),
+                        last_modified,
+                        pitr
+                    );
+                    result.layers_needed_by_pitr += 1;
+                    continue 'outer;
+                }
+            }
+
+            // 3. Is it needed by a child branch?
             // NOTE With that we would keep data that
             // might be referenced by child branches forever.
             // We can track this in child timeline GC and delete parent layers when
@@ -1897,7 +1935,7 @@ impl LayeredTimeline {
                 }
             }
 
-            // 3. Is there a later on-disk layer for this relation?
+            // 4. Is there a later on-disk layer for this relation?
             //
             // The end-LSN is exclusive, while disk_consistent_lsn is
             // inclusive. For example, if disk_consistent_lsn is 100, it is
@@ -1938,7 +1976,7 @@ impl LayeredTimeline {
                 result.layers_removed += 1;
             }
 
-        result.elapsed = now.elapsed();
+        result.elapsed = now.elapsed()?;
         Ok(result)
     }
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index e7a4117b3e..fbc047359b 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -19,6 +19,7 @@ use std::net::TcpListener;
 use std::str;
 use std::str::FromStr;
 use std::sync::{Arc, RwLockReadGuard};
+use std::time::Duration;
 use tracing::*;
 use zenith_metrics::{register_histogram_vec, HistogramVec};
 use zenith_utils::auth::{self, JwtAuth};
@@ -695,10 +696,11 @@ impl postgres_backend::Handler for PageServerHandler {
                 .unwrap_or(Ok(self.conf.gc_horizon))?;
             let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
-            let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?;
+            let result = repo.gc_iteration(Some(timelineid), gc_horizon, Duration::ZERO, true)?;
             pgb.write_message_noflush(&BeMessage::RowDescription(&[
                 RowDescriptor::int8_col(b"layers_total"),
                 RowDescriptor::int8_col(b"layers_needed_by_cutoff"),
+                RowDescriptor::int8_col(b"layers_needed_by_pitr"),
                 RowDescriptor::int8_col(b"layers_needed_by_branches"),
                 RowDescriptor::int8_col(b"layers_not_updated"),
                 RowDescriptor::int8_col(b"layers_removed"),
@@ -707,6 +709,7 @@ impl postgres_backend::Handler for PageServerHandler {
             .write_message_noflush(&BeMessage::DataRow(&[
                 Some(result.layers_total.to_string().as_bytes()),
                 Some(result.layers_needed_by_cutoff.to_string().as_bytes()),
+                Some(result.layers_needed_by_pitr.to_string().as_bytes()),
                 Some(result.layers_needed_by_branches.to_string().as_bytes()),
                 Some(result.layers_not_updated.to_string().as_bytes()),
                 Some(result.layers_removed.to_string().as_bytes()),
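The retention test that `gc()` now applies can be read in isolation: a layer file whose modification time falls inside the PITR window is kept, and a zero interval (which is what the manual `do_gc` command in page_service.rs passes) disables the check entirely. A minimal sketch of that predicate, using the same `SystemTime::duration_since` error behavior as the diff (clock skew propagates as an error):

```rust
// Standalone sketch of the mtime-based PITR retention test.
use std::fs;
use std::path::Path;
use std::time::{Duration, SystemTime};

/// Returns true if the file at `path` was modified within the last `pitr`
/// and therefore must be retained. A zero `pitr` disables the check.
fn needed_by_pitr(path: &Path, now: SystemTime, pitr: Duration) -> anyhow::Result<bool> {
    if pitr.is_zero() {
        return Ok(false);
    }
    let last_modified = fs::metadata(path)?.modified()?;
    // duration_since errs if the file is newer than `now`; `?` propagates
    // that, matching the diff's use of `?` on the same call.
    Ok(now.duration_since(last_modified)? < pitr)
}

fn main() -> anyhow::Result<()> {
    let path = Path::new("Cargo.toml"); // any existing file works for the demo
    let keep = needed_by_pitr(path, SystemTime::now(), Duration::from_secs(30 * 24 * 60 * 60))?;
    println!("retain for PITR: {keep}");
    Ok(())
}
```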
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index 02334d3229..adf47ca79e 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -1,6 +1,7 @@
 use crate::layered_repository::metadata::TimelineMetadata;
 use crate::remote_storage::RemoteIndex;
 use crate::walrecord::ZenithWalRecord;
+use crate::config::TenantConf;
 use crate::CheckpointConfig;
 use anyhow::{bail, Result};
 use byteorder::{ByteOrder, BE};
@@ -249,6 +250,7 @@ pub trait Repository: Send + Sync {
         &self,
         timelineid: Option<ZTimelineId>,
         horizon: u64,
+        pitr: Duration,
         checkpoint_before_gc: bool,
     ) -> Result<GcResult>;
 
@@ -261,6 +263,8 @@ pub trait Repository: Send + Sync {
     // Allows to retrieve remote timeline index from the repo. Used in walreceiver to grab remote consistent lsn.
     fn get_remote_index(&self) -> &RemoteIndex;
+
+    fn get_tenant_conf(&self) -> TenantConf;
 }
 
 /// A timeline, that belongs to the current repository.
@@ -303,6 +307,7 @@ impl<'a, T> From<&'a RepositoryTimeline<T>> for LocalTimelineState {
 pub struct GcResult {
     pub layers_total: u64,
     pub layers_needed_by_cutoff: u64,
+    pub layers_needed_by_pitr: u64,
     pub layers_needed_by_branches: u64,
     pub layers_not_updated: u64,
     pub layers_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
@@ -313,6 +318,7 @@ pub struct GcResult {
 impl AddAssign for GcResult {
     fn add_assign(&mut self, other: Self) {
         self.layers_total += other.layers_total;
+        self.layers_needed_by_pitr += other.layers_needed_by_pitr;
         self.layers_needed_by_cutoff += other.layers_needed_by_cutoff;
         self.layers_needed_by_branches += other.layers_needed_by_branches;
         self.layers_not_updated += other.layers_not_updated;
@@ -505,6 +511,7 @@ pub mod repo_harness {
             let repo = LayeredRepository::new(
                 self.conf,
+                TenantConf::from(self.conf),
                 walredo_mgr,
                 self.tenant_id,
                 RemoteIndex::empty(),
@@ -720,7 +727,7 @@ mod tests {
         // FIXME: this doesn't actually remove any layer currently, given how the checkpointing
         // and compaction works. But it does set the 'cutoff' point so that the cross check
         // below should fail.
-        repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
+        repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
 
         // try to branch at lsn 25, should fail because we already garbage collected the data
         match repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x25)) {
@@ -771,7 +778,7 @@ mod tests {
         let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
         make_some_layers(tline.as_ref(), Lsn(0x20))?;
 
-        repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
+        repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
         let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
         assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
         match tline.get(*TEST_KEY, Lsn(0x25)) {
@@ -794,7 +801,7 @@ mod tests {
             .get_timeline_load(NEW_TIMELINE_ID)
             .expect("Should have a local timeline");
         // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
-        repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
+        repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
         assert!(newtline.get(*TEST_KEY, Lsn(0x25)).is_ok());
 
         Ok(())
@@ -813,7 +820,7 @@ mod tests {
         make_some_layers(newtline.as_ref(), Lsn(0x60))?;
 
         // run gc on parent
-        repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
+        repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
 
         // Check that the data is still accessible on the branch.
         assert_eq!(
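`GcResult::add_assign` is what lets `gc_iteration_internal` sum per-timeline results into one report, so every new counter has to be wired in there too or it silently reads zero. A trimmed-down sketch of that accumulation (only the fields relevant here; the real struct has more, plus an elapsed time):

```rust
// Minimal sketch of how layers_needed_by_pitr flows through GcResult.
use std::ops::AddAssign;

#[derive(Default, Debug)]
struct GcResult {
    layers_total: u64,
    layers_needed_by_cutoff: u64,
    layers_needed_by_pitr: u64,
    layers_removed: u64,
}

impl AddAssign for GcResult {
    fn add_assign(&mut self, other: Self) {
        self.layers_total += other.layers_total;
        self.layers_needed_by_pitr += other.layers_needed_by_pitr;
        self.layers_needed_by_cutoff += other.layers_needed_by_cutoff;
        self.layers_removed += other.layers_removed;
    }
}

fn main() {
    // gc_iteration_internal sums one GcResult per timeline into `totals`.
    let mut totals = GcResult::default();
    for per_timeline in [
        GcResult { layers_total: 10, layers_needed_by_pitr: 4, layers_removed: 6, ..Default::default() },
        GcResult { layers_total: 3, layers_needed_by_cutoff: 3, ..Default::default() },
    ] {
        totals += per_timeline;
    }
    assert_eq!(totals.layers_needed_by_pitr, 4);
    println!("{totals:?}");
}
```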
diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs
index aeff718803..c885dc83f8 100644
--- a/pageserver/src/tenant_mgr.rs
+++ b/pageserver/src/tenant_mgr.rs
@@ -1,7 +1,7 @@
 //! This module acts as a switchboard to access different repositories managed by this
 //! page server.
 
-use crate::config::PageServerConf;
+use crate::config::{PageServerConf, TenantConf};
 use crate::layered_repository::LayeredRepository;
 use crate::remote_storage::RemoteIndex;
 use crate::repository::{Repository, TimelineSyncStatusUpdate};
@@ -174,9 +175,10 @@ pub fn shutdown_all_tenants() {
 
 pub fn create_tenant_repository(
     conf: &'static PageServerConf,
+    tenant_conf: TenantConf,
     tenantid: ZTenantId,
     remote_index: RemoteIndex,
 ) -> Result<Option<ZTenantId>> {
     match access_tenants().entry(tenantid) {
         Entry::Occupied(_) => {
             debug!("tenant {} already exists", tenantid);
@@ -184,8 +189,9 @@ pub fn create_tenant_repository(
         }
         Entry::Vacant(v) => {
             let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid));
             let repo = timelines::create_repo(
                 conf,
+                tenant_conf,
                 tenantid,
                 CreateRepo::Real {
                     wal_redo_manager,
@@ -210,7 +217,7 @@ pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {
 /// Change the state of a tenant to Active and launch its compactor and GC
 /// threads. If the tenant was already in Active state or Stopping, does nothing.
 ///
-pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> Result<()> {
+pub fn activate_tenant(tenant_id: ZTenantId) -> Result<()> {
     let mut m = access_tenants();
     let tenant = m
         .get_mut(&tenant_id)
@@ -230,7 +237,7 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> Result<()> {
         None,
         "Compactor thread",
         true,
-        move || crate::tenant_threads::compact_loop(tenant_id, conf),
+        move || crate::tenant_threads::compact_loop(tenant_id),
     )?;
 
     let gc_spawn_result = thread_mgr::spawn(
@@ -239,7 +246,7 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> Result<()> {
         None,
         "GC thread",
         true,
-        move || crate::tenant_threads::gc_loop(tenant_id, conf),
+        move || crate::tenant_threads::gc_loop(tenant_id),
     )
     .with_context(|| format!("Failed to launch GC thread for tenant {}", tenant_id));
 
@@ -251,7 +258,7 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> Result<()> {
         thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
         return gc_spawn_result;
     }
     tenant.state = TenantState::Active;
 }
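`create_tenant_repository` keys its create-or-skip decision off the HashMap `Entry` API, so a duplicate create request is a cheap no-op and the repository (wal-redo manager, `TenantConf`, and all) is only built in the `Vacant` arm. A stand-in sketch of that pattern; the types here are placeholders, not the real tenant state:

```rust
// Sketch of the create-if-absent pattern used by create_tenant_repository.
use std::collections::hash_map::Entry;
use std::collections::HashMap;

type ZTenantId = u128;
struct Tenant; // stand-in for the real per-tenant state

fn create_tenant(
    tenants: &mut HashMap<ZTenantId, Tenant>,
    tenantid: ZTenantId,
) -> Option<ZTenantId> {
    match tenants.entry(tenantid) {
        Entry::Occupied(_) => {
            // Tenant already registered; nothing to create.
            None
        }
        Entry::Vacant(v) => {
            // Build the repository only when we know we are the ones
            // registering the tenant.
            v.insert(Tenant);
            Some(tenantid)
        }
    }
}

fn main() {
    let mut tenants = HashMap::new();
    assert_eq!(create_tenant(&mut tenants, 7), Some(7));
    assert_eq!(create_tenant(&mut tenants, 7), None); // idempotent
}
```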
diff --git a/pageserver/src/tenant_threads.rs b/pageserver/src/tenant_threads.rs
index 0d9a94cc5b..0b21b70e7e 100644
--- a/pageserver/src/tenant_threads.rs
+++ b/pageserver/src/tenant_threads.rs
@@ -1,6 +1,5 @@
 //! This module contains functions to serve per-tenant background processes,
 //! such as compaction and GC
-use crate::config::PageServerConf;
 use crate::repository::Repository;
 use crate::tenant_mgr;
 use crate::tenant_mgr::TenantState;
@@ -12,8 +11,8 @@ use zenith_utils::zid::ZTenantId;
 ///
 /// Compaction thread's main loop
 ///
-pub fn compact_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
-    if let Err(err) = compact_loop_ext(tenantid, conf) {
+pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
+    if let Err(err) = compact_loop_ext(tenantid) {
         error!("compact loop terminated with error: {:?}", err);
         Err(err)
     } else {
@@ -21,13 +20,15 @@ pub fn compact_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
     }
 }
 
-fn compact_loop_ext(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
+fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
     loop {
         if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
             break;
         }
+        let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
+        let tenant_conf = repo.get_tenant_conf();
 
-        std::thread::sleep(conf.compaction_period);
+        std::thread::sleep(tenant_conf.compaction_period);
         trace!("compaction thread for tenant {} waking up", tenantid);
 
         // Compact timelines
@@ -46,23 +47,29 @@ fn compact_loop_ext(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
 ///
 /// GC thread's main loop
 ///
-pub fn gc_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
+pub fn gc_loop(tenantid: ZTenantId) -> Result<()> {
     loop {
         if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
             break;
         }
 
         trace!("gc thread for tenant {} waking up", tenantid);
+        let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
+        let tenant_conf = repo.get_tenant_conf();
 
         // Garbage collect old files that are not needed for PITR anymore
-        if conf.gc_horizon > 0 {
-            let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
-            repo.gc_iteration(None, conf.gc_horizon, false)?;
+        if tenant_conf.gc_horizon > 0 {
+            repo.gc_iteration(
+                None,
+                tenant_conf.gc_horizon,
+                tenant_conf.pitr_interval,
+                false,
+            )?;
         }
 
         // TODO Write this in a more adequate way using
         // condvar.wait_timeout() or something
-        let mut sleep_time = conf.gc_period.as_secs();
+        let mut sleep_time = tenant_conf.gc_period.as_secs();
         while sleep_time > 0
             && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active)
         {
             sleep_time -= 1;
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs
index 6de0b87478..e44bd83e30 100644
--- a/pageserver/src/walreceiver.rs
+++ b/pageserver/src/walreceiver.rs
@@ -93,7 +93,7 @@ pub fn launch_wal_receiver(
         receivers.insert((tenantid, timelineid), receiver);
 
         // Update tenant state and start tenant threads, if they are not running yet.
-        tenant_mgr::activate_tenant(conf, tenantid)?;
+        tenant_mgr::activate_tenant(tenantid)?;
     }
 };
 Ok(())
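The gc loop's TODO points at `Condvar::wait_timeout` as the replacement for the one-second polling sleep. A sketch of that shape; the shutdown flag and all names here are illustrative, not from this patch:

```rust
// Sketch of replacing a polled sleep with Condvar::wait_timeout: the loop
// sleeps a whole period at a time but wakes immediately when shut down.
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new())); // (shutdown, signal)
    let gc = {
        let pair = Arc::clone(&pair);
        thread::spawn(move || {
            let (lock, cvar) = &*pair;
            let mut shutdown = lock.lock().unwrap();
            loop {
                if *shutdown {
                    break; // tenant no longer active
                }
                // One GC pass would run here.
                // Sleep up to a full gc_period, waking early on notify.
                let (guard, _timed_out) = cvar
                    .wait_timeout(shutdown, Duration::from_secs(10))
                    .unwrap();
                shutdown = guard;
            }
        })
    };
    // Elsewhere: flip the flag and notify instead of waiting out the sleep.
    thread::sleep(Duration::from_millis(100));
    *pair.0.lock().unwrap() = true;
    pair.1.notify_all();
    gc.join().unwrap();
}
```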