From 2d8587f67d22ff66e14024eeb6912d1c633df0a2 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 14 Mar 2022 11:37:22 +0200 Subject: [PATCH] Separate flushing in-memory layer to disk from checkpoints. When 'checkpoint_distance' is reached, freeze the current in-memory layer directly in the WAL receiver thread. And to flush the frozen layer to disk, launch a separate "layer flushing thread". This leaves only the compaction duty to the checkpoint thread. --- pageserver/src/layered_repository.rs | 243 +++++++++++------- .../src/layered_repository/inmemory_layer.rs | 22 -- .../src/layered_repository/layer_map.rs | 3 +- pageserver/src/lib.rs | 2 +- pageserver/src/pgdatadir_mapping.rs | 2 +- pageserver/src/repository.rs | 31 ++- pageserver/src/tenant_threads.rs | 2 +- pageserver/src/thread_mgr.rs | 3 + pageserver/src/walreceiver.rs | 2 + 9 files changed, 175 insertions(+), 135 deletions(-) diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index cc8ecd9275..f195288b9a 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -29,7 +29,7 @@ use std::io::Write; use std::ops::{Bound::Included, Deref, Range}; use std::path::{Path, PathBuf}; use std::sync::atomic::{self, AtomicBool}; -use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard}; +use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError}; use std::time::Instant; use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME}; @@ -720,6 +720,8 @@ pub struct LayeredTimeline { layers: Mutex, + last_freeze_at: AtomicLsn, + // WAL redo manager walredo_mgr: Arc, @@ -768,6 +770,9 @@ pub struct LayeredTimeline { /// to avoid deadlock. write_lock: Mutex<()>, + /// Used to ensure that there is only one thread + layer_flush_lock: Mutex<()>, + // Prevent concurrent checkpoints. // Checkpoints are normally performed by one thread. But checkpoint can also be manually requested by admin // (that's used in tests), and shutdown also forces a checkpoint. These forced checkpoints run in a different thread @@ -854,15 +859,24 @@ impl Timeline for LayeredTimeline { /// metrics collection. fn checkpoint(&self, cconf: CheckpointConfig) -> Result<()> { match cconf { - CheckpointConfig::Flush => self - .flush_checkpoint_time_histo - .observe_closure_duration(|| self.checkpoint_internal(0, false)), - CheckpointConfig::Forced => self - .forced_checkpoint_time_histo - .observe_closure_duration(|| self.checkpoint_internal(0, true)), - CheckpointConfig::Distance(distance) => self + CheckpointConfig::Flush => { + self.flush_checkpoint_time_histo + .observe_closure_duration(|| { + self.freeze_inmem_layer(false); + self.flush_frozen_layers(true) + }) + } + CheckpointConfig::Forced => { + self.forced_checkpoint_time_histo + .observe_closure_duration(|| { + self.freeze_inmem_layer(false); + self.flush_frozen_layers(true)?; + self.checkpoint_internal() + }) + } + CheckpointConfig::Distance => self .checkpoint_time_histo - .observe_closure_duration(|| self.checkpoint_internal(distance, true)), + .observe_closure_duration(|| self.checkpoint_internal()), } } @@ -969,6 +983,8 @@ impl LayeredTimeline { }), disk_consistent_lsn: AtomicLsn::new(metadata.disk_consistent_lsn().0), + last_freeze_at: AtomicLsn::new(0), + ancestor_timeline: ancestor, ancestor_lsn: metadata.ancestor_lsn(), @@ -980,6 +996,7 @@ impl LayeredTimeline { upload_relishes: AtomicBool::new(upload_relishes), write_lock: Mutex::new(()), + layer_flush_lock: Mutex::new(()), checkpoint_cs: Mutex::new(()), gc_info: RwLock::new(GcInfo { @@ -1100,7 +1117,7 @@ impl LayeredTimeline { let mut result = ValueReconstructResult::Continue; let mut cont_lsn = Lsn(request_lsn.0 + 1); - loop { + 'outer: loop { // The function should have updated 'state' //info!("CALLED for {} at {}: {:?} with {} records", reconstruct_state.key, reconstruct_state.lsn, result, reconstruct_state.records.len()); match result { @@ -1169,7 +1186,7 @@ impl LayeredTimeline { continue; } } - if let Some(frozen_layer) = &layers.frozen_layer { + for frozen_layer in layers.frozen_layers.iter() { let start_lsn = frozen_layer.get_lsn_range().start; if cont_lsn > start_lsn { //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display()); @@ -1180,7 +1197,7 @@ impl LayeredTimeline { )?; cont_lsn = start_lsn; path.push((result, cont_lsn, frozen_layer.clone())); - continue; + continue 'outer; } } @@ -1258,7 +1275,7 @@ impl LayeredTimeline { lsn ); let new_layer = - InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, start_lsn, lsn)?; + InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, start_lsn)?; let layer_rc = Arc::new(new_layer); layers.open_layer = Some(Arc::clone(&layer_rc)); @@ -1273,7 +1290,6 @@ impl LayeredTimeline { //info!("PUT: key {} at {}", key, lsn); let layer = self.get_layer_for_write(lsn)?; layer.put_value(key, lsn, val)?; - Ok(()) } @@ -1284,80 +1300,53 @@ impl LayeredTimeline { Ok(()) } + fn finish_write(&self, new_lsn: Lsn) { + assert!(new_lsn.is_aligned()); + + self.last_record_lsn.advance(new_lsn); + } + + fn freeze_inmem_layer(&self, write_lock_held: bool) { + // Freeze the current open in-memory layer. It will be written to disk on next + // iteration. + let _write_guard = if write_lock_held { + None + } else { + Some(self.write_lock.lock().unwrap()) + }; + let mut layers = self.layers.lock().unwrap(); + if let Some(open_layer) = &layers.open_layer { + let open_layer_rc = Arc::clone(open_layer); + // Does this layer need freezing? + let end_lsn = Lsn(self.get_last_record_lsn().0 + 1); + open_layer.freeze(end_lsn); + + // The layer is no longer open, update the layer map to reflect this. + // We will replace it with on-disk historics below. + layers.frozen_layers.push_back(open_layer_rc); + layers.open_layer = None; + layers.next_open_layer_at = Some(end_lsn); + self.last_freeze_at.store(end_lsn); + } + drop(layers); + } + /// /// Flush to disk all data that was written with the put_* functions /// /// NOTE: This has nothing to do with checkpoint in PostgreSQL. - fn checkpoint_internal(&self, checkpoint_distance: u64, reconstruct_pages: bool) -> Result<()> { + fn checkpoint_internal(&self) -> Result<()> { info!("checkpoint starting"); // Prevent concurrent checkpoints + // FIXME: This does compaction now, not the flushing of layers. + // Is this lock still needed? let _checkpoint_cs = self.checkpoint_cs.lock().unwrap(); - // If the in-memory layer is larger than 'checkpoint_distance', write it - // to a delta file. That's necessary to limit the amount of WAL that - // needs to be kept in the safekeepers, and that needs to be reprocessed - // on page server crash. - // - // TODO: It's not a great policy for keeping memory usage in check, - // though. We should also aim at flushing layers that consume a lot of - // memory and/or aren't receiving much updates anymore. - loop { - // Do we have a frozen in-memory layer that we need to write out? - // If we do, write it out now. Otherwise, check if the current - // in-memory layer is old enough that we should freeze and write it out. - let write_guard = self.write_lock.lock().unwrap(); - let mut layers = self.layers.lock().unwrap(); - if let Some(frozen_layer) = &layers.frozen_layer { - // Write out the frozen in-memory layer to disk, as a delta file - let frozen_layer = Arc::clone(frozen_layer); - drop(write_guard); - drop(layers); - self.flush_frozen_layer(frozen_layer)?; - } else { - // Freeze the current open in-memory layer, if it's larger than - // 'checkpoint_distance'. It will be written to disk on next - // iteration. - if let Some(open_layer) = &layers.open_layer { - // Does this layer need freezing? - let RecordLsn { - last: last_record_lsn, - prev: _prev_record_lsn, - } = self.last_record_lsn.load(); - let oldest_lsn = open_layer.get_oldest_lsn(); - let distance = last_record_lsn.widening_sub(oldest_lsn); - if distance < 0 || distance < checkpoint_distance.into() { - info!( - "the oldest layer is now {} which is {} bytes behind last_record_lsn", - open_layer.filename().display(), - distance - ); - break; - } - let end_lsn = Lsn(self.get_last_record_lsn().0 + 1); - open_layer.freeze(end_lsn); - - // The layer is no longer open, update the layer map to reflect this. - // We will replace it with on-disk historics below. - layers.frozen_layer = Some(Arc::clone(open_layer)); - layers.open_layer = None; - layers.next_open_layer_at = Some(end_lsn); - } else { - break; - } - // We will write the now-frozen layer to disk on next iteration. - // That could take a while, so release the lock while do it - drop(layers); - drop(write_guard); - } - } - // Create new image layers to allow GC and to reduce read latency - if reconstruct_pages { - // TODO: the threshold for how often we create image layers is - // currently hard-coded at 3. It means, write out a new image layer, - // if there are at least three delta layers on top of it. - self.compact(TARGET_FILE_SIZE_BYTES as usize)?; - } + // TODO: the threshold for how often we create image layers is + // currently hard-coded at 3. It means, write out a new image layer, + // if there are at least three delta layers on top of it. + self.compact(TARGET_FILE_SIZE_BYTES as usize)?; // TODO: We should also compact existing delta layers here. @@ -1373,20 +1362,86 @@ impl LayeredTimeline { Ok(()) } + pub fn check_checkpoint_distance(self: &Arc) -> Result<()> { + let last_lsn = self.get_last_record_lsn(); + + let distance = last_lsn.widening_sub(self.last_freeze_at.load()); + if distance >= self.conf.checkpoint_distance.into() { + self.freeze_inmem_layer(true); + self.last_freeze_at.store(last_lsn); + } + if let Ok(guard) = self.layer_flush_lock.try_lock() { + drop(guard); + let self_clone = Arc::clone(self); + thread_mgr::spawn( + thread_mgr::ThreadKind::LayerFlushThread, + Some(self.tenantid), + Some(self.timelineid), + "layer flush thread", + move || self_clone.flush_frozen_layers(false), + )?; + } + Ok(()) + } + + /// Flush all frozen layers to disk. + /// + /// Only one thread at a time can be doing layer-flushing for a + /// given timeline. If 'wait' is true, and another thread is + /// currently doing the flushing, this function will wait for it + /// to finish. If 'wait' is false, this function will return + /// immediately instead. + fn flush_frozen_layers(&self, wait: bool) -> Result<()> { + let flush_lock_guard = if wait { + self.layer_flush_lock.lock().unwrap() + } else { + match self.layer_flush_lock.try_lock() { + Ok(guard) => guard, + Err(TryLockError::WouldBlock) => return Ok(()), + Err(TryLockError::Poisoned(err)) => panic!("{:?}", err), + } + }; + + loop { + let layers = self.layers.lock().unwrap(); + if let Some(frozen_layer) = layers.frozen_layers.front() { + let frozen_layer = Arc::clone(frozen_layer); + drop(layers); // to allow concurrent reads and writes + self.flush_frozen_layer(frozen_layer)?; + } else { + // Drop the 'layer_flush_lock' *before* 'layers'. That + // way, if you freeze a layer, and then call + // flush_frozen_layers(false), it is guaranteed that + // if another thread was busy flushing layers and the + // call therefore returns immediately, the other + // thread will have seen the newly-frozen layer and + // will flush that too (assuming no errors). + drop(flush_lock_guard); + drop(layers); + break; + } + } + + Ok(()) + } + fn flush_frozen_layer(&self, frozen_layer: Arc) -> Result<()> { // Do we have a frozen in-memory layer that we need to write out? let new_delta = frozen_layer.write_to_disk()?; // Finally, replace the frozen in-memory layer with the new on-disk layers - let write_guard = self.write_lock.lock().unwrap(); let mut layers = self.layers.lock().unwrap(); - layers.frozen_layer = None; + let l = layers.frozen_layers.pop_front(); + + // Only one thread may call this function at a time (for this + // timeline). If two threads tried to flush the same frozen + // layer to disk at the same time, that would not work. + assert!(Arc::ptr_eq(&l.unwrap(), &frozen_layer)); // Add the new delta layer to the LayerMap let mut layer_paths = vec![new_delta.path()]; layers.insert_historic(Arc::new(new_delta)); - drop(write_guard); drop(layers); // Sync layers @@ -1929,10 +1984,8 @@ impl<'a> TimelineWriter<'_> for LayeredTimelineWriter<'a> { /// /// Remember the (end of) last valid WAL record remembered in the timeline. /// - fn advance_last_record_lsn(&self, new_lsn: Lsn) { - assert!(new_lsn.is_aligned()); - - self.tl.last_record_lsn.advance(new_lsn); + fn finish_write(&self, new_lsn: Lsn) { + self.tl.finish_write(new_lsn); } } @@ -2031,7 +2084,7 @@ mod tests { let writer = tline.writer(); writer.put(TEST_KEY, Lsn(0x10), Value::Image(TEST_IMG("foo at 0x10")))?; - writer.advance_last_record_lsn(Lsn(0x10)); + writer.finish_write(Lsn(0x10)); drop(writer); tline.checkpoint(CheckpointConfig::Forced)?; @@ -2039,7 +2092,7 @@ mod tests { let writer = tline.writer(); writer.put(TEST_KEY, Lsn(0x20), Value::Image(TEST_IMG("foo at 0x20")))?; - writer.advance_last_record_lsn(Lsn(0x20)); + writer.finish_write(Lsn(0x20)); drop(writer); tline.checkpoint(CheckpointConfig::Forced)?; @@ -2047,7 +2100,7 @@ mod tests { let writer = tline.writer(); writer.put(TEST_KEY, Lsn(0x30), Value::Image(TEST_IMG("foo at 0x30")))?; - writer.advance_last_record_lsn(Lsn(0x30)); + writer.finish_write(Lsn(0x30)); drop(writer); tline.checkpoint(CheckpointConfig::Forced)?; @@ -2055,7 +2108,7 @@ mod tests { let writer = tline.writer(); writer.put(TEST_KEY, Lsn(0x40), Value::Image(TEST_IMG("foo at 0x40")))?; - writer.advance_last_record_lsn(Lsn(0x40)); + writer.finish_write(Lsn(0x40)); drop(writer); tline.checkpoint(CheckpointConfig::Forced)?; @@ -2094,7 +2147,7 @@ mod tests { lsn, Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), )?; - writer.advance_last_record_lsn(lsn); + writer.finish_write(lsn); drop(writer); keyspace.add_key(test_key); @@ -2145,7 +2198,7 @@ mod tests { lsn, Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), )?; - writer.advance_last_record_lsn(lsn); + writer.finish_write(lsn); updated[blknum] = lsn; drop(writer); @@ -2167,7 +2220,7 @@ mod tests { Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), )?; println!("updating {} at {}", blknum, lsn); - writer.advance_last_record_lsn(lsn); + writer.finish_write(lsn); drop(writer); updated[blknum] = lsn; } @@ -2219,7 +2272,7 @@ mod tests { lsn, Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), )?; - writer.advance_last_record_lsn(lsn); + writer.finish_write(lsn); updated[blknum] = lsn; drop(writer); @@ -2253,7 +2306,7 @@ mod tests { Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), )?; println!("updating {} at {}", blknum, lsn); - writer.advance_last_record_lsn(lsn); + writer.finish_write(lsn); drop(writer); updated[blknum] = lsn; } diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index c623630851..577e562115 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -34,21 +34,6 @@ pub struct InMemoryLayer { /// start_lsn: Lsn, - /// - /// LSN of the oldest value stored in this layer. - /// - /// This is different from 'start_lsn' in that we enforce that the 'start_lsn' - /// of a layer always matches the 'end_lsn' of its predecessor, even if there - /// are no page versions until at a later LSN. That way you can detect any - /// missing layer files more easily. 'oldest_lsn' is the first page version - /// actually stored in this layer. In the range between 'start_lsn' and - /// 'oldest_lsn', there are no changes to the segment. - /// 'oldest_lsn' is used to adjust 'disk_consistent_lsn' and that is why it should - /// point to the beginning of WAL record. This is the other difference with 'start_lsn' - /// which points to end of WAL record. This is why 'oldest_lsn' can be smaller than 'start_lsn'. - /// - oldest_lsn: Lsn, - /// The above fields never change. The parts that do change are in 'inner', /// and protected by mutex. inner: RwLock, @@ -236,11 +221,6 @@ impl Layer for InMemoryLayer { } impl InMemoryLayer { - /// Return the oldest page version that's stored in this layer - pub fn get_oldest_lsn(&self) -> Lsn { - self.oldest_lsn - } - /// /// Create a new, empty, in-memory layer /// @@ -249,7 +229,6 @@ impl InMemoryLayer { timelineid: ZTimelineId, tenantid: ZTenantId, start_lsn: Lsn, - oldest_lsn: Lsn, ) -> Result { trace!( "initializing new empty InMemoryLayer for writing on timeline {} at {}", @@ -264,7 +243,6 @@ impl InMemoryLayer { timelineid, tenantid, start_lsn, - oldest_lsn, inner: RwLock::new(InMemoryLayerInner { end_lsn: None, index: HashMap::new(), diff --git a/pageserver/src/layered_repository/layer_map.rs b/pageserver/src/layered_repository/layer_map.rs index 27a3eb279a..4b0d950414 100644 --- a/pageserver/src/layered_repository/layer_map.rs +++ b/pageserver/src/layered_repository/layer_map.rs @@ -16,6 +16,7 @@ use crate::layered_repository::InMemoryLayer; use crate::repository::Key; use anyhow::Result; use lazy_static::lazy_static; +use std::collections::VecDeque; use std::ops::Range; use std::sync::Arc; use tracing::*; @@ -47,7 +48,7 @@ pub struct LayerMap { /// layer is during checkpointing, when an InMemoryLayer is being written out /// to disk. /// - pub frozen_layer: Option>, + pub frozen_layers: VecDeque>, /// All the historic layers are kept here diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 904b1d3819..3c557e4e82 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -42,7 +42,7 @@ pub const LOG_FILE_NAME: &str = "pageserver.log"; #[derive(Debug, Clone, Copy)] pub enum CheckpointConfig { // Flush in-memory data that is older than this - Distance(u64), + Distance, // Flush all in-memory data Flush, // Flush all in-memory data and reconstruct all page images diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 3337b2e6d4..9be8e658ca 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -798,7 +798,7 @@ impl<'a, R: Repository> DatadirTimelineWriter<'a, R> { writer.delete(key_range.clone(), self.lsn)?; } - writer.advance_last_record_lsn(self.lsn); + writer.finish_write(self.lsn); if last_partitioning == Lsn(0) || self.lsn.0 - last_partitioning.0 > TARGET_FILE_SIZE_BYTES / 8 diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 322465c2a7..03fbff42a8 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -401,11 +401,14 @@ pub trait TimelineWriter<'a> { fn delete(&self, key_range: Range, lsn: Lsn) -> Result<()>; - /// Track end of the latest digested WAL record. + /// Track the end of the latest digested WAL record. /// - /// Advance requires aligned LSN as an argument and would wake wait_lsn() callers. - /// Previous last record LSN is stored alongside the latest and can be read. - fn advance_last_record_lsn(&self, lsn: Lsn); + /// Call this after you have finished writing all the WAL up to 'lsn'. + /// + /// 'lsn' must be aligned. This wakes up any wait_lsn() callers waiting for + /// the 'lsn' or anything older. The previous last record LSN is stored alongside + /// the latest and can be read. + fn finish_write(&self, lsn: Lsn); } #[cfg(test)] @@ -554,12 +557,12 @@ mod tests { let writer = tline.writer(); writer.put(*TEST_KEY, Lsn(0x10), Value::Image(TEST_IMG("foo at 0x10")))?; - writer.advance_last_record_lsn(Lsn(0x10)); + writer.finish_write(Lsn(0x10)); drop(writer); let writer = tline.writer(); writer.put(*TEST_KEY, Lsn(0x20), Value::Image(TEST_IMG("foo at 0x20")))?; - writer.advance_last_record_lsn(Lsn(0x20)); + writer.finish_write(Lsn(0x20)); drop(writer); assert_eq!(tline.get(*TEST_KEY, Lsn(0x10))?, TEST_IMG("foo at 0x10")); @@ -594,12 +597,12 @@ mod tests { // Insert a value on the timeline writer.put(TEST_KEY_A, Lsn(0x20), test_value("foo at 0x20"))?; writer.put(TEST_KEY_B, Lsn(0x20), test_value("foobar at 0x20"))?; - writer.advance_last_record_lsn(Lsn(0x20)); + writer.finish_write(Lsn(0x20)); writer.put(TEST_KEY_A, Lsn(0x30), test_value("foo at 0x30"))?; - writer.advance_last_record_lsn(Lsn(0x30)); + writer.finish_write(Lsn(0x30)); writer.put(TEST_KEY_A, Lsn(0x40), test_value("foo at 0x40"))?; - writer.advance_last_record_lsn(Lsn(0x40)); + writer.finish_write(Lsn(0x40)); //assert_current_logical_size(&tline, Lsn(0x40)); @@ -611,7 +614,7 @@ mod tests { }; let new_writer = newtline.writer(); new_writer.put(TEST_KEY_A, Lsn(0x40), test_value("bar at 0x40"))?; - new_writer.advance_last_record_lsn(Lsn(0x40)); + new_writer.finish_write(Lsn(0x40)); // Check page contents on both branches assert_eq!( @@ -643,14 +646,14 @@ mod tests { lsn, Value::Image(TEST_IMG(&format!("foo at {}", lsn))), )?; - writer.advance_last_record_lsn(lsn); + writer.finish_write(lsn); lsn += 0x10; writer.put( *TEST_KEY, lsn, Value::Image(TEST_IMG(&format!("foo at {}", lsn))), )?; - writer.advance_last_record_lsn(lsn); + writer.finish_write(lsn); lsn += 0x10; } tline.checkpoint(CheckpointConfig::Forced)?; @@ -661,14 +664,14 @@ mod tests { lsn, Value::Image(TEST_IMG(&format!("foo at {}", lsn))), )?; - writer.advance_last_record_lsn(lsn); + writer.finish_write(lsn); lsn += 0x10; writer.put( *TEST_KEY, lsn, Value::Image(TEST_IMG(&format!("foo at {}", lsn))), )?; - writer.advance_last_record_lsn(lsn); + writer.finish_write(lsn); } tline.checkpoint(CheckpointConfig::Forced) } diff --git a/pageserver/src/tenant_threads.rs b/pageserver/src/tenant_threads.rs index a6711f0542..c7fe625ecf 100644 --- a/pageserver/src/tenant_threads.rs +++ b/pageserver/src/tenant_threads.rs @@ -34,7 +34,7 @@ fn checkpoint_loop_ext(tenantid: ZTenantId, conf: &'static PageServerConf) -> Re // checkpoint timelines that have accumulated more than CHECKPOINT_DISTANCE // bytes of WAL since last checkpoint. let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - repo.checkpoint_iteration(CheckpointConfig::Distance(conf.checkpoint_distance))?; + repo.checkpoint_iteration(CheckpointConfig::Distance)?; } trace!( diff --git a/pageserver/src/thread_mgr.rs b/pageserver/src/thread_mgr.rs index a51f0909ca..ec3606b41e 100644 --- a/pageserver/src/thread_mgr.rs +++ b/pageserver/src/thread_mgr.rs @@ -98,6 +98,9 @@ pub enum ThreadKind { // Thread that handles GC of a tenant GarbageCollector, + // FIXME + LayerFlushThread, + // Thread for synchronizing pageserver relish data with the remote storage. // Shared by all tenants. StorageSync, diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index fd318b9cb7..993768fbac 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -266,6 +266,8 @@ fn walreceiver_main( caught_up = true; } + timeline.tline.check_checkpoint_distance()?; + Some(endlsn) }