From 5b79e033bdec16c8f6c97d49465866aeff48c54f Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Wed, 18 Aug 2021 10:55:43 -0700 Subject: [PATCH] wip: adapt layered_repository to snapfile --- pageserver/src/layered_repository.rs | 1 + .../src/layered_repository/inmemory_layer.rs | 105 +++++++++--------- .../src/layered_repository/page_history.rs | 94 ++++++++++++++++ .../src/layered_repository/snapshot_layer.rs | 28 ++++- 4 files changed, 171 insertions(+), 57 deletions(-) create mode 100644 pageserver/src/layered_repository/page_history.rs diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index cc78086468..3df41c3ae0 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -41,6 +41,7 @@ use zenith_utils::seqwait::SeqWait; mod inmemory_layer; mod layer_map; +mod page_history; mod snapshot_layer; mod storage_layer; diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 70a7b7216e..3261c4a84d 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -2,15 +2,15 @@ //! An in-memory layer stores recently received page versions in memory. The page versions //! are held in a BTreeMap, and there's another BTreeMap to track the size of the relation. //! +use crate::layered_repository::page_history::PageHistory; use crate::layered_repository::storage_layer::{ Layer, PageReconstructData, PageVersion, SegmentTag, RELISH_SEG_SIZE, }; -use crate::layered_repository::LayeredTimeline; -use crate::layered_repository::SnapshotLayer; +use crate::layered_repository::{LayeredTimeline, SnapshotLayer}; use crate::repository::WALRecord; use crate::PageServerConf; use crate::{ZTenantId, ZTimelineId}; -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail, Result}; use bytes::Bytes; use log::*; use std::collections::BTreeMap; @@ -43,9 +43,9 @@ pub struct InMemoryLayerInner { /// /// All versions of all pages in the layer are are kept here. - /// Indexed by block number and LSN. + /// Indexed by block number. /// - page_versions: BTreeMap<(u32, Lsn), PageVersion>, + pages: BTreeMap, /// /// `segsizes` tracks the size of the segment at different points in time. @@ -90,29 +90,32 @@ impl Layer for InMemoryLayer { ) -> Result> { // Scan the BTreeMap backwards, starting from reconstruct_data.lsn. let mut need_base_image_lsn: Option = Some(lsn); - assert!(self.seg.blknum_in_seg(blknum)); { let inner = self.inner.lock().unwrap(); - let minkey = (blknum, Lsn(0)); - let maxkey = (blknum, lsn); - let mut iter = inner - .page_versions - .range((Included(&minkey), Included(&maxkey))); - while let Some(((_blknum, entry_lsn), entry)) = iter.next_back() { + let pages = &inner.pages; + + // FIXME: this assumes the latest page version is always the right answer. + // How should this work if the requested lsn is in the past? in the future? + + let latest_version = pages + .get(&blknum) + .and_then(PageHistory::latest) + .ok_or_else(|| anyhow!("page not found"))?; + + let (entry_lsn, entry) = latest_version; + if true { if let Some(img) = &entry.page_image { reconstruct_data.page_img = Some(img.clone()); need_base_image_lsn = None; - break; } else if let Some(rec) = &entry.record { reconstruct_data.records.push(rec.clone()); if rec.will_init { // This WAL record initializes the page, so no need to go further back need_base_image_lsn = None; - break; } else { - need_base_image_lsn = Some(*entry_lsn); + need_base_image_lsn = Some(entry_lsn); } } else { // No base image, and no WAL record. Huh? @@ -120,7 +123,7 @@ impl Layer for InMemoryLayer { } } - // release lock on 'page_versions' + // release lock on self.pages } Ok(need_base_image_lsn) @@ -184,7 +187,7 @@ impl InMemoryLayer { start_lsn, inner: Mutex::new(InMemoryLayerInner { drop_lsn: None, - page_versions: BTreeMap::new(), + pages: BTreeMap::new(), segsizes: BTreeMap::new(), }), }) @@ -230,15 +233,11 @@ impl InMemoryLayer { ); let mut inner = self.inner.lock().unwrap(); - let old = inner.page_versions.insert((blknum, lsn), pv); - - if old.is_some() { - // We already had an entry for this LSN. That's odd.. - warn!( - "Page version of rel {} blk {} at {} already exists", - self.seg.rel, blknum, lsn - ); - } + let page_history = inner + .pages + .entry(blknum) + .or_insert_with(PageHistory::default); + page_history.push(lsn, pv); // Also update the relation size, if this extended the relation. if self.seg.rel.is_blocky() { @@ -311,7 +310,7 @@ impl InMemoryLayer { timelineid, lsn ); - let mut page_versions = BTreeMap::new(); + let mut pages = BTreeMap::new(); let mut segsizes = BTreeMap::new(); let seg = src.get_seg_tag(); @@ -333,7 +332,8 @@ impl InMemoryLayer { page_image: Some(img), record: None, }; - page_versions.insert((blknum, lsn), pv); + let page_history = PageHistory::from_image(lsn, pv); + pages.insert(blknum, page_history); } Ok(InMemoryLayer { @@ -344,8 +344,8 @@ impl InMemoryLayer { start_lsn: lsn, inner: Mutex::new(InMemoryLayerInner { drop_lsn: None, - page_versions: page_versions, - segsizes: segsizes, + pages, + segsizes, }), }) } @@ -388,10 +388,11 @@ impl InMemoryLayer { }; // Divide all the page versions into old and new at the 'end_lsn' cutoff point. - let mut before_page_versions; + let mut before_pages = BTreeMap::new(); let mut before_segsizes; - let mut after_page_versions; let mut after_segsizes; + let mut after_pages = BTreeMap::new(); + if !dropped { before_segsizes = BTreeMap::new(); after_segsizes = BTreeMap::new(); @@ -403,20 +404,16 @@ impl InMemoryLayer { } } - before_page_versions = BTreeMap::new(); - after_page_versions = BTreeMap::new(); - for ((blknum, lsn), pv) in inner.page_versions.iter() { - if *lsn > end_lsn { - after_page_versions.insert((*blknum, *lsn), pv.clone()); - } else { - before_page_versions.insert((*blknum, *lsn), pv.clone()); - } + for (blknum, page_history) in inner.pages.iter() { + let (old, new) = page_history.clone().split_at(end_lsn); + before_pages.insert(*blknum, old); + after_pages.insert(*blknum, new); } } else { - before_page_versions = inner.page_versions.clone(); + before_pages = inner.pages.clone(); before_segsizes = inner.segsizes.clone(); after_segsizes = BTreeMap::new(); - after_page_versions = BTreeMap::new(); + after_pages = BTreeMap::new(); } // we can release the lock now. @@ -431,13 +428,13 @@ impl InMemoryLayer { self.start_lsn, end_lsn, dropped, - before_page_versions, + before_pages, before_segsizes, )?; // If there were any "new" page versions, initialize a new in-memory layer to hold // them - let new_open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() { + let new_open = if !after_segsizes.is_empty() || !after_pages.is_empty() { info!("created new in-mem layer for {} {}-", self.seg, end_lsn); let new_open = Self::copy_snapshot( @@ -449,7 +446,7 @@ impl InMemoryLayer { end_lsn, )?; let mut new_inner = new_open.inner.lock().unwrap(); - new_inner.page_versions.append(&mut after_page_versions); + new_inner.pages.append(&mut after_pages); new_inner.segsizes.append(&mut after_segsizes); drop(new_inner); @@ -476,14 +473,16 @@ impl InMemoryLayer { for (k, v) in inner.segsizes.iter() { result += &format!("{}: {}\n", k, v); } - for (k, v) in inner.page_versions.iter() { - result += &format!( - "blk {} at {}: {}/{}\n", - k.0, - k.1, - v.page_image.is_some(), - v.record.is_some() - ); + for (page_num, page_history) in inner.pages.iter() { + for (lsn, image) in page_history.iter() { + result += &format!( + "blk {} at {}: {}/{}\n", + page_num, + lsn, + image.page_image.is_some(), + image.record.is_some() + ); + } } result diff --git a/pageserver/src/layered_repository/page_history.rs b/pageserver/src/layered_repository/page_history.rs new file mode 100644 index 0000000000..2b36594ef7 --- /dev/null +++ b/pageserver/src/layered_repository/page_history.rs @@ -0,0 +1,94 @@ +use super::storage_layer::PageVersion; +use std::collections::VecDeque; +use zenith_utils::lsn::Lsn; + +/// A data structure that holds one or more versions of a particular page number. +// +#[derive(Default, Clone)] +pub struct PageHistory { + /// Pages stored in order, from oldest to newest. + pages: VecDeque<(Lsn, PageVersion)>, +} + +impl PageHistory { + /// Create a new PageHistory containing a single image. + pub fn from_image(lsn: Lsn, image: PageVersion) -> Self { + let mut pages = VecDeque::new(); + pages.push_back((lsn, image)); + PageHistory { pages } + } + + /// Push a newer page image. + pub fn push(&mut self, lsn: Lsn, page: PageVersion) { + if let Some((back_lsn, _)) = self.pages.back() { + debug_assert_ne!( + back_lsn, &lsn, + "push page at lsn {:?} but one already exists", + lsn + ); + debug_assert!(back_lsn < &lsn, "pushed page is older than latest lsn"); + } + self.pages.push_back((lsn, page)); + } + + pub fn latest(&self) -> Option<(Lsn, &PageVersion)> { + self.pages.back().map(|(lsn, page)| (*lsn, page)) + } + + /// Split a page history at a particular LSN. + /// + /// This consumes this PageHistory and returns two new ones. + /// Any changes exactly matching the split LSN will be in the + /// "old" history. + // + // FIXME: Is this necessary? There is some debate whether "splitting" + // layers is the best design. + // + pub fn split_at(self, split_lsn: Lsn) -> (PageHistory, PageHistory) { + let mut old = PageHistory::default(); + let mut new = PageHistory::default(); + for (lsn, page) in self.pages { + if lsn > split_lsn { + new.push(lsn, page) + } else { + old.push(lsn, page); + } + } + (old, new) + } + + pub fn iter(&self) -> impl Iterator { + self.pages.iter() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn page_history() { + fn make_page(b: u8) -> PageVersion { + let image = vec![b; 8192].into(); + PageVersion { + page_image: Some(image), + record: None, + } + } + + let mut ph = PageHistory::default(); + ph.push(10.into(), make_page(1)); + ph.push(20.into(), make_page(2)); + ph.push(30.into(), make_page(3)); + + let (latest_lsn, latest_image) = ph.latest().unwrap(); + assert_eq!(latest_lsn, 30.into()); + assert!(matches!(latest_image, PageVersion { page_image: Some(im), .. } if im[0] == 3)); + + let mut it = ph.iter(); + assert_eq!(it.next().unwrap().0, 10.into()); + assert_eq!(it.next().unwrap().0, 20.into()); + assert_eq!(it.next().unwrap().0, 30.into()); + assert!(it.next().is_none()); + } +} diff --git a/pageserver/src/layered_repository/snapshot_layer.rs b/pageserver/src/layered_repository/snapshot_layer.rs index e0f4e77995..91860852b0 100644 --- a/pageserver/src/layered_repository/snapshot_layer.rs +++ b/pageserver/src/layered_repository/snapshot_layer.rs @@ -37,6 +37,7 @@ //! A snapshot file is constructed using the 'bookfile' crate. Each file consists of two //! parts: the page versions and the relation sizes. They are stored as separate chapters. //! +use crate::layered_repository::page_history::PageHistory; use crate::layered_repository::storage_layer::{ Layer, PageReconstructData, PageVersion, SegmentTag, }; @@ -236,7 +237,7 @@ pub struct SnapshotLayerInner { /// All versions of all pages in the file are are kept here. /// Indexed by block number and LSN. - page_versions: BTreeMap<(u32, Lsn), PageVersion>, + pages: BTreeMap, /// `relsizes` tracks the size of the relation at different points in time. relsizes: BTreeMap, @@ -270,6 +271,7 @@ impl Layer for SnapshotLayer { lsn: Lsn, reconstruct_data: &mut PageReconstructData, ) -> Result> { + /* // Scan the BTreeMap backwards, starting from the given entry. let mut need_base_image_lsn: Option = Some(lsn); { @@ -303,6 +305,9 @@ impl Layer for SnapshotLayer { } Ok(need_base_image_lsn) + + */ + todo!() } /// Get size of the relation at given LSN @@ -380,7 +385,7 @@ impl SnapshotLayer { start_lsn: Lsn, end_lsn: Lsn, dropped: bool, - page_versions: BTreeMap<(u32, Lsn), PageVersion>, + pages: BTreeMap, relsizes: BTreeMap, ) -> Result { let snapfile = SnapshotLayer { @@ -393,10 +398,12 @@ impl SnapshotLayer { dropped, inner: Mutex::new(SnapshotLayerInner { loaded: true, - page_versions: page_versions, - relsizes: relsizes, + pages, + relsizes, }), }; + + /* let inner = snapfile.inner.lock().unwrap(); // Write the in-memory btreemaps into a file @@ -426,12 +433,16 @@ impl SnapshotLayer { drop(inner); Ok(snapfile) + */ + + todo!() } /// /// Load the contents of the file into memory /// fn load(&self) -> Result> { + /* // quick exit if already loaded let mut inner = self.inner.lock().unwrap(); @@ -469,6 +480,9 @@ impl SnapshotLayer { }; Ok(inner) + */ + + todo!() } /// Create SnapshotLayers representing all files on disk @@ -479,6 +493,7 @@ impl SnapshotLayer { timelineid: ZTimelineId, tenantid: ZTenantId, ) -> Result>> { + /* let path = conf.timeline_path(&timelineid, &tenantid); let mut snapfiles: Vec> = Vec::new(); @@ -506,6 +521,8 @@ impl SnapshotLayer { } } return Ok(snapfiles); + */ + todo!() } pub fn delete(&self) -> Result<()> { @@ -519,11 +536,14 @@ impl SnapshotLayer { /// it will need to be loaded back. /// pub fn unload(&self) -> Result<()> { + /* let mut inner = self.inner.lock().unwrap(); inner.page_versions = BTreeMap::new(); inner.relsizes = BTreeMap::new(); inner.loaded = false; Ok(()) + */ + todo!() } /// debugging function to print out the contents of the layer