From b7c3f02ed1ed46a5477cb55d48cc46f7244ecc9d Mon Sep 17 00:00:00 2001 From: Patrick Insinger Date: Tue, 28 Sep 2021 23:45:20 -0700 Subject: [PATCH] page versions --- .../src/layered_repository/delta_layer.rs | 6 +- .../src/layered_repository/inmemory_layer.rs | 109 ++++++++++++++---- 2 files changed, 87 insertions(+), 28 deletions(-) diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index 2ea0548c88..54fa795084 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -391,7 +391,7 @@ impl DeltaLayer { end_lsn: Lsn, dropped: bool, predecessor: Option>, - page_versions: impl Iterator, + page_versions: impl Iterator, relsizes: OrderedVec, ) -> Result { let delta_layer = DeltaLayer { @@ -424,11 +424,11 @@ impl DeltaLayer { let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER); - for (key, page_version) in page_versions { + for (blknum, lsn, page_version) in page_versions { let buf = PageVersion::ser(page_version)?; let blob_range = page_version_writer.write_blob(&buf)?; - inner.page_version_metas.append(*key, blob_range); + inner.page_version_metas.append((*blknum, *lsn), blob_range); } let book = page_version_writer.close()?; diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 1e3d99f713..4d7f7169a0 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -18,6 +18,7 @@ use log::*; use std::cmp::min; use std::collections::BTreeMap; use std::ops::Bound::Included; +use std::ops::RangeBounds; use std::path::PathBuf; use std::sync::{Arc, RwLock}; use zenith_utils::ordered_vec::OrderedVec; @@ -48,6 +49,62 @@ pub struct InMemoryLayer { inner: RwLock, } +#[derive(Default)] +struct PageVersions(BTreeMap>); + +impl PageVersions { + fn range>(&self, blknum: u32, lsn_range: R) -> &[(Lsn, PageVersion)] { + match self.0.get(&blknum) { + Some(ov) => ov.range(lsn_range), + None => &[], + } + } + + fn update(&mut self, blknum: u32, lsn: Lsn, page_version: PageVersion) { + let ordered_vec = self.0.entry(blknum).or_insert_with(OrderedVec::default); + ordered_vec.append_update(lsn, page_version); + } + + fn iter(&self) -> PageVersionIter { + let mut map_iter = self.0.iter(); + let map_entry = map_iter + .next() + .map(|(blknum, ordered_vec)| (blknum, ordered_vec.iter())); + + PageVersionIter { + map_iter, + map_entry, + } + } + + fn is_empty(&self) -> bool { + self.0.is_empty() + } +} + +struct PageVersionIter<'a> { + map_iter: std::collections::btree_map::Iter<'a, u32, OrderedVec>, + map_entry: Option<(&'a u32, std::slice::Iter<'a, (Lsn, PageVersion)>)>, +} + +impl<'a> Iterator for PageVersionIter<'a> { + type Item = (&'a u32, &'a Lsn, &'a PageVersion); + + fn next(&mut self) -> Option { + loop { + let (blknum, iter) = self.map_entry.as_mut()?; + if let Some((lsn, pv)) = iter.next() { + return Some((blknum, lsn, pv)); + } + + self.map_entry = self + .map_iter + .next() + .map(|(blknum, ordered_vec)| (blknum, ordered_vec.iter())); + } + } +} + pub struct InMemoryLayerInner { /// If this relation was dropped, remember when that happened. drop_lsn: Option, @@ -56,7 +113,7 @@ pub struct InMemoryLayerInner { /// All versions of all pages in the layer are are kept here. /// Indexed by block number and LSN. /// - page_versions: BTreeMap<(u32, Lsn), PageVersion>, + page_versions: PageVersions, /// /// `segsizes` tracks the size of the segment at different points in time. @@ -171,12 +228,8 @@ impl Layer for InMemoryLayer { let inner = self.inner.read().unwrap(); // Scan the BTreeMap backwards, starting from reconstruct_data.lsn. - let minkey = (blknum, Lsn(0)); - let maxkey = (blknum, lsn); - let mut iter = inner - .page_versions - .range((Included(&minkey), Included(&maxkey))); - while let Some(((_blknum, _entry_lsn), entry)) = iter.next_back() { + let mut iter = inner.page_versions.range(blknum, ..=lsn).iter(); + while let Some((_entry_lsn, entry)) = iter.next_back() { if let Some(img) = &entry.page_image { reconstruct_data.page_img = Some(img.clone()); need_image = false; @@ -276,11 +329,11 @@ impl Layer for InMemoryLayer { println!("segsizes {}: {}", k, v); } - for (k, v) in inner.page_versions.iter() { + for (blknum, lsn, v) in inner.page_versions.iter() { println!( "blk {} at {}: {}/{}\n", - k.0, - k.1, + blknum, + lsn, v.page_image.is_some(), v.record.is_some() ); @@ -343,7 +396,7 @@ impl InMemoryLayer { oldest_pending_lsn, inner: RwLock::new(InMemoryLayerInner { drop_lsn: None, - page_versions: BTreeMap::new(), + page_versions: PageVersions::default(), segsizes: OrderedVec::default(), writeable: true, predecessor: None, @@ -394,8 +447,9 @@ impl InMemoryLayer { inner.check_writeable()?; - let old = inner.page_versions.insert((blknum, lsn), pv); + inner.page_versions.update(blknum, lsn, pv); + /* if old.is_some() { // We already had an entry for this LSN. That's odd.. warn!( @@ -403,6 +457,7 @@ impl InMemoryLayer { self.seg.rel, blknum, lsn ); } + */ // Also update the relation size, if this extended the relation. if self.seg.rel.is_blocky() { @@ -439,15 +494,17 @@ impl InMemoryLayer { gapblknum, blknum ); - let old = inner.page_versions.insert((gapblknum, lsn), zeropv); + inner.page_versions.update(gapblknum, lsn, zeropv); // We already had an entry for this LSN. That's odd.. + /* if old.is_some() { warn!( "Page version of rel {} blk {} at {} already exists", self.seg.rel, blknum, lsn ); } + */ } inner.segsizes.append_update(lsn, newsize); @@ -531,7 +588,7 @@ impl InMemoryLayer { oldest_pending_lsn, inner: RwLock::new(InMemoryLayerInner { drop_lsn: None, - page_versions: BTreeMap::new(), + page_versions: PageVersions::default(), segsizes, writeable: true, predecessor: Some(src), @@ -598,14 +655,16 @@ impl InMemoryLayer { } } - let mut before_page_versions = BTreeMap::new(); - let mut after_page_versions = BTreeMap::new(); - for ((blknum, lsn), pv) in inner.page_versions.iter() { - if *lsn > cutoff_lsn { - after_page_versions.insert((*blknum, *lsn), pv.clone()); - after_oldest_lsn.accum(min, *lsn); - } else { - before_page_versions.insert((*blknum, *lsn), pv.clone()); + let mut before_page_versions = PageVersions::default(); + let mut after_page_versions = PageVersions::default(); + for (blknum, ordered_vec) in inner.page_versions.0.iter() { + for (lsn, pv) in ordered_vec.iter() { + if *lsn > cutoff_lsn { + after_page_versions.update(*blknum, *lsn, pv.clone()); + after_oldest_lsn.accum(min, *lsn); + } else { + before_page_versions.update(*blknum, *lsn, pv.clone()); + } } } @@ -637,7 +696,7 @@ impl InMemoryLayer { )?; let new_inner = new_open.inner.get_mut().unwrap(); - new_inner.page_versions.append(&mut after_page_versions); + new_inner.page_versions = after_page_versions; new_inner.segsizes.extend(after_segsizes); Some(Arc::new(new_open)) @@ -708,8 +767,8 @@ impl InMemoryLayer { before_segsizes.append(*lsn, *size); } } - let mut before_page_versions = inner.page_versions.iter().filter(|tup| { - let ((_blknum, lsn), _pv) = tup; + let mut before_page_versions = inner.page_versions.iter().filter(|&tup| { + let (_blknum, lsn, _pv) = tup; *lsn < end_lsn });