diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 0e9a6ce4ea..69c89a80b4 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -11,7 +11,7 @@ //! parent timeline, and the last LSN that has been written to disk. //! -use anyhow::{bail, ensure, Context, Result}; +use anyhow::{bail, ensure, Context}; use tokio::sync::watch; use tracing::*; use utils::crashsafe::path_with_suffix_extension; @@ -25,7 +25,6 @@ use std::fs::File; use std::fs::OpenOptions; use std::io; use std::io::Write; -use std::num::NonZeroU64; use std::ops::Bound::Included; use std::path::Path; use std::path::PathBuf; @@ -292,7 +291,7 @@ impl TimelineUninitMark { Ok(()) } - fn delete_mark_file_if_present(&mut self) -> Result<(), anyhow::Error> { + fn delete_mark_file_if_present(&mut self) -> anyhow::Result<()> { let uninit_mark_file = &self.uninit_mark_path; let uninit_mark_parent = uninit_mark_file .parent() @@ -470,7 +469,7 @@ impl Tenant { horizon: u64, pitr: Duration, checkpoint_before_gc: bool, - ) -> Result { + ) -> anyhow::Result { let timeline_str = target_timeline_id .map(|x| x.to_string()) .unwrap_or_else(|| "-".to_string()); @@ -486,7 +485,7 @@ impl Tenant { /// This function is periodically called by compactor task. /// Also it can be explicitly requested per timeline through page server /// api's 'compact' command. - pub fn compaction_iteration(&self) -> Result<()> { + pub fn compaction_iteration(&self) -> anyhow::Result<()> { // Scan through the hashmap and collect a list of all the timelines, // while holding the lock. Then drop the lock and actually perform the // compactions. We don't want to block everything else while the @@ -510,7 +509,7 @@ impl Tenant { /// /// Used at graceful shutdown. /// - pub fn checkpoint(&self) -> Result<()> { + pub fn checkpoint(&self) -> anyhow::Result<()> { // Scan through the hashmap and collect a list of all the timelines, // while holding the lock. Then drop the lock and actually perform the // checkpoints. We don't want to block everything else while the @@ -681,7 +680,7 @@ impl Tenant { /// before the children. fn tree_sort_timelines( timelines: HashMap, -) -> Result> { +) -> anyhow::Result> { let mut result = Vec::with_capacity(timelines.len()); let mut now = Vec::with_capacity(timelines.len()); @@ -784,27 +783,6 @@ impl Tenant { .unwrap_or(self.conf.default_tenant_conf.pitr_interval) } - pub fn get_wal_receiver_connect_timeout(&self) -> Duration { - let tenant_conf = self.tenant_conf.read().unwrap(); - tenant_conf - .walreceiver_connect_timeout - .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout) - } - - pub fn get_lagging_wal_timeout(&self) -> Duration { - let tenant_conf = self.tenant_conf.read().unwrap(); - tenant_conf - .lagging_wal_timeout - .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout) - } - - pub fn get_max_lsn_wal_lag(&self) -> NonZeroU64 { - let tenant_conf = self.tenant_conf.read().unwrap(); - tenant_conf - .max_lsn_wal_lag - .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag) - } - pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) { self.tenant_conf.write().unwrap().update(&new_tenant_conf); } @@ -836,7 +814,7 @@ impl Tenant { )) } - pub fn new( + pub(super) fn new( conf: &'static PageServerConf, tenant_conf: TenantConfOpt, walredo_mgr: Arc, @@ -859,7 +837,7 @@ impl Tenant { } /// Locate and load config - pub fn load_tenant_config( + pub(super) fn load_tenant_config( conf: &'static PageServerConf, tenant_id: TenantId, ) -> anyhow::Result { @@ -901,7 +879,7 @@ impl Tenant { Ok(tenant_conf) } - pub fn persist_tenant_config( + pub(super) fn persist_tenant_config( target_config_path: &Path, tenant_conf: TenantConfOpt, first_save: bool, @@ -994,7 +972,7 @@ impl Tenant { horizon: u64, pitr: Duration, checkpoint_before_gc: bool, - ) -> Result { + ) -> anyhow::Result { let mut totals: GcResult = Default::default(); let now = Instant::now(); @@ -1411,7 +1389,7 @@ fn run_initdb( conf: &'static PageServerConf, initdb_target_dir: &Path, pg_version: u32, -) -> Result<()> { +) -> anyhow::Result<()> { let initdb_bin_path = conf.pg_bin_dir(pg_version)?.join("initdb"); let initdb_lib_dir = conf.pg_lib_dir(pg_version)?; info!( @@ -1457,7 +1435,7 @@ impl Drop for Tenant { } } /// Dump contents of a layer file to stdout. -pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> Result<()> { +pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> anyhow::Result<()> { use std::os::unix::fs::FileExt; // All layer files start with a two-byte "magic" value, to identify the kind of @@ -1562,13 +1540,13 @@ pub mod harness { } impl<'a> TenantHarness<'a> { - pub fn create(test_name: &'static str) -> Result { + pub fn create(test_name: &'static str) -> anyhow::Result { Self::create_internal(test_name, false) } - pub fn create_exclusive(test_name: &'static str) -> Result { + pub fn create_exclusive(test_name: &'static str) -> anyhow::Result { Self::create_internal(test_name, true) } - fn create_internal(test_name: &'static str, exclusive: bool) -> Result { + fn create_internal(test_name: &'static str, exclusive: bool) -> anyhow::Result { let lock_guard = if exclusive { (None, Some(LOCK.write().unwrap())) } else { @@ -1602,7 +1580,7 @@ pub mod harness { self.try_load().expect("failed to load test tenant") } - pub fn try_load(&self) -> Result { + pub fn try_load(&self) -> anyhow::Result { let walredo_mgr = Arc::new(TestRedoManager); let tenant = Tenant::new( @@ -1682,7 +1660,7 @@ pub mod harness { }, records.len() ); - println!("{}", s); + println!("{s}"); Ok(TEST_IMG(&s)) } @@ -1706,7 +1684,7 @@ mod tests { Lazy::new(|| Key::from_slice(&hex!("112222222233333333444444445500000001"))); #[test] - fn test_basic() -> Result<()> { + fn test_basic() -> anyhow::Result<()> { let tenant = TenantHarness::create("test_basic")?.load(); let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? @@ -1730,7 +1708,7 @@ mod tests { } #[test] - fn no_duplicate_timelines() -> Result<()> { + fn no_duplicate_timelines() -> anyhow::Result<()> { let tenant = TenantHarness::create("no_duplicate_timelines")?.load(); let _ = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? @@ -1761,7 +1739,7 @@ mod tests { /// Test branch creation /// #[test] - fn test_branch() -> Result<()> { + fn test_branch() -> anyhow::Result<()> { let tenant = TenantHarness::create("test_branch")?.load(); let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? @@ -1814,7 +1792,7 @@ mod tests { Ok(()) } - fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> Result<()> { + fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> anyhow::Result<()> { let mut lsn = start_lsn; #[allow(non_snake_case)] { @@ -1856,7 +1834,7 @@ mod tests { } #[test] - fn test_prohibit_branch_creation_on_garbage_collected_data() -> Result<()> { + fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> { let tenant = TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")? .load(); @@ -1888,7 +1866,7 @@ mod tests { } #[test] - fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> Result<()> { + fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> { let tenant = TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?.load(); @@ -1915,7 +1893,7 @@ mod tests { // FIXME: This currently fails to error out. Calling GC doesn't currently // remove the old value, we'd need to work a little harder #[test] - fn test_prohibit_get_for_garbage_collected_data() -> Result<()> { + fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> { let repo = RepoHarness::create("test_prohibit_get_for_garbage_collected_data")? .load(); @@ -1935,7 +1913,7 @@ mod tests { */ #[test] - fn test_retain_data_in_parent_which_is_needed_for_child() -> Result<()> { + fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> { let tenant = TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?.load(); let tline = tenant @@ -1954,7 +1932,7 @@ mod tests { Ok(()) } #[test] - fn test_parent_keeps_data_forever_after_branching() -> Result<()> { + fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> { let tenant = TenantHarness::create("test_parent_keeps_data_forever_after_branching")?.load(); let tline = tenant @@ -1982,7 +1960,7 @@ mod tests { } #[test] - fn timeline_load() -> Result<()> { + fn timeline_load() -> anyhow::Result<()> { const TEST_NAME: &str = "timeline_load"; let harness = TenantHarness::create(TEST_NAME)?; { @@ -2003,7 +1981,7 @@ mod tests { } #[test] - fn timeline_load_with_ancestor() -> Result<()> { + fn timeline_load_with_ancestor() -> anyhow::Result<()> { const TEST_NAME: &str = "timeline_load_with_ancestor"; let harness = TenantHarness::create(TEST_NAME)?; // create two timelines @@ -2042,7 +2020,7 @@ mod tests { } #[test] - fn corrupt_metadata() -> Result<()> { + fn corrupt_metadata() -> anyhow::Result<()> { const TEST_NAME: &str = "corrupt_metadata"; let harness = TenantHarness::create(TEST_NAME)?; let tenant = harness.load(); @@ -2084,7 +2062,7 @@ mod tests { } #[test] - fn test_images() -> Result<()> { + fn test_images() -> anyhow::Result<()> { let tenant = TenantHarness::create("test_images")?.load(); let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? @@ -2136,7 +2114,7 @@ mod tests { // repeat 50 times. // #[test] - fn test_bulk_insert() -> Result<()> { + fn test_bulk_insert() -> anyhow::Result<()> { let tenant = TenantHarness::create("test_bulk_insert")?.load(); let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? @@ -2178,7 +2156,7 @@ mod tests { } #[test] - fn test_random_updates() -> Result<()> { + fn test_random_updates() -> anyhow::Result<()> { let tenant = TenantHarness::create("test_random_updates")?.load(); let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? @@ -2250,7 +2228,7 @@ mod tests { } #[test] - fn test_traverse_branches() -> Result<()> { + fn test_traverse_branches() -> anyhow::Result<()> { let tenant = TenantHarness::create("test_traverse_branches")?.load(); let mut tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? @@ -2331,7 +2309,7 @@ mod tests { } #[test] - fn test_traverse_ancestors() -> Result<()> { + fn test_traverse_ancestors() -> anyhow::Result<()> { let tenant = TenantHarness::create("test_traverse_ancestors")?.load(); let mut tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index a0ac0adea2..ccd094b65a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1,6 +1,6 @@ //! -use anyhow::{anyhow, bail, ensure, Context, Result}; +use anyhow::{anyhow, bail, ensure, Context}; use bytes::Bytes; use fail::fail_point; use itertools::Itertools; @@ -307,10 +307,6 @@ pub struct GcInfo { /// Public interface functions impl Timeline { - //------------------------------------------------------------------------------ - // Public GET functions - //------------------------------------------------------------------------------ - /// Get the LSN where this branch was created pub fn get_ancestor_lsn(&self) -> Lsn { self.ancestor_lsn @@ -445,7 +441,7 @@ impl Timeline { &self, lsn: Lsn, latest_gc_cutoff_lsn: &RcuReadGuard, - ) -> Result<()> { + ) -> anyhow::Result<()> { ensure!( lsn >= **latest_gc_cutoff_lsn, "LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)", @@ -455,12 +451,6 @@ impl Timeline { Ok(()) } - //------------------------------------------------------------------------------ - // Public PUT functions, to update the repository with new page versions. - // - // These are called by the WAL receiver to digest WAL records. - //------------------------------------------------------------------------------ - /// Flush to disk all data that was written with the put_* functions /// /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't @@ -479,6 +469,91 @@ impl Timeline { } } + pub fn compact(&self) -> anyhow::Result<()> { + let last_record_lsn = self.get_last_record_lsn(); + + // Last record Lsn could be zero in case the timelie was just created + if !last_record_lsn.is_valid() { + warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}"); + return Ok(()); + } + + // + // High level strategy for compaction / image creation: + // + // 1. First, calculate the desired "partitioning" of the + // currently in-use key space. The goal is to partition the + // key space into roughly fixed-size chunks, but also take into + // account any existing image layers, and try to align the + // chunk boundaries with the existing image layers to avoid + // too much churn. Also try to align chunk boundaries with + // relation boundaries. In principle, we don't know about + // relation boundaries here, we just deal with key-value + // pairs, and the code in pgdatadir_mapping.rs knows how to + // map relations into key-value pairs. But in practice we know + // that 'field6' is the block number, and the fields 1-5 + // identify a relation. This is just an optimization, + // though. + // + // 2. Once we know the partitioning, for each partition, + // decide if it's time to create a new image layer. The + // criteria is: there has been too much "churn" since the last + // image layer? The "churn" is fuzzy concept, it's a + // combination of too many delta files, or too much WAL in + // total in the delta file. Or perhaps: if creating an image + // file would allow to delete some older files. + // + // 3. After that, we compact all level0 delta files if there + // are too many of them. While compacting, we also garbage + // collect any page versions that are no longer needed because + // of the new image layers we created in step 2. + // + // TODO: This high level strategy hasn't been implemented yet. + // Below are functions compact_level0() and create_image_layers() + // but they are a bit ad hoc and don't quite work like it's explained + // above. Rewrite it. + let _layer_removal_cs = self.layer_removal_cs.lock().unwrap(); + + let target_file_size = self.get_checkpoint_distance(); + + // Define partitioning schema if needed + + match self.repartition( + self.get_last_record_lsn(), + self.get_compaction_target_size(), + ) { + Ok((partitioning, lsn)) => { + // 2. Create new image layers for partitions that have been modified + // "enough". + let layer_paths_to_upload = self.create_image_layers(&partitioning, lsn, false)?; + if !layer_paths_to_upload.is_empty() + && self.upload_layers.load(atomic::Ordering::Relaxed) + { + storage_sync::schedule_layer_upload( + self.tenant_id, + self.timeline_id, + layer_paths_to_upload, + None, + ); + } + + // 3. Compact + let timer = self.metrics.compact_time_histo.start_timer(); + self.compact_level0(target_file_size)?; + timer.stop_and_record(); + } + Err(err) => { + // no partitioning? This is normal, if the timeline was just created + // as an empty timeline. Also in unit tests, when we use the timeline + // as a simple key-value store, ignoring the datadir layout. Log the + // error but continue. + error!("could not compact, repartitioning keyspace failed: {err:?}"); + } + }; + + Ok(()) + } + /// Mutate the timeline with a [`TimelineWriter`]. pub fn writer(&self) -> TimelineWriter<'_> { TimelineWriter { @@ -486,6 +561,80 @@ impl Timeline { _write_guard: self.write_lock.lock().unwrap(), } } + + /// Retrieve current logical size of the timeline. + /// + /// The size could be lagging behind the actual number, in case + /// the initial size calculation has not been run (gets triggered on the first size access). + pub fn get_current_logical_size(self: &Arc) -> anyhow::Result { + let current_size = self.current_logical_size.current_size()?; + debug!("Current size: {current_size:?}"); + + let size = current_size.size(); + if let (CurrentLogicalSize::Approximate(_), Some(init_lsn)) = + (current_size, self.current_logical_size.initial_part_end) + { + self.try_spawn_size_init_task(init_lsn); + } + + Ok(size) + } + + /// Check if more than 'checkpoint_distance' of WAL has been accumulated in + /// the in-memory layer, and initiate flushing it if so. + /// + /// Also flush after a period of time without new data -- it helps + /// safekeepers to regard pageserver as caught up and suspend activity. + pub fn check_checkpoint_distance(self: &Arc) -> anyhow::Result<()> { + let last_lsn = self.get_last_record_lsn(); + let layers = self.layers.read().unwrap(); + if let Some(open_layer) = &layers.open_layer { + let open_layer_size = open_layer.size()?; + drop(layers); + let last_freeze_at = self.last_freeze_at.load(); + let last_freeze_ts = *(self.last_freeze_ts.read().unwrap()); + let distance = last_lsn.widening_sub(last_freeze_at); + // Checkpointing the open layer can be triggered by layer size or LSN range. + // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and + // we want to stay below that with a big margin. The LSN distance determines how + // much WAL the safekeepers need to store. + if distance >= self.get_checkpoint_distance().into() + || open_layer_size > self.get_checkpoint_distance() + || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout()) + { + info!( + "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}", + distance, + open_layer_size, + last_freeze_ts.elapsed() + ); + + self.freeze_inmem_layer(true); + self.last_freeze_at.store(last_lsn); + *(self.last_freeze_ts.write().unwrap()) = Instant::now(); + + // Launch a task to flush the frozen layer to disk, unless + // a task was already running. (If the task was running + // at the time that we froze the layer, it must've seen the + // the layer we just froze before it exited; see comments + // in flush_frozen_layers()) + if let Ok(guard) = self.layer_flush_lock.try_lock() { + drop(guard); + let self_clone = Arc::clone(self); + task_mgr::spawn( + task_mgr::BACKGROUND_RUNTIME.handle(), + task_mgr::TaskKind::LayerFlushTask, + Some(self.tenant_id), + Some(self.timeline_id), + "layer flush task", + false, + async move { self_clone.flush_frozen_layers(false) }, + ); + } + } + } + Ok(()) + } } // Private functions @@ -529,7 +678,7 @@ impl Timeline { /// /// Loads the metadata for the timeline into memory, but not the layer map. #[allow(clippy::too_many_arguments)] - pub fn new( + pub(super) fn new( conf: &'static PageServerConf, tenant_conf: Arc>, metadata: TimelineMetadata, @@ -602,7 +751,7 @@ impl Timeline { result } - pub fn launch_wal_receiver(self: &Arc) { + pub(super) fn launch_wal_receiver(self: &Arc) { if !is_etcd_client_initialized() { if cfg!(test) { info!("not launching WAL receiver because etcd client hasn't been initialized"); @@ -641,7 +790,7 @@ impl Timeline { /// Scan the timeline directory to populate the layer map. /// Returns all timeline-related files that were found and loaded. /// - pub fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { + pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { let mut layers = self.layers.write().unwrap(); let mut num_layers = 0; @@ -727,30 +876,12 @@ impl Timeline { Ok(()) } - pub fn layer_removal_guard(&self) -> anyhow::Result> { + pub(super) fn layer_removal_guard(&self) -> anyhow::Result> { self.layer_removal_cs .try_lock() .map_err(|e| anyhow!("cannot lock compaction critical section {e}")) } - /// Retrieve current logical size of the timeline. - /// - /// The size could be lagging behind the actual number, in case - /// the initial size calculation has not been run (gets triggered on the first size access). - pub fn get_current_logical_size(self: &Arc) -> anyhow::Result { - let current_size = self.current_logical_size.current_size()?; - debug!("Current size: {current_size:?}"); - - let size = current_size.size(); - if let (CurrentLogicalSize::Approximate(_), Some(init_lsn)) = - (current_size, self.current_logical_size.initial_part_end) - { - self.try_spawn_size_init_task(init_lsn); - } - - Ok(size) - } - fn try_spawn_size_init_task(self: &Arc, init_lsn: Lsn) { let timeline_id = self.timeline_id; @@ -971,7 +1102,7 @@ impl Timeline { Some((lsn, img)) } - fn get_ancestor_timeline(&self) -> Result> { + fn get_ancestor_timeline(&self) -> anyhow::Result> { let ancestor = self.ancestor_timeline.as_ref().with_context(|| { format!( "Ancestor is missing. Timeline id: {} Ancestor id {:?}", @@ -1030,14 +1161,14 @@ impl Timeline { Ok(layer) } - fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> { + fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> { //info!("PUT: key {} at {}", key, lsn); let layer = self.get_layer_for_write(lsn)?; layer.put_value(key, lsn, val)?; Ok(()) } - fn put_tombstone(&self, key_range: Range, lsn: Lsn) -> Result<()> { + fn put_tombstone(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { let layer = self.get_layer_for_write(lsn)?; layer.put_tombstone(key_range, lsn)?; @@ -1076,64 +1207,6 @@ impl Timeline { drop(layers); } - /// - /// Check if more than 'checkpoint_distance' of WAL has been accumulated in - /// the in-memory layer, and initiate flushing it if so. - /// - /// Also flush after a period of time without new data -- it helps - /// safekeepers to regard pageserver as caught up and suspend activity. - /// - pub fn check_checkpoint_distance(self: &Arc) -> Result<()> { - let last_lsn = self.get_last_record_lsn(); - let layers = self.layers.read().unwrap(); - if let Some(open_layer) = &layers.open_layer { - let open_layer_size = open_layer.size()?; - drop(layers); - let last_freeze_at = self.last_freeze_at.load(); - let last_freeze_ts = *(self.last_freeze_ts.read().unwrap()); - let distance = last_lsn.widening_sub(last_freeze_at); - // Checkpointing the open layer can be triggered by layer size or LSN range. - // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and - // we want to stay below that with a big margin. The LSN distance determines how - // much WAL the safekeepers need to store. - if distance >= self.get_checkpoint_distance().into() - || open_layer_size > self.get_checkpoint_distance() - || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout()) - { - info!( - "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}", - distance, - open_layer_size, - last_freeze_ts.elapsed() - ); - - self.freeze_inmem_layer(true); - self.last_freeze_at.store(last_lsn); - *(self.last_freeze_ts.write().unwrap()) = Instant::now(); - - // Launch a task to flush the frozen layer to disk, unless - // a task was already running. (If the task was running - // at the time that we froze the layer, it must've seen the - // the layer we just froze before it exited; see comments - // in flush_frozen_layers()) - if let Ok(guard) = self.layer_flush_lock.try_lock() { - drop(guard); - let self_clone = Arc::clone(self); - task_mgr::spawn( - task_mgr::BACKGROUND_RUNTIME.handle(), - task_mgr::TaskKind::LayerFlushTask, - Some(self.tenant_id), - Some(self.timeline_id), - "layer flush task", - false, - async move { self_clone.flush_frozen_layers(false) }, - ); - } - } - } - Ok(()) - } - /// Flush all frozen layers to disk. /// /// Only one task at a time can be doing layer-flushing for a @@ -1141,7 +1214,7 @@ impl Timeline { /// currently doing the flushing, this function will wait for it /// to finish. If 'wait' is false, this function will return /// immediately instead. - fn flush_frozen_layers(&self, wait: bool) -> Result<()> { + fn flush_frozen_layers(&self, wait: bool) -> anyhow::Result<()> { let flush_lock_guard = if wait { self.layer_flush_lock.lock().unwrap() } else { @@ -1180,7 +1253,7 @@ impl Timeline { } /// Flush one frozen in-memory layer to disk, as a new delta layer. - fn flush_frozen_layer(&self, frozen_layer: Arc) -> Result<()> { + fn flush_frozen_layer(&self, frozen_layer: Arc) -> anyhow::Result<()> { // As a special case, when we have just imported an image into the repository, // instead of writing out a L0 delta layer, we directly write out image layer // files instead. This is possible as long as *all* the data imported into the @@ -1238,7 +1311,7 @@ impl Timeline { &self, disk_consistent_lsn: Lsn, layer_paths_to_upload: HashMap, - ) -> Result<()> { + ) -> anyhow::Result<()> { // We can only save a valid 'prev_record_lsn' value on disk if we // flushed *all* in-memory changes to disk. We only track // 'prev_record_lsn' in memory for the latest processed record, so we @@ -1299,7 +1372,7 @@ impl Timeline { fn create_delta_layer( &self, frozen_layer: &InMemoryLayer, - ) -> Result<(PathBuf, LayerFileMetadata)> { + ) -> anyhow::Result<(PathBuf, LayerFileMetadata)> { // Write it out let new_delta = frozen_layer.write_to_disk()?; let new_delta_path = new_delta.path(); @@ -1334,92 +1407,7 @@ impl Timeline { Ok((new_delta_path, LayerFileMetadata::new(sz))) } - pub fn compact(&self) -> anyhow::Result<()> { - let last_record_lsn = self.get_last_record_lsn(); - - // Last record Lsn could be zero in case the timelie was just created - if !last_record_lsn.is_valid() { - warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}"); - return Ok(()); - } - - // - // High level strategy for compaction / image creation: - // - // 1. First, calculate the desired "partitioning" of the - // currently in-use key space. The goal is to partition the - // key space into roughly fixed-size chunks, but also take into - // account any existing image layers, and try to align the - // chunk boundaries with the existing image layers to avoid - // too much churn. Also try to align chunk boundaries with - // relation boundaries. In principle, we don't know about - // relation boundaries here, we just deal with key-value - // pairs, and the code in pgdatadir_mapping.rs knows how to - // map relations into key-value pairs. But in practice we know - // that 'field6' is the block number, and the fields 1-5 - // identify a relation. This is just an optimization, - // though. - // - // 2. Once we know the partitioning, for each partition, - // decide if it's time to create a new image layer. The - // criteria is: there has been too much "churn" since the last - // image layer? The "churn" is fuzzy concept, it's a - // combination of too many delta files, or too much WAL in - // total in the delta file. Or perhaps: if creating an image - // file would allow to delete some older files. - // - // 3. After that, we compact all level0 delta files if there - // are too many of them. While compacting, we also garbage - // collect any page versions that are no longer needed because - // of the new image layers we created in step 2. - // - // TODO: This high level strategy hasn't been implemented yet. - // Below are functions compact_level0() and create_image_layers() - // but they are a bit ad hoc and don't quite work like it's explained - // above. Rewrite it. - let _layer_removal_cs = self.layer_removal_cs.lock().unwrap(); - - let target_file_size = self.get_checkpoint_distance(); - - // Define partitioning schema if needed - - match self.repartition( - self.get_last_record_lsn(), - self.get_compaction_target_size(), - ) { - Ok((partitioning, lsn)) => { - // 2. Create new image layers for partitions that have been modified - // "enough". - let layer_paths_to_upload = self.create_image_layers(&partitioning, lsn, false)?; - if !layer_paths_to_upload.is_empty() - && self.upload_layers.load(atomic::Ordering::Relaxed) - { - storage_sync::schedule_layer_upload( - self.tenant_id, - self.timeline_id, - layer_paths_to_upload, - None, - ); - } - - // 3. Compact - let timer = self.metrics.compact_time_histo.start_timer(); - self.compact_level0(target_file_size)?; - timer.stop_and_record(); - } - Err(err) => { - // no partitioning? This is normal, if the timeline was just created - // as an empty timeline. Also in unit tests, when we use the timeline - // as a simple key-value store, ignoring the datadir layout. Log the - // error but continue. - error!("could not compact, repartitioning keyspace failed: {err:?}"); - } - }; - - Ok(()) - } - - fn repartition(&self, lsn: Lsn, partition_size: u64) -> Result<(KeyPartitioning, Lsn)> { + fn repartition(&self, lsn: Lsn, partition_size: u64) -> anyhow::Result<(KeyPartitioning, Lsn)> { let mut partitioning_guard = self.partitioning.lock().unwrap(); if partitioning_guard.1 == Lsn(0) || lsn.0 - partitioning_guard.1 .0 > self.repartition_threshold @@ -1433,7 +1421,7 @@ impl Timeline { } // Is it time to create a new image layer for the given partition? - fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> Result { + fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result { let layers = self.layers.read().unwrap(); for part_range in &partition.ranges { @@ -1478,7 +1466,7 @@ impl Timeline { partitioning: &KeyPartitioning, lsn: Lsn, force: bool, - ) -> Result> { + ) -> anyhow::Result> { let timer = self.metrics.create_images_time_histo.start_timer(); let mut image_layers: Vec = Vec::new(); for partition in partitioning.parts.iter() { @@ -1571,7 +1559,7 @@ impl Timeline { /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as /// as Level 1 files. /// - fn compact_level0(&self, target_file_size: u64) -> Result<()> { + fn compact_level0(&self, target_file_size: u64) -> anyhow::Result<()> { let layers = self.layers.read().unwrap(); let mut level0_deltas = layers.get_level0_deltas()?; drop(layers); @@ -1881,12 +1869,12 @@ impl Timeline { /// /// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine /// whether a record is needed for PITR. - pub fn update_gc_info( + pub(super) fn update_gc_info( &self, retain_lsns: Vec, cutoff_horizon: Lsn, pitr: Duration, - ) -> Result<()> { + ) -> anyhow::Result<()> { let mut gc_info = self.gc_info.write().unwrap(); gc_info.horizon_cutoff = cutoff_horizon; @@ -1941,7 +1929,7 @@ impl Timeline { /// within a layer file. We can only remove the whole file if it's fully /// obsolete. /// - pub fn gc(&self) -> Result { + pub(super) fn gc(&self) -> anyhow::Result { let mut result: GcResult = Default::default(); let now = SystemTime::now(); @@ -2261,11 +2249,11 @@ impl<'a> TimelineWriter<'a> { /// /// This will implicitly extend the relation, if the page is beyond the /// current end-of-file. - pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> Result<()> { + pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> { self.tl.put_value(key, lsn, value) } - pub fn delete(&self, key_range: Range, lsn: Lsn) -> Result<()> { + pub fn delete(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { self.tl.put_tombstone(key_range, lsn) }