diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index c6cd9094f6..4de6419444 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -33,7 +33,6 @@ use std::sync::{Arc, Mutex, MutexGuard};
 use std::thread::JoinHandle;
 use std::time::{Duration, Instant};
 
-use crate::layered_repository::inmemory_layer::FreezeLayers;
 use crate::relish::*;
 use crate::relish_storage::schedule_timeline_upload;
 use crate::repository::{GcResult, Repository, Timeline, TimelineWriter, WALRecord};
@@ -72,8 +71,6 @@ use storage_layer::{
     Layer, PageReconstructData, PageReconstructResult, SegmentTag, RELISH_SEG_SIZE,
 };
 
-use self::inmemory_layer::{NonWriteableError, WriteResult};
-
 static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
 
 // Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
@@ -670,6 +667,13 @@ pub struct LayeredTimeline {
 
     /// If `true`, will backup its timeline files to remote storage after freezing.
     upload_relishes: bool,
+
+    /// Ensures layers aren't frozen by checkpointer between
+    /// [`LayeredTimeline::get_layer_for_write`] and layer reads.
+    /// Locked automatically by [`LayeredTimelineWriter`] and checkpointer.
+    /// Must always be acquired before the layer map/individual layer lock
+    /// to avoid deadlock.
+    write_lock: Mutex<()>,
 }
 
 /// Public interface functions
@@ -903,7 +907,10 @@ impl Timeline for LayeredTimeline {
     }
 
     fn writer<'a>(&'a self) -> Box<dyn TimelineWriter + 'a> {
-        Box::new(LayeredTimelineWriter(self))
+        Box::new(LayeredTimelineWriter {
+            tl: self,
+            _write_guard: self.write_lock.lock().unwrap(),
+        })
     }
 }
 
@@ -945,6 +952,8 @@ impl LayeredTimeline {
             current_logical_size: AtomicUsize::new(current_logical_size),
             current_logical_size_gauge,
             upload_relishes,
+
+            write_lock: Mutex::new(()),
         };
         Ok(timeline)
     }
@@ -1219,18 +1228,13 @@ impl LayeredTimeline {
     ///
     /// NOTE: This has nothing to do with checkpoint in PostgreSQL.
     fn checkpoint_internal(&self, checkpoint_distance: u64, forced: bool) -> Result<()> {
-        // Grab lock on the layer map.
-        //
-        // TODO: We hold it locked throughout the checkpoint operation. That's bad,
-        // the checkpointing could take many seconds, and any incoming get_page_at_lsn()
-        // requests will block.
+        let mut write_guard = self.write_lock.lock().unwrap();
         let mut layers = self.layers.lock().unwrap();
 
         // Bump the generation number in the layer map, so that we can distinguish
         // entries inserted after the checkpoint started
         let current_generation = layers.increment_generation();
 
-        // Read 'last_record_lsn'. That becomes the cutoff LSN for frozen layers.
         let RecordLsn {
             last: last_record_lsn,
             prev: prev_record_lsn,
@@ -1280,32 +1284,24 @@
                 break;
             }
 
-            // Freeze the layer.
-            //
-            // This is a two-step process. First, we "freeze" the in-memory
-            // layer, to close it for new writes, and replace the original
-            // layer with the new frozen in-memory layer (and possibly a new
-            // open layer to hold changes newer than the cutoff.) Then we write
-            // the frozen layer to disk, and replace the in-memory frozen layer
-            // with the new on-disk layers.
-            let FreezeLayers {
-                frozen,
-                open: maybe_new_open,
-            } = oldest_layer.freeze(last_record_lsn)?;
+            // Mark the layer as no longer accepting writes, and record its end_lsn.
+            // This happens in place; no new layers are created at this point.
+            // We call `get_last_record_lsn` again here: it may differ from the value
+            // read above, because we may have released the write lock since then.
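+            //
+            // Illustrative example (not from the original patch): if
+            // `get_last_record_lsn` returns 0/4000, then `freeze(Lsn(0x4000))`
+            // seals this layer over the inclusive LSN range [start_lsn, 0/4000];
+            // internally the layer records the exclusive bound 0/4001.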
+            oldest_layer.freeze(self.get_last_record_lsn());
 
-            // replace this layer with the new layers that 'freeze' returned
+            // The layer is no longer open; update the layer map to reflect this.
+            // We will replace it with on-disk historics below.
             layers.pop_oldest_open();
-            if let Some(new_open) = maybe_new_open.clone() {
-                layers.insert_open(new_open);
-            }
-
-            // We temporarily insert InMemory layer into historic list here.
-            // TODO: check that all possible concurrent users of 'historic' treat it right
-            layers.insert_historic(frozen.clone());
+            layers.insert_historic(oldest_layer.clone());
 
             // Write the now-frozen layer to disk. That could take a while, so release the lock while we do it
             drop(layers);
-            let new_historics = frozen.write_to_disk(self)?;
+            drop(write_guard);
+
+            let new_historics = oldest_layer.write_to_disk(self)?;
+
+            write_guard = self.write_lock.lock().unwrap();
             layers = self.layers.lock().unwrap();
 
             if !new_historics.is_empty() {
@@ -1313,7 +1309,7 @@
             }
 
             // Finally, replace the frozen in-memory layer with the new on-disk layers
-            layers.remove_historic(frozen.clone());
+            layers.remove_historic(oldest_layer);
 
             // Add the historics to the LayerMap
             for delta_layer in new_historics.delta_layers {
@@ -1334,6 +1330,7 @@
         }
 
         drop(layers);
+        drop(write_guard);
 
         if created_historics {
             // We must fsync the timeline dir to ensure the directory entries for
@@ -1760,46 +1757,18 @@ impl LayeredTimeline {
         self.current_logical_size_gauge
             .set(val as i64 - diff as i64);
     }
-
-    /// If a layer is in the process of being replaced in [`LayerMap`], write
-    /// operations will fail with [`NonWriteableError`]. This may happen due to
-    /// a race: the checkpointer thread freezes a layer just after
-    /// [`Self::get_layer_for_write`] returned it. To handle this error, we try
-    /// again getting the layer and attempt the write.
-    fn perform_write_op(
-        &self,
-        seg: SegmentTag,
-        lsn: Lsn,
-        write_op: impl Fn(&Arc<InMemoryLayer>) -> WriteResult<u32>,
-    ) -> anyhow::Result<u32> {
-        let mut layer = self.get_layer_for_write(seg, lsn)?;
-        loop {
-            match write_op(&layer) {
-                Ok(r) => return Ok(r),
-                Err(NonWriteableError {}) => {}
-            }
-
-            info!(
-                "attempted to write to non-writeable layer, retrying {} {}",
-                seg, lsn
-            );
-
-            // layer was non-writeable, try again
-            let new_layer = self.get_layer_for_write(seg, lsn)?;
-            // the new layer does not have to be writeable, but it should at least be different
-            assert!(!Arc::ptr_eq(&layer, &new_layer));
-            layer = new_layer;
-        }
-    }
 }
 
-struct LayeredTimelineWriter<'a>(&'a LayeredTimeline);
+struct LayeredTimelineWriter<'a> {
+    tl: &'a LayeredTimeline,
+    _write_guard: MutexGuard<'a, ()>,
+}
 
 impl Deref for LayeredTimelineWriter<'_> {
     type Target = dyn Timeline;
 
     fn deref(&self) -> &Self::Target {
-        self.0
+        self.tl
     }
 }
 
@@ -1815,10 +1784,9 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
         ensure!(rec.lsn.is_aligned(), "unaligned record LSN");
 
         let seg = SegmentTag::from_blknum(rel, blknum);
-        let delta_size = self.0.perform_write_op(seg, rec.lsn, |layer| {
-            layer.put_wal_record(blknum, rec.clone())
-        })?;
-        self.0
+        let layer = self.tl.get_layer_for_write(seg, rec.lsn)?;
+        let delta_size = layer.put_wal_record(blknum, rec);
+        self.tl
             .increase_current_logical_size(delta_size * BLCKSZ as u32);
         Ok(())
     }
@@ -1835,11 +1803,10 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
 
         let seg = SegmentTag::from_blknum(rel, blknum);
 
-        let delta_size = self.0.perform_write_op(seg, lsn, |layer| {
-            layer.put_page_image(blknum, lsn, img.clone())
-        })?;
+        let layer = self.tl.get_layer_for_write(seg, lsn)?;
+        let delta_size = layer.put_page_image(blknum, lsn, img);
 
-        self.0
+        self.tl
             .increase_current_logical_size(delta_size * BLCKSZ as u32);
         Ok(())
     }
@@ -1853,8 +1820,8 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
         debug!("put_truncation: {} to {} blocks at {}", rel, relsize, lsn);
 
         let oldsize = self
-            .0
-            .get_relish_size(rel, self.0.get_last_record_lsn())?
+            .tl
+            .get_relish_size(rel, self.tl.get_last_record_lsn())?
             .ok_or_else(|| {
                 anyhow!(
                     "attempted to truncate non-existent relish {} at {}",
@@ -1880,8 +1847,9 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
                     rel,
                     segno: remove_segno,
                 };
-                self.0
-                    .perform_write_op(seg, lsn, |layer| layer.drop_segment(lsn))?;
+
+                let layer = self.tl.get_layer_for_write(seg, lsn)?;
+                layer.drop_segment(lsn);
             }
 
             // Truncate the last remaining segment to the specified size
@@ -1890,11 +1858,10 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
                 rel,
                 segno: last_remain_seg,
             };
-            self.0.perform_write_op(seg, lsn, |layer| {
-                layer.put_truncation(lsn, relsize % RELISH_SEG_SIZE)
-            })?;
+            let layer = self.tl.get_layer_for_write(seg, lsn)?;
+            layer.put_truncation(lsn, relsize % RELISH_SEG_SIZE)
         }
-        self.0
+        self.tl
             .decrease_current_logical_size((oldsize - relsize) * BLCKSZ as u32);
         Ok(())
     }
@@ -1903,7 +1870,10 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
         trace!("drop_segment: {} at {}", rel, lsn);
 
         if rel.is_blocky() {
-            if let Some(oldsize) = self.0.get_relish_size(rel, self.0.get_last_record_lsn())? {
+            if let Some(oldsize) = self
+                .tl
+                .get_relish_size(rel, self.tl.get_last_record_lsn())?
+            {
                 let old_last_seg = if oldsize == 0 {
                     0
                 } else {
@@ -1916,10 +1886,10 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
                         rel,
                         segno: remove_segno,
                     };
-                    self.0
-                        .perform_write_op(seg, lsn, |layer| layer.drop_segment(lsn))?;
+                    let layer = self.tl.get_layer_for_write(seg, lsn)?;
+                    layer.drop_segment(lsn);
                 }
-                self.0
+                self.tl
                     .decrease_current_logical_size(oldsize * BLCKSZ as u32);
             } else {
                 warn!(
@@ -1930,8 +1900,8 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
         } else {
             // TODO handle TwoPhase relishes
            let seg = SegmentTag::from_blknum(rel, 0);
-            self.0
-                .perform_write_op(seg, lsn, |layer| layer.drop_segment(lsn))?;
+            let layer = self.tl.get_layer_for_write(seg, lsn)?;
+            layer.drop_segment(lsn);
         }
 
         Ok(())
@@ -1943,7 +1913,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
     fn advance_last_record_lsn(&self, new_lsn: Lsn) {
         assert!(new_lsn.is_aligned());
 
-        self.0.last_record_lsn.advance(new_lsn);
+        self.tl.last_record_lsn.advance(new_lsn);
     }
 }
 
diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs
index 8c44ac6296..bd1860fd47 100644
--- a/pageserver/src/layered_repository/inmemory_layer.rs
+++ b/pageserver/src/layered_repository/inmemory_layer.rs
@@ -15,12 +15,10 @@ use crate::{ZTenantId, ZTimelineId};
 use anyhow::{bail, ensure, Result};
 use bytes::Bytes;
 use log::*;
-use std::cmp::min;
 use std::path::PathBuf;
 use std::sync::{Arc, RwLock};
 use zenith_utils::vec_map::VecMap;
 
-use zenith_utils::accum::Accum;
 use zenith_utils::lsn::Lsn;
 
 use super::page_versions::PageVersions;
@@ -37,9 +35,6 @@ pub struct InMemoryLayer {
     ///
     start_lsn: Lsn,
 
-    /// Frozen in-memory layers have an inclusive end LSN.
-    end_lsn: Option<Lsn>,
-
     /// LSN of the oldest page version stored in this layer
     oldest_pending_lsn: Lsn,
 
@@ -52,8 +47,13 @@
 }
 
 pub struct InMemoryLayerInner {
+    /// Frozen in-memory layers have an exclusive end LSN.
+    /// Writes are only allowed when this is `None`.
+    end_lsn: Option<Lsn>,
+
     /// If this relation was dropped, remember when that happened.
-    drop_lsn: Option<Lsn>,
+    /// The drop LSN is recorded in [`end_lsn`].
+    dropped: bool,
 
     ///
     /// All versions of all pages in the layer are kept here.
@@ -69,19 +69,11 @@ pub struct InMemoryLayerInner {
     /// a non-blocky rel, 'segsizes' is not used and is always empty.
     ///
     segsizes: VecMap<Lsn, u32>,
-
-    /// Writes are only allowed when true.
-    /// Set to false when this layer is in the process of being replaced.
-    writeable: bool,
 }
 
 impl InMemoryLayerInner {
-    fn check_writeable(&self) -> WriteResult<()> {
-        if self.writeable {
-            Ok(())
-        } else {
-            Err(NonWriteableError)
-        }
+    fn assert_writeable(&self) {
+        assert!(self.end_lsn.is_none());
     }
 
     fn get_seg_size(&self, lsn: Lsn) -> u32 {
@@ -104,20 +96,17 @@ impl Layer for InMemoryLayer {
         let inner = self.inner.read().unwrap();
 
         let end_lsn;
-        let dropped;
-        if let Some(drop_lsn) = inner.drop_lsn {
+        if let Some(drop_lsn) = inner.end_lsn {
             end_lsn = drop_lsn;
-            dropped = true;
         } else {
             end_lsn = Lsn(u64::MAX);
-            dropped = false;
         }
 
         let delta_filename = DeltaFileName {
             seg: self.seg,
             start_lsn: self.start_lsn,
             end_lsn,
-            dropped,
+            dropped: inner.dropped,
         }
         .to_string();
 
@@ -137,14 +126,10 @@ impl Layer for InMemoryLayer {
     }
 
     fn get_end_lsn(&self) -> Lsn {
-        if let Some(end_lsn) = self.end_lsn {
-            return Lsn(end_lsn.0 + 1);
-        }
-
         let inner = self.inner.read().unwrap();
 
-        if let Some(drop_lsn) = inner.drop_lsn {
-            drop_lsn
+        if let Some(end_lsn) = inner.end_lsn {
+            end_lsn
         } else {
             Lsn(u64::MAX)
         }
@@ -152,7 +137,7 @@ impl Layer for InMemoryLayer {
 
     fn is_dropped(&self) -> bool {
         let inner = self.inner.read().unwrap();
-        inner.drop_lsn.is_some()
+        inner.dropped
     }
 
     /// Look up given page in the cache.
@@ -230,8 +215,8 @@ impl Layer for InMemoryLayer {
         assert!(lsn >= self.start_lsn);
 
         // Is the requested LSN after the segment was dropped?
-        if let Some(drop_lsn) = inner.drop_lsn {
-            if lsn >= drop_lsn {
+        if let Some(end_lsn) = inner.end_lsn {
+            if lsn >= end_lsn {
                 return Ok(false);
             }
         }
@@ -262,14 +247,14 @@ impl Layer for InMemoryLayer {
         let inner = self.inner.read().unwrap();
 
         let end_str = inner
-            .drop_lsn
+            .end_lsn
            .as_ref()
-            .map(|drop_lsn| drop_lsn.to_string())
+            .map(Lsn::to_string)
             .unwrap_or_default();
 
         println!(
-            "----- in-memory layer for tli {} seg {} {}-{} ----",
-            self.timelineid, self.seg, self.start_lsn, end_str
+            "----- in-memory layer for tli {} seg {} {}-{} {} ----",
+            self.timelineid, self.seg, self.start_lsn, end_str, inner.dropped,
        );
 
         for (k, v) in inner.segsizes.as_slice() {
@@ -290,21 +275,6 @@ impl Layer for InMemoryLayer {
     }
 }
 
-/// Write failed because the layer is in process of being replaced.
-/// See [`LayeredTimeline::perform_write_op`] for how to handle this error.
-#[derive(Debug)]
-pub struct NonWriteableError;
-
-pub type WriteResult<T> = std::result::Result<T, NonWriteableError>;
-
-/// Helper struct to cleanup `InMemoryLayer::freeze` return signature.
-pub struct FreezeLayers {
-    /// Replacement layer for the layer which freeze was called on.
-    pub frozen: Arc<InMemoryLayer>,
-    /// New open layer containing leftover data.
-    pub open: Option<Arc<InMemoryLayer>>,
-}
-
 /// The result of an in-memory layer's data being written to disk.
 pub struct LayersOnDisk {
     pub delta_layers: Vec<DeltaLayer>,
@@ -318,10 +288,6 @@ impl LayersOnDisk {
 }
 
 impl InMemoryLayer {
-    fn assert_not_frozen(&self) {
-        assert!(self.end_lsn.is_none());
-    }
-
     /// Return the oldest page version that's stored in this layer
     pub fn get_oldest_pending_lsn(&self) -> Lsn {
         self.oldest_pending_lsn
     }
@@ -357,14 +323,13 @@
             tenantid,
             seg,
             start_lsn,
-            end_lsn: None,
             oldest_pending_lsn,
             incremental: false,
             inner: RwLock::new(InMemoryLayerInner {
-                drop_lsn: None,
+                end_lsn: None,
+                dropped: false,
                 page_versions: PageVersions::default(),
                 segsizes,
-                writeable: true,
             }),
         })
     }
@@ -372,7 +337,7 @@ impl InMemoryLayer {
     // Write operations
 
     /// Remember new page version, as a WAL record over previous version
-    pub fn put_wal_record(&self, blknum: u32, rec: WALRecord) -> WriteResult<u32> {
+    pub fn put_wal_record(&self, blknum: u32, rec: WALRecord) -> u32 {
         self.put_page_version(
             blknum,
             rec.lsn,
@@ -384,7 +349,7 @@
     }
 
     /// Remember new page version, as a full page image
-    pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> WriteResult<u32> {
+    pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> u32 {
         self.put_page_version(
             blknum,
             lsn,
@@ -397,8 +362,7 @@
 
     /// Common subroutine of the public put_wal_record() and put_page_image() functions.
     /// Adds the page version to the in-memory tree
-    pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> WriteResult<u32> {
-        self.assert_not_frozen();
+    pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> u32 {
         assert!(self.seg.blknum_in_seg(blknum));
 
         trace!(
@@ -410,7 +374,7 @@
         );
 
         let mut inner = self.inner.write().unwrap();
-        inner.check_writeable()?;
+        inner.assert_writeable();
 
         let old = inner.page_versions.append_or_update_last(blknum, lsn, pv);
 
@@ -471,22 +435,22 @@
             }
 
                inner.segsizes.append_or_update_last(lsn, newsize).unwrap();
-                return Ok(newsize - oldsize);
+                return newsize - oldsize;
             }
         }
-        Ok(0)
+
+        0
     }
 
     /// Remember that the relation was truncated at given LSN
-    pub fn put_truncation(&self, lsn: Lsn, segsize: u32) -> WriteResult<()> {
+    pub fn put_truncation(&self, lsn: Lsn, segsize: u32) {
         assert!(
             self.seg.rel.is_blocky(),
             "put_truncation() called on a non-blocky rel"
        );
-        self.assert_not_frozen();
 
         let mut inner = self.inner.write().unwrap();
-        inner.check_writeable()?;
+        inner.assert_writeable();
 
         // Check that we truncate to a smaller size than the segment was before the truncation
         let oldsize = inner.get_seg_size(lsn);
@@ -498,25 +462,19 @@
             // We already had an entry for this LSN. That's odd..
warn!("Inserting truncation, but had an entry for the LSN already"); } - - Ok(()) } /// Remember that the segment was dropped at given LSN - pub fn drop_segment(&self, lsn: Lsn) -> WriteResult<()> { - self.assert_not_frozen(); - + pub fn drop_segment(&self, lsn: Lsn) { let mut inner = self.inner.write().unwrap(); - inner.check_writeable()?; - - assert!(inner.drop_lsn.is_none()); - inner.drop_lsn = Some(lsn); - inner.writeable = false; + assert!(inner.end_lsn.is_none()); + assert!(!inner.dropped); + inner.dropped = true; + assert!(self.start_lsn < lsn); + inner.end_lsn = Some(lsn); trace!("dropped segment {} at {}", self.seg, lsn); - - Ok(()) } /// @@ -556,116 +514,43 @@ impl InMemoryLayer { tenantid, seg, start_lsn, - end_lsn: None, oldest_pending_lsn, incremental: true, inner: RwLock::new(InMemoryLayerInner { - drop_lsn: None, + end_lsn: None, + dropped: false, page_versions: PageVersions::default(), segsizes, - writeable: true, }), }) } pub fn is_writeable(&self) -> bool { let inner = self.inner.read().unwrap(); - inner.writeable + inner.end_lsn.is_none() } - /// Splits `self` into two InMemoryLayers: `frozen` and `open`. - /// All data up to and including `cutoff_lsn` - /// is copied to `frozen`, while the remaining data is copied to `open`. - /// After completion, self is non-writeable, but not frozen. - pub fn freeze(self: Arc, cutoff_lsn: Lsn) -> Result { - info!( - "freezing in-memory layer {} at {} (oldest {})", - self.filename().display(), - cutoff_lsn, - self.oldest_pending_lsn - ); + /// Make the layer non-writeable. Only call once. + /// Records the end_lsn for non-dropped layers. + /// `end_lsn` is inclusive + pub fn freeze(&self, end_lsn: Lsn) { + let mut inner = self.inner.write().unwrap(); - self.assert_not_frozen(); - - let self_ref = self.clone(); - let mut inner = self_ref.inner.write().unwrap(); - // Dropped layers don't need any special freeze actions, - // they are marked as non-writeable at drop and just - // written out to disk by checkpointer. - if inner.drop_lsn.is_some() { - assert!(!inner.writeable); - info!( - "freezing in memory layer for {} on timeline {} is dropped at {}", - self.seg, - self.timelineid, - inner.drop_lsn.unwrap() - ); - - // There should be no newer layer that refers this non-writeable layer, - // because layer that is created after dropped one represents a new rel. - return Ok(FreezeLayers { - frozen: self, - open: None, - }); - } - assert!(inner.writeable); - inner.writeable = false; - - // Divide all the page versions into old and new - // at the 'cutoff_lsn' point. 
-        let mut after_oldest_lsn: Accum<Lsn> = Accum(None);
-
-        let cutoff_lsn_exclusive = Lsn(cutoff_lsn.0 + 1);
-
-        let (before_segsizes, mut after_segsizes) = inner.segsizes.split_at(&cutoff_lsn_exclusive);
-        if let Some((lsn, _size)) = after_segsizes.as_slice().first() {
-            after_oldest_lsn.accum(min, *lsn);
-        }
-
-        let (before_page_versions, after_page_versions) = inner
-            .page_versions
-            .split_at(cutoff_lsn_exclusive, &mut after_oldest_lsn);
-
-        let frozen = Arc::new(InMemoryLayer {
-            conf: self.conf,
-            tenantid: self.tenantid,
-            timelineid: self.timelineid,
-            seg: self.seg,
-            start_lsn: self.start_lsn,
-            end_lsn: Some(cutoff_lsn),
-            oldest_pending_lsn: self.start_lsn,
-            incremental: self.incremental,
-            inner: RwLock::new(InMemoryLayerInner {
-                drop_lsn: inner.drop_lsn,
-                page_versions: before_page_versions,
-                segsizes: before_segsizes,
-                writeable: false,
-            }),
-        });
-
-        let open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
-            let mut new_open = Self::create_successor_layer(
-                self.conf,
-                frozen.clone(),
-                self.timelineid,
-                self.tenantid,
-                cutoff_lsn + 1,
-                after_oldest_lsn.0.unwrap(),
-            )?;
-
-            let new_inner = new_open.inner.get_mut().unwrap();
-            // Ensure page_versions doesn't contain anything
-            // so we can just replace it
-            assert!(new_inner.page_versions.is_empty());
-            new_inner.page_versions = after_page_versions;
-            new_inner.segsizes.extend(&mut after_segsizes).unwrap();
-
-            Some(Arc::new(new_open))
+        if inner.end_lsn.is_some() {
+            assert!(inner.dropped);
         } else {
-            None
-        };
+            assert!(!inner.dropped);
+            assert!(self.start_lsn < end_lsn + 1);
+            inner.end_lsn = Some(Lsn(end_lsn.0 + 1));
 
-        Ok(FreezeLayers { frozen, open })
+            if let Some((lsn, _)) = inner.segsizes.as_slice().last() {
+                assert!(lsn <= &end_lsn, "{:?} {:?}", lsn, end_lsn);
+            }
+
+            for (_blk, lsn, _pv) in inner.page_versions.ordered_page_version_iter(None) {
+                assert!(lsn <= end_lsn);
+            }
        }
     }
 
     /// Write this frozen in-memory layer to disk.
@@ -678,9 +563,8 @@
     /// end LSN are the same.)
     pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<LayersOnDisk> {
         trace!(
-            "write_to_disk {} end_lsn is {} get_end_lsn is {}",
+            "write_to_disk {} get_end_lsn is {}",
             self.filename().display(),
-            self.end_lsn.unwrap_or(Lsn(0)),
             self.get_end_lsn()
         );
 
        // would have to wait until we release it. That race condition is very
        // rare though, so we just accept the potential latency hit for now.
         let inner = self.inner.read().unwrap();
-        assert!(!inner.writeable);
+
+        let end_lsn_exclusive = inner.end_lsn.unwrap();
 
-        if let Some(drop_lsn) = inner.drop_lsn {
+        if inner.dropped {
             let delta_layer = DeltaLayer::create(
                 self.conf,
                 self.timelineid,
                 self.tenantid,
                 self.seg,
                 self.start_lsn,
-                drop_lsn,
+                end_lsn_exclusive,
                 true,
                 inner.page_versions.ordered_page_version_iter(None),
                 inner.segsizes.clone(),
             )?;
             trace!(
                 "freeze: created delta layer for dropped segment {} {}-{}",
                 self.seg,
                 self.start_lsn,
-                drop_lsn
+                end_lsn_exclusive
             );
             return Ok(LayersOnDisk {
                 delta_layers: vec![delta_layer],
@@ -720,14 +604,19 @@
             });
         }
 
-        let end_lsn = self.end_lsn.unwrap();
+        // `end_lsn_exclusive` is exclusive, so subtract 1 to get the last LSN
+        // included in this layer. We want to make an ImageLayer for that last
+        // included LSN, and the DeltaLayer should exclude it.
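+        //
+        // Worked example (added for illustration): for a layer frozen at the
+        // inclusive LSN 0/4000, `end_lsn_exclusive` is 0/4001. The DeltaLayer
+        // then covers page versions with lsn < 0/4000, and the ImageLayer is
+        // materialized exactly at 0/4000.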
+        let end_lsn_inclusive = Lsn(end_lsn_exclusive.0 - 1);
 
-        let mut before_page_versions = inner.page_versions.ordered_page_version_iter(Some(end_lsn));
+        let mut page_versions = inner
+            .page_versions
+            .ordered_page_version_iter(Some(end_lsn_inclusive));
 
         let mut delta_layers = Vec::new();
-        if self.start_lsn != end_lsn {
-            let (before_segsizes, _after_segsizes) = inner.segsizes.split_at(&Lsn(end_lsn.0 + 1));
+        if self.start_lsn != end_lsn_inclusive {
+            let (segsizes, _) = inner.segsizes.split_at(&end_lsn_exclusive);
             // Write the page versions before the cutoff to disk.
             let delta_layer = DeltaLayer::create(
                 self.conf,
                 self.timelineid,
                 self.tenantid,
                 self.seg,
                 self.start_lsn,
-                end_lsn,
+                end_lsn_inclusive,
                 false,
-                before_page_versions,
-                before_segsizes,
+                page_versions,
+                segsizes,
             )?;
             delta_layers.push(delta_layer);
             trace!(
                 "freeze: created delta layer {} {}-{}",
                 self.seg,
                 self.start_lsn,
-                end_lsn
+                end_lsn_inclusive
             );
         } else {
-            assert!(before_page_versions.next().is_none());
+            assert!(page_versions.next().is_none());
         }
 
         drop(inner);
 
         // Write a new base image layer at the cutoff point
-        let image_layer = ImageLayer::create_from_src(self.conf, timeline, self, end_lsn)?;
-        trace!("freeze: created image layer {} at {}", self.seg, end_lsn);
+        let image_layer =
+            ImageLayer::create_from_src(self.conf, timeline, self, end_lsn_inclusive)?;
+        trace!(
+            "freeze: created image layer {} at {}",
+            self.seg,
+            end_lsn_inclusive
+        );
 
         Ok(LayersOnDisk {
             delta_layers,
diff --git a/pageserver/src/layered_repository/page_versions.rs b/pageserver/src/layered_repository/page_versions.rs
index fe7df0caa9..90321f96cd 100644
--- a/pageserver/src/layered_repository/page_versions.rs
+++ b/pageserver/src/layered_repository/page_versions.rs
@@ -1,6 +1,6 @@
 use std::{collections::HashMap, ops::RangeBounds, slice};
 
-use zenith_utils::{accum::Accum, lsn::Lsn, vec_map::VecMap};
+use zenith_utils::{lsn::Lsn, vec_map::VecMap};
 
 use super::storage_layer::PageVersion;
 
@@ -10,10 +10,6 @@ const EMPTY_SLICE: &[(Lsn, PageVersion)] = &[];
 pub struct PageVersions(HashMap<u32, VecMap<Lsn, PageVersion>>);
 
 impl PageVersions {
-    pub fn is_empty(&self) -> bool {
-        self.0.is_empty()
-    }
-
     pub fn append_or_update_last(
         &mut self,
         blknum: u32,
@@ -44,34 +40,6 @@ impl PageVersions {
             .unwrap_or(EMPTY_SLICE)
     }
 
-    /// Split the page version map into two.
-    ///
-    /// Left contains everything up to and not including [`cutoff_lsn`].
-    /// Right contains [`cutoff_lsn`] and everything after.
-    pub fn split_at(&self, cutoff_lsn: Lsn, after_oldest_lsn: &mut Accum<Lsn>) -> (Self, Self) {
-        let mut before_blocks = HashMap::new();
-        let mut after_blocks = HashMap::new();
-
-        for (blknum, vec_map) in self.0.iter() {
-            let (before_versions, after_versions) = vec_map.split_at(&cutoff_lsn);
-
-            if !before_versions.is_empty() {
-                let old = before_blocks.insert(*blknum, before_versions);
-                assert!(old.is_none());
-            }
-
-            if !after_versions.is_empty() {
-                let (first_lsn, _first_pv) = &after_versions.as_slice()[0];
-                after_oldest_lsn.accum(std::cmp::min, *first_lsn);
-
-                let old = after_blocks.insert(*blknum, after_versions);
-                assert!(old.is_none());
-            }
-        }
-
-        (Self(before_blocks), Self(after_blocks))
-    }
-
     /// Iterate through [`PageVersion`]s in (block, lsn) order.
     /// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn`
     pub fn ordered_page_version_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedPageVersionIter<'_> {
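
A minimal, self-contained sketch of the locking pattern this patch introduces (simplified stand-in types, not the pageserver's real API): the writer holds `write_lock` for its whole lifetime, and the checkpointer acquires `write_lock` before the layer-map lock, so a layer can never be frozen between `get_layer_for_write` and the write itself.

```rust
use std::sync::{Mutex, MutexGuard};

/// Simplified stand-in for the pageserver's layer map.
struct LayerMap {
    open_layers: Vec<String>,
}

struct Timeline {
    /// Lock order: `write_lock` is always acquired before `layers`,
    /// mirroring the rule documented on `LayeredTimeline::write_lock`.
    write_lock: Mutex<()>,
    layers: Mutex<LayerMap>,
}

/// Counterpart of `LayeredTimelineWriter`: holding `_write_guard` for the
/// writer's whole lifetime keeps the checkpointer from freezing layers
/// between "get layer for write" and the write itself.
struct TimelineWriter<'a> {
    tl: &'a Timeline,
    _write_guard: MutexGuard<'a, ()>,
}

impl Timeline {
    fn writer(&self) -> TimelineWriter<'_> {
        TimelineWriter {
            tl: self,
            _write_guard: self.write_lock.lock().unwrap(),
        }
    }

    /// Checkpointer side: takes the locks in the same order as a writer,
    /// then drops both before any slow I/O, as checkpoint_internal does.
    fn checkpoint(&self) {
        let write_guard = self.write_lock.lock().unwrap();
        let mut layers = self.layers.lock().unwrap();
        let frozen = layers.open_layers.pop(); // "freeze" in place
        drop(layers);
        drop(write_guard);
        if let Some(layer) = frozen {
            println!("writing frozen layer {layer} to disk without holding locks");
        }
    }
}

impl TimelineWriter<'_> {
    fn put(&self, value: String) {
        // Safe: the checkpointer cannot freeze this layer while
        // `_write_guard` is held by this writer.
        self.tl.layers.lock().unwrap().open_layers.push(value);
    }
}

fn main() {
    let tl = Timeline {
        write_lock: Mutex::new(()),
        layers: Mutex::new(LayerMap { open_layers: Vec::new() }),
    };

    let writer = tl.writer();
    writer.put("page@0/4000".to_string());
    drop(writer); // releases write_lock

    tl.checkpoint();
}
```

Embedding the guard in the writer encodes the lock-ordering rule in the type system: a writer cannot exist without holding `write_lock`, which is why the `perform_write_op` retry loop and `NonWriteableError` can be deleted outright.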