diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index f5eb7b2082..613610664f 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -31,6 +31,7 @@ use std::sync::{Arc, Mutex, MutexGuard}; use std::time::{Duration, Instant}; use std::{fs, thread}; +use crate::layered_repository::inmemory_layer::FreezeLayers; use crate::relish::*; use crate::repository::{GcResult, Repository, Timeline, WALRecord}; use crate::restore_local_repo::import_timeline_wal; @@ -1272,11 +1273,22 @@ impl LayeredTimeline { } // freeze it - let (new_historics, new_open) = oldest_layer.freeze(last_record_lsn, self)?; + let FreezeLayers { + frozen, + open: maybe_new_open, + } = oldest_layer.freeze(last_record_lsn)?; + + let new_historics = frozen.write_to_disk(self)?; + + if let Some(last_historic) = new_historics.last() { + if let Some(new_open) = &maybe_new_open { + new_open.update_predecessor(Arc::clone(last_historic)); + } + } // replace this layer with the new layers that 'freeze' returned layers.pop_oldest_open(); - if let Some(n) = new_open { + if let Some(n) = maybe_new_open { layers.insert_open(n); } for n in new_historics { diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 80d9825de9..8b31c8df10 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -15,7 +15,6 @@ use crate::{ZTenantId, ZTimelineId}; use anyhow::{bail, Result}; use bytes::Bytes; use log::*; -use std::cmp::Ordering; use std::collections::BTreeMap; use std::ops::Bound::Included; use std::path::PathBuf; @@ -31,20 +30,19 @@ pub struct InMemoryLayer { /// /// This layer contains all the changes from 'start_lsn'. The - /// start is inclusive. There is no end LSN; we only use in-memory - /// layer at the end of a timeline. + /// start is inclusive. /// start_lsn: Lsn, + /// Frozen in-memory layers have an inclusive end LSN. + end_lsn: Option, + /// LSN of the oldest page version stored in this layer oldest_pending_lsn: Lsn, /// The above fields never change. The parts that do change are in 'inner', /// and protected by mutex. inner: Mutex, - - /// Predecessor layer - predecessor: Option>, } pub struct InMemoryLayerInner { @@ -62,9 +60,12 @@ pub struct InMemoryLayerInner { /// segsizes: BTreeMap, - /// True if freeze() has been called on the layer, indicating it no longer - /// accepts writes. - frozen: bool, + /// Writes are only allowed when true. + /// Set to false when this layer is in the process of being replaced. + writeable: bool, + + /// Predecessor layer + predecessor: Option>, } impl InMemoryLayerInner { @@ -74,7 +75,7 @@ impl InMemoryLayerInner { // `get_layer_for_write` and InMemoryLayer write calls interleave with // a checkpoint operation, however timing should make this rare. // Assert only so we can identify when the bug triggers more easily. - assert!(!self.frozen); + assert!(self.writeable); } fn get_seg_size(&self, lsn: Lsn) -> u32 { @@ -154,6 +155,8 @@ impl Layer for InMemoryLayer { assert!(self.seg.blknum_in_seg(blknum)); + let predecessor: Option>; + { let inner = self.inner.lock().unwrap(); @@ -181,17 +184,15 @@ impl Layer for InMemoryLayer { } } + predecessor = inner.predecessor.clone(); // release lock on 'inner' } // If an older page image is needed to reconstruct the page, let the // caller know about the predecessor layer. if need_image { - if let Some(cont_layer) = &self.predecessor { - Ok(PageReconstructResult::Continue( - self.start_lsn, - Arc::clone(cont_layer), - )) + if let Some(cont_layer) = predecessor { + Ok(PageReconstructResult::Continue(self.start_lsn, cont_layer)) } else { Ok(PageReconstructResult::Missing(self.start_lsn)) } @@ -242,7 +243,8 @@ impl Layer for InMemoryLayer { } fn is_incremental(&self) -> bool { - self.predecessor.is_some() + let inner = self.inner.lock().unwrap(); + inner.predecessor.is_some() } /// debugging function to print out the contents of the layer @@ -278,11 +280,19 @@ impl Layer for InMemoryLayer { } } -// Type alias to simplify InMemoryLayer::freeze signature -// -type SuccessorLayers = (Vec>, Option>); +/// Helper struct to cleanup `InMemoryLayer::freeze` return signature. +pub struct FreezeLayers { + /// Replacement layer for the layer which freeze was called on. + pub frozen: Arc, + /// New open layer containing leftover data. + pub open: Option>, +} impl InMemoryLayer { + fn assert_not_frozen(&self) { + assert!(self.end_lsn.is_none()); + } + /// Return the oldest page version that's stored in this layer pub fn get_oldest_pending_lsn(&self) -> Lsn { self.oldest_pending_lsn @@ -312,14 +322,15 @@ impl InMemoryLayer { tenantid, seg, start_lsn, + end_lsn: None, oldest_pending_lsn, inner: Mutex::new(InMemoryLayerInner { drop_lsn: None, page_versions: BTreeMap::new(), segsizes: BTreeMap::new(), - frozen: false, + writeable: true, + predecessor: None, }), - predecessor: None, }) } @@ -352,6 +363,7 @@ impl InMemoryLayer { /// Common subroutine of the public put_wal_record() and put_page_image() functions. /// Adds the page version to the in-memory tree pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> Result { + self.assert_not_frozen(); assert!(self.seg.blknum_in_seg(blknum)); trace!( @@ -430,8 +442,9 @@ impl InMemoryLayer { /// Remember that the relation was truncated at given LSN pub fn put_truncation(&self, lsn: Lsn, segsize: u32) -> anyhow::Result<()> { - let mut inner = self.inner.lock().unwrap(); + self.assert_not_frozen(); + let mut inner = self.inner.lock().unwrap(); inner.assert_writeable(); // check that this we truncate to a smaller size than segment was before the truncation @@ -450,6 +463,8 @@ impl InMemoryLayer { /// Remember that the segment was dropped at given LSN pub fn drop_segment(&self, lsn: Lsn) -> anyhow::Result<()> { + self.assert_not_frozen(); + let mut inner = self.inner.lock().unwrap(); inner.assert_writeable(); @@ -496,37 +511,23 @@ impl InMemoryLayer { tenantid, seg, start_lsn, + end_lsn: None, oldest_pending_lsn, inner: Mutex::new(InMemoryLayerInner { drop_lsn: None, page_versions: BTreeMap::new(), segsizes, - frozen: false, + writeable: true, + predecessor: Some(src), }), - predecessor: Some(src), }) } - /// - /// Write the this in-memory layer to disk. - /// - /// The cutoff point for the layer that's written to disk is 'end_lsn'. - /// - /// Returns new layers that replace this one. Always returns a new image - /// layer containing the page versions at the cutoff LSN, that were written - /// to disk, and usually also a DeltaLayer that includes all the WAL records - /// between start LSN and the cutoff. (The delta layer is not needed when - /// a new relish is created with a single LSN, so that the start and end LSN - /// are the same.) If there were page versions newer than 'end_lsn', also - /// returns a new in-memory layer containing those page versions. The caller - /// replaces this layer with the returned layers in the layer map. - /// - pub fn freeze( - &self, - cutoff_lsn: Lsn, - // This is needed just to call materialize_page() - timeline: &LayeredTimeline, - ) -> Result { + /// Splits `self` into two InMemoryLayers: `frozen` and `open`. + /// All data up to and including `cutoff_lsn` (or the drop LSN, if dropped) + /// is copied to `frozen`, while the remaining data is copied to `open`. + /// After completion, self is non-writeable, but not frozen. + pub fn freeze(&self, cutoff_lsn: Lsn) -> Result { info!( "freezing in memory layer for {} on timeline {} at {}", self.seg, self.timelineid, cutoff_lsn @@ -534,7 +535,7 @@ impl InMemoryLayer { let mut inner = self.inner.lock().unwrap(); inner.assert_writeable(); - inner.frozen = true; + inner.writeable = false; // Normally, use the cutoff LSN as the end of the frozen layer. // But if the relation was dropped, we know that there are no @@ -568,17 +569,10 @@ impl InMemoryLayer { before_page_versions = BTreeMap::new(); after_page_versions = BTreeMap::new(); for ((blknum, lsn), pv) in inner.page_versions.iter() { - match lsn.cmp(&end_lsn) { - Ordering::Less => { - before_page_versions.insert((*blknum, *lsn), pv.clone()); - } - Ordering::Equal => { - // Page versions at the cutoff LSN will be stored in the - // materialized image layer. - } - Ordering::Greater => { - after_page_versions.insert((*blknum, *lsn), pv.clone()); - } + if *lsn > end_lsn { + after_page_versions.insert((*blknum, *lsn), pv.clone()); + } else { + before_page_versions.insert((*blknum, *lsn), pv.clone()); } } } else { @@ -588,7 +582,83 @@ impl InMemoryLayer { after_page_versions = BTreeMap::new(); } - // we can release the lock now. + let frozen = Arc::new(InMemoryLayer { + conf: self.conf, + tenantid: self.tenantid, + timelineid: self.timelineid, + seg: self.seg, + start_lsn: self.start_lsn, + end_lsn: Some(end_lsn), + oldest_pending_lsn: self.start_lsn, + inner: Mutex::new(InMemoryLayerInner { + drop_lsn: inner.drop_lsn, + page_versions: before_page_versions, + segsizes: before_segsizes, + writeable: false, + predecessor: inner.predecessor.clone(), + }), + }); + + let open = if !dropped && (!after_segsizes.is_empty() || !after_page_versions.is_empty()) { + let mut new_open = Self::create_successor_layer( + self.conf, + frozen.clone(), + self.timelineid, + self.tenantid, + end_lsn, + end_lsn, + )?; + + let new_inner = new_open.inner.get_mut().unwrap(); + new_inner.page_versions.append(&mut after_page_versions); + new_inner.segsizes.append(&mut after_segsizes); + + Some(Arc::new(new_open)) + } else { + None + }; + + // TODO could we avoid creating the `frozen` if it contains no data + Ok(FreezeLayers { frozen, open }) + } + + /// Write the this frozen in-memory layer to disk. + /// + /// Returns new layers that replace this one. + /// If not dropped, returns a new image layer containing the page versions + /// at the `end_lsn`. Can also return a DeltaLayer that includes all the + /// WAL records between start and end LSN. (The delta layer is not needed + /// when a new relish is created with a single LSN, so that the start and + /// end LSN are the same.) + pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result>> { + let end_lsn = self.end_lsn.expect("can only write frozen layers to disk"); + + let inner = self.inner.lock().unwrap(); + + let drop_lsn = inner.drop_lsn.clone(); + let predecessor = inner.predecessor.clone(); + + let mut before_page_versions; + let mut before_segsizes; + if inner.drop_lsn.is_none() { + before_segsizes = BTreeMap::new(); + for (lsn, size) in inner.segsizes.iter() { + if *lsn <= end_lsn { + before_segsizes.insert(*lsn, *size); + } + } + + before_page_versions = BTreeMap::new(); + for ((blknum, lsn), pv) in inner.page_versions.iter() { + if *lsn < end_lsn { + before_page_versions.insert((*blknum, *lsn), pv.clone()); + } + } + } else { + before_page_versions = inner.page_versions.clone(); + before_segsizes = inner.segsizes.clone(); + } + drop(inner); let mut frozen_layers: Vec> = Vec::new(); @@ -602,13 +672,12 @@ impl InMemoryLayer { self.seg, self.start_lsn, end_lsn, - dropped, - self.predecessor.clone(), + drop_lsn.is_some(), + predecessor, before_page_versions, before_segsizes, )?; - let delta_layer_rc: Arc = Arc::new(delta_layer); - frozen_layers.push(delta_layer_rc); + frozen_layers.push(Arc::new(delta_layer)); trace!( "freeze: created delta layer {} {}-{}", self.seg, @@ -619,35 +688,18 @@ impl InMemoryLayer { assert!(before_page_versions.is_empty()); } - let mut new_open_rc = None; - if !dropped { + if drop_lsn.is_none() { // Write a new base image layer at the cutoff point - let imgfile = ImageLayer::create_from_src(self.conf, timeline, self, end_lsn)?; - let imgfile_rc: Arc = Arc::new(imgfile); - frozen_layers.push(Arc::clone(&imgfile_rc)); + let image_layer = ImageLayer::create_from_src(self.conf, &timeline, self, end_lsn)?; + frozen_layers.push(Arc::new(image_layer)); trace!("freeze: created image layer {} at {}", self.seg, end_lsn); - - // If there were any page versions newer than the cutoff, initialize a new in-memory - // layer to hold them - if !after_segsizes.is_empty() || !after_page_versions.is_empty() { - let new_open = Self::create_successor_layer( - self.conf, - imgfile_rc, - self.timelineid, - self.tenantid, - end_lsn, - end_lsn, - )?; - let mut new_inner = new_open.inner.lock().unwrap(); - new_inner.page_versions.append(&mut after_page_versions); - new_inner.segsizes.append(&mut after_segsizes); - drop(new_inner); - trace!("freeze: created new in-mem layer {} {}-", self.seg, end_lsn); - - new_open_rc = Some(Arc::new(new_open)) - } } - Ok((frozen_layers, new_open_rc)) + Ok(frozen_layers) + } + + pub fn update_predecessor(&self, predecessor: Arc) { + let mut inner = self.inner.lock().unwrap(); + inner.predecessor = Some(predecessor); } }