diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index dc85c83c17..0bfc451a24 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -31,7 +31,8 @@ pub mod defaults { // would be more appropriate. But a low value forces the code to be exercised more, // which is good for now to trigger bugs. pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024; - pub const DEFAULT_CHECKPOINT_PERIOD: &str = "1 s"; + + pub const DEFAULT_COMPACTION_PERIOD: &str = "1 s"; pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; pub const DEFAULT_GC_PERIOD: &str = "100 s"; @@ -57,7 +58,7 @@ pub mod defaults { #listen_http_addr = '{DEFAULT_HTTP_LISTEN_ADDR}' #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes -#checkpoint_period = '{DEFAULT_CHECKPOINT_PERIOD}' +#compaction_period = '{DEFAULT_COMPACTION_PERIOD}' #gc_period = '{DEFAULT_GC_PERIOD}' #gc_horizon = {DEFAULT_GC_HORIZON} @@ -91,7 +92,9 @@ pub struct PageServerConf { // This puts a backstop on how much WAL needs to be re-digested if the // page server crashes. pub checkpoint_distance: u64, - pub checkpoint_period: Duration, + + // How often to check if there's compaction work to be done. + pub compaction_period: Duration, pub gc_horizon: u64, pub gc_period: Duration, @@ -145,7 +148,8 @@ struct PageServerConfigBuilder { listen_http_addr: BuilderValue, checkpoint_distance: BuilderValue, - checkpoint_period: BuilderValue, + + compaction_period: BuilderValue, gc_horizon: BuilderValue, gc_period: BuilderValue, @@ -179,8 +183,8 @@ impl Default for PageServerConfigBuilder { listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()), listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()), checkpoint_distance: Set(DEFAULT_CHECKPOINT_DISTANCE), - checkpoint_period: Set(humantime::parse_duration(DEFAULT_CHECKPOINT_PERIOD) - .expect("cannot parse default checkpoint period")), + compaction_period: Set(humantime::parse_duration(DEFAULT_COMPACTION_PERIOD) + .expect("cannot parse default compaction period")), gc_horizon: Set(DEFAULT_GC_HORIZON), gc_period: Set(humantime::parse_duration(DEFAULT_GC_PERIOD) .expect("cannot parse default gc period")), @@ -216,8 +220,8 @@ impl PageServerConfigBuilder { self.checkpoint_distance = BuilderValue::Set(checkpoint_distance) } - pub fn checkpoint_period(&mut self, checkpoint_period: Duration) { - self.checkpoint_period = BuilderValue::Set(checkpoint_period) + pub fn compaction_period(&mut self, compaction_period: Duration) { + self.compaction_period = BuilderValue::Set(compaction_period) } pub fn gc_horizon(&mut self, gc_horizon: u64) { @@ -286,9 +290,9 @@ impl PageServerConfigBuilder { checkpoint_distance: self .checkpoint_distance .ok_or(anyhow::anyhow!("missing checkpoint_distance"))?, - checkpoint_period: self - .checkpoint_period - .ok_or(anyhow::anyhow!("missing checkpoint_period"))?, + compaction_period: self + .compaction_period + .ok_or(anyhow::anyhow!("missing compaction_period"))?, gc_horizon: self .gc_horizon .ok_or(anyhow::anyhow!("missing gc_horizon"))?, @@ -425,7 +429,7 @@ impl PageServerConf { "listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?), "listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?), "checkpoint_distance" => builder.checkpoint_distance(parse_toml_u64(key, item)?), - "checkpoint_period" => builder.checkpoint_period(parse_toml_duration(key, item)?), + "compaction_period" => builder.compaction_period(parse_toml_duration(key, item)?), "gc_horizon" => builder.gc_horizon(parse_toml_u64(key, item)?), "gc_period" => builder.gc_period(parse_toml_duration(key, item)?), "wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?), @@ -561,7 +565,7 @@ impl PageServerConf { PageServerConf { id: ZNodeId(0), checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE, - checkpoint_period: Duration::from_secs(10), + compaction_period: Duration::from_secs(10), gc_horizon: defaults::DEFAULT_GC_HORIZON, gc_period: Duration::from_secs(10), wait_lsn_timeout: Duration::from_secs(60), @@ -631,7 +635,8 @@ listen_pg_addr = '127.0.0.1:64000' listen_http_addr = '127.0.0.1:9898' checkpoint_distance = 111 # in bytes -checkpoint_period = '111 s' + +compaction_period = '111 s' gc_period = '222 s' gc_horizon = 222 @@ -668,7 +673,7 @@ id = 10 listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(), listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(), checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE, - checkpoint_period: humantime::parse_duration(defaults::DEFAULT_CHECKPOINT_PERIOD)?, + compaction_period: humantime::parse_duration(defaults::DEFAULT_COMPACTION_PERIOD)?, gc_horizon: defaults::DEFAULT_GC_HORIZON, gc_period: humantime::parse_duration(defaults::DEFAULT_GC_PERIOD)?, wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?, @@ -712,7 +717,7 @@ id = 10 listen_pg_addr: "127.0.0.1:64000".to_string(), listen_http_addr: "127.0.0.1:9898".to_string(), checkpoint_distance: 111, - checkpoint_period: Duration::from_secs(111), + compaction_period: Duration::from_secs(111), gc_horizon: 222, gc_period: Duration::from_secs(222), wait_lsn_timeout: Duration::from_secs(111), diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index f195288b9a..2c48295ad0 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -72,8 +72,6 @@ use layer_map::LayerMap; use layer_map::SearchResult; use storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; -use crate::keyspace::TARGET_FILE_SIZE_BYTES; - // re-export this function so that page_cache.rs can use it. pub use crate::layered_repository::ephemeral_file::writeback as writeback_ephemeral_file; @@ -104,7 +102,7 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines"; /// Repository consists of multiple timelines. Keep them in a hash table. /// pub struct LayeredRepository { - conf: &'static PageServerConf, + pub conf: &'static PageServerConf, tenantid: ZTenantId, timelines: Mutex>, // This mutex prevents creation of new timelines during GC. @@ -246,23 +244,58 @@ impl Repository for LayeredRepository { }) } - fn checkpoint_iteration(&self, cconf: CheckpointConfig) -> Result<()> { + fn compaction_iteration(&self) -> Result<()> { // Scan through the hashmap and collect a list of all the timelines, // while holding the lock. Then drop the lock and actually perform the - // checkpoints. We don't want to block everything else while the - // checkpoint runs. + // compactions. We don't want to block everything else while the + // compaction runs. let timelines = self.timelines.lock().unwrap(); - let timelines_to_checkpoint = timelines + let timelines_to_compact = timelines .iter() .map(|(timelineid, timeline)| (*timelineid, timeline.clone())) .collect::>(); drop(timelines); - for (timelineid, timeline) in &timelines_to_checkpoint { + for (timelineid, timeline) in &timelines_to_compact { + let _entered = + info_span!("compact", timeline = %timelineid, tenant = %self.tenantid).entered(); + match timeline { + LayeredTimelineEntry::Local(timeline) => { + timeline.compact()?; + } + LayeredTimelineEntry::Remote { .. } => { + debug!("Cannot compact remote timeline {}", timelineid) + } + } + } + + Ok(()) + } + + /// + /// Flush all in-memory data to disk. + /// + /// Used at shutdown. + /// + fn checkpoint(&self) -> Result<()> { + // Scan through the hashmap and collect a list of all the timelines, + // while holding the lock. Then drop the lock and actually perform the + // checkpoints. We don't want to block everything else while the + // checkpoint runs. + let timelines = self.timelines.lock().unwrap(); + let timelines_to_compact = timelines + .iter() + .map(|(timelineid, timeline)| (*timelineid, timeline.clone())) + .collect::>(); + drop(timelines); + + for (timelineid, timeline) in &timelines_to_compact { let _entered = info_span!("checkpoint", timeline = %timelineid, tenant = %self.tenantid).entered(); match timeline { - LayeredTimelineEntry::Local(timeline) => timeline.checkpoint(cconf)?, + LayeredTimelineEntry::Local(timeline) => { + timeline.checkpoint(CheckpointConfig::Flush)?; + } LayeredTimelineEntry::Remote { .. } => debug!( "Cannot run the checkpoint for remote timeline {}", timelineid @@ -756,9 +789,9 @@ pub struct LayeredTimeline { // Metrics histograms reconstruct_time_histo: Histogram, - checkpoint_time_histo: Histogram, - flush_checkpoint_time_histo: Histogram, - forced_checkpoint_time_histo: Histogram, + flush_time_histo: Histogram, + compact_time_histo: Histogram, + create_images_time_histo: Histogram, /// If `true`, will backup its files that appear after each checkpointing to the remote storage. upload_relishes: AtomicBool, @@ -860,23 +893,14 @@ impl Timeline for LayeredTimeline { fn checkpoint(&self, cconf: CheckpointConfig) -> Result<()> { match cconf { CheckpointConfig::Flush => { - self.flush_checkpoint_time_histo - .observe_closure_duration(|| { - self.freeze_inmem_layer(false); - self.flush_frozen_layers(true) - }) + self.freeze_inmem_layer(false); + self.flush_frozen_layers(true) } CheckpointConfig::Forced => { - self.forced_checkpoint_time_histo - .observe_closure_duration(|| { - self.freeze_inmem_layer(false); - self.flush_frozen_layers(true)?; - self.checkpoint_internal() - }) + self.freeze_inmem_layer(false); + self.flush_frozen_layers(true)?; + self.compact() } - CheckpointConfig::Distance => self - .checkpoint_time_histo - .observe_closure_duration(|| self.checkpoint_internal()), } } @@ -946,23 +970,23 @@ impl LayeredTimeline { let reconstruct_time_histo = RECONSTRUCT_TIME .get_metric_with_label_values(&[&tenantid.to_string(), &timelineid.to_string()]) .unwrap(); - let checkpoint_time_histo = STORAGE_TIME + let flush_time_histo = STORAGE_TIME .get_metric_with_label_values(&[ - "checkpoint", + "layer flush", &tenantid.to_string(), &timelineid.to_string(), ]) .unwrap(); - let flush_checkpoint_time_histo = STORAGE_TIME + let compact_time_histo = STORAGE_TIME .get_metric_with_label_values(&[ - "flush checkpoint", + "compact", &tenantid.to_string(), &timelineid.to_string(), ]) .unwrap(); - let forced_checkpoint_time_histo = STORAGE_TIME + let create_images_time_histo = STORAGE_TIME .get_metric_with_label_values(&[ - "forced checkpoint", + "create images", &tenantid.to_string(), &timelineid.to_string(), ]) @@ -989,9 +1013,9 @@ impl LayeredTimeline { ancestor_lsn: metadata.ancestor_lsn(), reconstruct_time_histo, - checkpoint_time_histo, - flush_checkpoint_time_histo, - forced_checkpoint_time_histo, + flush_time_histo, + compact_time_histo, + create_images_time_histo, upload_relishes: AtomicBool::new(upload_relishes), @@ -1331,37 +1355,6 @@ impl LayeredTimeline { drop(layers); } - /// - /// Flush to disk all data that was written with the put_* functions - /// - /// NOTE: This has nothing to do with checkpoint in PostgreSQL. - fn checkpoint_internal(&self) -> Result<()> { - info!("checkpoint starting"); - // Prevent concurrent checkpoints - // FIXME: This does compaction now, not the flushing of layers. - // Is this lock still needed? - let _checkpoint_cs = self.checkpoint_cs.lock().unwrap(); - - // Create new image layers to allow GC and to reduce read latency - // TODO: the threshold for how often we create image layers is - // currently hard-coded at 3. It means, write out a new image layer, - // if there are at least three delta layers on top of it. - self.compact(TARGET_FILE_SIZE_BYTES as usize)?; - - // TODO: We should also compact existing delta layers here. - - // Call unload() on all frozen layers, to release memory. - // This shouldn't be much memory, as only metadata is slurped - // into memory. - let layers = self.layers.lock().unwrap(); - for layer in layers.iter_historic_layers() { - layer.unload()?; - } - drop(layers); - - Ok(()) - } - pub fn check_checkpoint_distance(self: &Arc) -> Result<()> { let last_lsn = self.get_last_record_lsn(); @@ -1402,6 +1395,8 @@ impl LayeredTimeline { } }; + let timer = self.flush_time_histo.start_timer(); + loop { let layers = self.layers.lock().unwrap(); if let Some(frozen_layer) = layers.frozen_layers.front() { @@ -1422,6 +1417,8 @@ impl LayeredTimeline { } } + timer.stop_and_record(); + Ok(()) } @@ -1525,13 +1522,13 @@ impl LayeredTimeline { Ok(()) } - fn compact(&self, target_file_size: usize) -> Result<()> { + pub fn compact(&self) -> Result<()> { // // High level strategy for compaction / image creation: // // 1. First, calculate the desired "partitioning" of the // currently in-use key space. The goal is to partition the - // key space into TARGET_FILE_SIZE chunks, but also take into + // key space into roughly fixed-size chunks, but also take into // account any existing image layers, and try to align the // chunk boundaries with the existing image layers to avoid // too much churn. Also try to align chunk boundaries with @@ -1561,10 +1558,13 @@ impl LayeredTimeline { // but they are a bit ad hoc and don't quite work like it's explained // above. Rewrite it. + let target_file_size = self.conf.checkpoint_distance; + // 1. The partitioning was already done by the code in // pgdatadir_mapping.rs. We just use it here. let partitioning_guard = self.partitioning.read().unwrap(); if let Some((partitioning, lsn)) = partitioning_guard.as_ref() { + let timer = self.create_images_time_histo.start_timer(); // Make a copy of the partitioning, so that we can release // the lock. Otherwise we could block the WAL receiver. let lsn = *lsn; @@ -1578,12 +1578,25 @@ impl LayeredTimeline { self.create_image_layer(partition, lsn)?; } } + timer.stop_and_record(); // 3. Compact + let timer = self.compact_time_histo.start_timer(); self.compact_level0(target_file_size)?; + timer.stop_and_record(); } else { info!("Could not compact because no partitioning specified yet"); } + + // Call unload() on all frozen layers, to release memory. + // This shouldn't be much memory, as only metadata is slurped + // into memory. + let layers = self.layers.lock().unwrap(); + for layer in layers.iter_historic_layers() { + layer.unload()?; + } + drop(layers); + Ok(()) } @@ -1643,7 +1656,7 @@ impl LayeredTimeline { Ok(()) } - fn compact_level0(&self, target_file_size: usize) -> Result<()> { + fn compact_level0(&self, target_file_size: u64) -> Result<()> { let layers = self.layers.lock().unwrap(); // We compact or "shuffle" the level-0 delta layers when 10 have @@ -1698,7 +1711,7 @@ impl LayeredTimeline { if let Some(prev_key) = prev_key { if key != prev_key && writer.is_some() { let size = writer.as_mut().unwrap().size(); - if size > target_file_size as u64 { + if size > target_file_size { new_layers.push(writer.take().unwrap().finish(prev_key.next())?); writer = None; } @@ -2032,7 +2045,7 @@ fn rename_to_backup(path: PathBuf) -> anyhow::Result<()> { /// file format and directory layout. The test here are more low level. /// #[cfg(test)] -mod tests { +pub mod tests { use super::*; use crate::keyspace::KeySpaceAccum; use crate::repository::repo_harness::*; @@ -2072,7 +2085,7 @@ mod tests { // file size is much larger, maybe 1 GB. But a small size makes it // much faster to exercise all the logic for creating the files, // garbage collection, compaction etc. - const TEST_FILE_SIZE: usize = 4 * 1024 * 1024; + pub const TEST_FILE_SIZE: u64 = 4 * 1024 * 1024; #[test] fn test_images() -> Result<()> { @@ -2088,7 +2101,7 @@ mod tests { drop(writer); tline.checkpoint(CheckpointConfig::Forced)?; - tline.compact(TEST_FILE_SIZE)?; + tline.compact()?; let writer = tline.writer(); writer.put(TEST_KEY, Lsn(0x20), Value::Image(TEST_IMG("foo at 0x20")))?; @@ -2096,7 +2109,7 @@ mod tests { drop(writer); tline.checkpoint(CheckpointConfig::Forced)?; - tline.compact(TEST_FILE_SIZE)?; + tline.compact()?; let writer = tline.writer(); writer.put(TEST_KEY, Lsn(0x30), Value::Image(TEST_IMG("foo at 0x30")))?; @@ -2104,7 +2117,7 @@ mod tests { drop(writer); tline.checkpoint(CheckpointConfig::Forced)?; - tline.compact(TEST_FILE_SIZE)?; + tline.compact()?; let writer = tline.writer(); writer.put(TEST_KEY, Lsn(0x40), Value::Image(TEST_IMG("foo at 0x40")))?; @@ -2112,7 +2125,7 @@ mod tests { drop(writer); tline.checkpoint(CheckpointConfig::Forced)?; - tline.compact(TEST_FILE_SIZE)?; + tline.compact()?; assert_eq!(tline.get(TEST_KEY, Lsn(0x10))?, TEST_IMG("foo at 0x10")); assert_eq!(tline.get(TEST_KEY, Lsn(0x1f))?, TEST_IMG("foo at 0x10")); @@ -2165,7 +2178,7 @@ mod tests { tline.update_gc_info(Vec::new(), cutoff); tline.checkpoint(CheckpointConfig::Forced)?; - tline.compact(TEST_FILE_SIZE)?; + tline.compact()?; tline.gc()?; } @@ -2239,7 +2252,7 @@ mod tests { let cutoff = tline.get_last_record_lsn(); tline.update_gc_info(Vec::new(), cutoff); tline.checkpoint(CheckpointConfig::Forced)?; - tline.compact(TEST_FILE_SIZE)?; + tline.compact()?; tline.gc()?; } @@ -2325,7 +2338,7 @@ mod tests { let cutoff = tline.get_last_record_lsn(); tline.update_gc_info(Vec::new(), cutoff); tline.checkpoint(CheckpointConfig::Forced)?; - tline.compact(TEST_FILE_SIZE)?; + tline.compact()?; tline.gc()?; } diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 3c557e4e82..fd38ba9d70 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -41,8 +41,6 @@ pub const LOG_FILE_NAME: &str = "pageserver.log"; /// Config for the Repository checkpointer #[derive(Debug, Clone, Copy)] pub enum CheckpointConfig { - // Flush in-memory data that is older than this - Distance, // Flush all in-memory data Flush, // Flush all in-memory data and reconstruct all page images diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 69ef4b9868..947a2d72ab 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -729,6 +729,13 @@ impl postgres_backend::Handler for PageServerHandler { .context("Failed to fetch local timeline for checkpoint request")?; timeline.tline.checkpoint(CheckpointConfig::Forced)?; + + // Also compact it. + // + // FIXME: This probably shouldn't be part of a "checkpoint" command, but a + // separate operation. Update the tests if you change this. + timeline.tline.compact()?; + pgb.write_message_noflush(&SINGLE_COL_ROWDESC)? .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } else { diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 9be8e658ca..c4661ad2d6 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -35,6 +35,8 @@ where pub tline: Arc, pub last_partitioning: AtomicLsn, pub current_logical_size: AtomicIsize, + + pub repartition_threshold: u64, } #[derive(Debug, Serialize, Deserialize)] @@ -71,11 +73,12 @@ pub struct SlruSegmentDirectory { static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]); impl DatadirTimeline { - pub fn new(tline: Arc) -> Self { + pub fn new(tline: Arc, repartition_threshold: u64) -> Self { DatadirTimeline { tline, last_partitioning: AtomicLsn::new(0), current_logical_size: AtomicIsize::new(0), + repartition_threshold, } } @@ -1178,7 +1181,7 @@ pub fn create_test_timeline( timeline_id: zenith_utils::zid::ZTimelineId, ) -> Result>> { let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?; - let tline = DatadirTimeline::new(tline); + let tline = DatadirTimeline::new(tline, crate::layered_repository::tests::TEST_FILE_SIZE / 10); let mut writer = tline.begin_record(Lsn(8)); writer.init_empty()?; diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 03fbff42a8..56bd5208ca 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -194,6 +194,11 @@ pub trait Repository: Send + Sync { /// Branch a timeline fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()>; + /// Flush all data to disk. + /// + /// this is used at graceful shutdown. + fn checkpoint(&self) -> Result<()>; + /// perform one garbage collection iteration, removing old data files from disk. /// this function is periodically called by gc thread. /// also it can be explicitly requested through page server api 'do_gc' command. @@ -210,9 +215,9 @@ pub trait Repository: Send + Sync { checkpoint_before_gc: bool, ) -> Result; - /// perform one checkpoint iteration, flushing in-memory data on disk. - /// this function is periodically called by checkponter thread. - fn checkpoint_iteration(&self, cconf: CheckpointConfig) -> Result<()>; + /// perform one compaction iteration. + /// this function is periodically called by compactor thread. + fn compaction_iteration(&self) -> Result<()>; } /// A timeline, that belongs to the current repository. diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index c0350fa288..38ac1a8bc4 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -9,7 +9,6 @@ use crate::thread_mgr; use crate::thread_mgr::ThreadKind; use crate::timelines; use crate::walredo::PostgresRedoManager; -use crate::CheckpointConfig; use crate::{DatadirTimelineImpl, RepositoryImpl}; use anyhow::{Context, Result}; use lazy_static::lazy_static; @@ -152,7 +151,7 @@ pub fn shutdown_all_tenants() { thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiver), None, None); thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), None, None); - thread_mgr::shutdown_threads(Some(ThreadKind::Checkpointer), None, None); + thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), None, None); // Ok, no background threads running anymore. Flush any remaining data in // memory to disk. @@ -166,7 +165,7 @@ pub fn shutdown_all_tenants() { debug!("shutdown tenant {}", tenantid); match get_repository_for_tenant(tenantid) { Ok(repo) => { - if let Err(err) = repo.checkpoint_iteration(CheckpointConfig::Flush) { + if let Err(err) = repo.checkpoint() { error!( "Could not checkpoint tenant {} during shutdown: {:?}", tenantid, err @@ -212,7 +211,7 @@ pub fn get_tenant_state(tenantid: ZTenantId) -> Option { } /// -/// Change the state of a tenant to Active and launch its checkpointer and GC +/// Change the state of a tenant to Active and launch its compactor and GC /// threads. If the tenant was already in Active state or Stopping, does nothing. /// pub fn activate_tenant(conf: &'static PageServerConf, tenantid: ZTenantId) -> Result<()> { @@ -227,18 +226,18 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenantid: ZTenantId) -> Re // If the tenant is already active, nothing to do. TenantState::Active => {} - // If it's Idle, launch the checkpointer and GC threads + // If it's Idle, launch the compactor and GC threads TenantState::Idle => { thread_mgr::spawn( - ThreadKind::Checkpointer, + ThreadKind::Compactor, Some(tenantid), None, - "Checkpointer thread", - move || crate::tenant_threads::checkpoint_loop(tenantid, conf), + "Compactor thread", + move || crate::tenant_threads::compact_loop(tenantid, conf), )?; // FIXME: if we fail to launch the GC thread, but already launched the - // checkpointer, we're in a strange state. + // compactor, we're in a strange state. thread_mgr::spawn( ThreadKind::GarbageCollector, @@ -286,7 +285,9 @@ pub fn get_timeline_for_tenant( .local_timeline() .with_context(|| format!("cannot fetch timeline {}", timelineid))?; - let page_tline = Arc::new(DatadirTimelineImpl::new(tline)); + let repartition_distance = tenant.repo.conf.checkpoint_distance / 10; + + let page_tline = Arc::new(DatadirTimelineImpl::new(tline, repartition_distance)); page_tline.init_logical_size()?; tenant.timelines.insert(timelineid, Arc::clone(&page_tline)); Ok(page_tline) diff --git a/pageserver/src/tenant_threads.rs b/pageserver/src/tenant_threads.rs index c7fe625ecf..6c2ba479db 100644 --- a/pageserver/src/tenant_threads.rs +++ b/pageserver/src/tenant_threads.rs @@ -1,44 +1,42 @@ //! This module contains functions to serve per-tenant background processes, -//! such as checkpointer and GC +//! such as compaction and GC use crate::config::PageServerConf; use crate::repository::Repository; use crate::tenant_mgr; use crate::tenant_mgr::TenantState; -use crate::CheckpointConfig; use anyhow::Result; use std::time::Duration; use tracing::*; use zenith_utils::zid::ZTenantId; /// -/// Checkpointer thread's main loop +/// Compaction thread's main loop /// -pub fn checkpoint_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> { - if let Err(err) = checkpoint_loop_ext(tenantid, conf) { - error!("checkpoint loop terminated with error: {:?}", err); +pub fn compact_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> { + if let Err(err) = compact_loop_ext(tenantid, conf) { + error!("compact loop terminated with error: {:?}", err); Err(err) } else { Ok(()) } } -fn checkpoint_loop_ext(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> { +fn compact_loop_ext(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> { loop { if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) { break; } - std::thread::sleep(conf.checkpoint_period); - trace!("checkpointer thread for tenant {} waking up", tenantid); + std::thread::sleep(conf.compaction_period); + trace!("compaction thread for tenant {} waking up", tenantid); - // checkpoint timelines that have accumulated more than CHECKPOINT_DISTANCE - // bytes of WAL since last checkpoint. + // Compact timelines let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - repo.checkpoint_iteration(CheckpointConfig::Distance)?; + repo.compaction_iteration()?; } trace!( - "checkpointer thread stopped for tenant {} state is {:?}", + "compaction thread stopped for tenant {} state is {:?}", tenantid, tenant_mgr::get_tenant_state(tenantid) ); diff --git a/pageserver/src/thread_mgr.rs b/pageserver/src/thread_mgr.rs index ec3606b41e..46aa391241 100644 --- a/pageserver/src/thread_mgr.rs +++ b/pageserver/src/thread_mgr.rs @@ -92,8 +92,8 @@ pub enum ThreadKind { // Thread that connects to a safekeeper to fetch WAL for one timeline. WalReceiver, - // Thread that handles checkpointing of all timelines for a tenant. - Checkpointer, + // Thread that handles compaction of all timelines for a tenant. + Compactor, // Thread that handles GC of a tenant GarbageCollector, diff --git a/pageserver/src/timelines.rs b/pageserver/src/timelines.rs index 7969df47b9..8ecb5f8e69 100644 --- a/pageserver/src/timelines.rs +++ b/pageserver/src/timelines.rs @@ -259,7 +259,7 @@ fn bootstrap_timeline( // Initdb lsn will be equal to last_record_lsn which will be set after import. // Because we know it upfront avoid having an option or dummy zero value by passing it to create_empty_timeline. let timeline = repo.create_empty_timeline(tli, lsn)?; - let mut page_tline: DatadirTimeline = DatadirTimeline::new(timeline); + let mut page_tline: DatadirTimeline = DatadirTimeline::new(timeline, u64::MAX); import_datadir::import_timeline_from_postgres_datadir(&pgdata_path, &mut page_tline, lsn)?; page_tline.tline.checkpoint(CheckpointConfig::Forced)?;