From 3b82e806f28d69c92a67b145ceb33055aea52508 Mon Sep 17 00:00:00 2001 From: Patrick Insinger Date: Fri, 8 Oct 2021 00:08:03 -0700 Subject: [PATCH] pageserver - use VecMap for in-memory PageVersions --- pageserver/src/layered_repository.rs | 1 + .../src/layered_repository/delta_layer.rs | 6 +- .../src/layered_repository/inmemory_layer.rs | 62 +++-- .../src/layered_repository/page_versions.rs | 182 +++++++++++++ zenith_utils/src/lib.rs | 3 + zenith_utils/src/vec_map.rs | 239 ++++++++++++++++++ 6 files changed, 457 insertions(+), 36 deletions(-) create mode 100644 pageserver/src/layered_repository/page_versions.rs create mode 100644 zenith_utils/src/vec_map.rs diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index b420b381c5..0726baa86d 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -57,6 +57,7 @@ mod image_layer; mod inmemory_layer; mod interval_tree; mod layer_map; +mod page_versions; mod storage_layer; use delta_layer::DeltaLayer; diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index ad16a86030..4ea415c899 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -380,7 +380,7 @@ impl DeltaLayer { start_lsn: Lsn, end_lsn: Lsn, dropped: bool, - page_versions: impl Iterator, + page_versions: impl Iterator, relsizes: BTreeMap, ) -> Result { if seg.rel.is_blocky() { @@ -416,11 +416,11 @@ impl DeltaLayer { let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER); - for (key, page_version) in page_versions { + for (blknum, lsn, page_version) in page_versions { let buf = PageVersion::ser(page_version)?; let blob_range = page_version_writer.write_blob(&buf)?; - let old = inner.page_version_metas.insert(*key, blob_range); + let old = inner.page_version_metas.insert((blknum, lsn), blob_range); assert!(old.is_none()); } diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index f96b5e71d1..6e3b00350d 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -24,6 +24,8 @@ use std::sync::{Arc, RwLock}; use zenith_utils::accum::Accum; use zenith_utils::lsn::Lsn; +use super::page_versions::PageVersions; + pub struct InMemoryLayer { conf: &'static PageServerConf, tenantid: ZTenantId, @@ -58,7 +60,7 @@ pub struct InMemoryLayerInner { /// All versions of all pages in the layer are are kept here. /// Indexed by block number and LSN. /// - page_versions: BTreeMap<(u32, Lsn), PageVersion>, + page_versions: PageVersions, /// /// `segsizes` tracks the size of the segment at different points in time. @@ -172,13 +174,13 @@ impl Layer for InMemoryLayer { { let inner = self.inner.read().unwrap(); - // Scan the BTreeMap backwards, starting from reconstruct_data.lsn. - let minkey = (blknum, Lsn(0)); - let maxkey = (blknum, lsn); - let mut iter = inner + // Scan the page versions backwards, starting from `lsn`. 
+ let iter = inner .page_versions - .range((Included(&minkey), Included(&maxkey))); - while let Some(((_blknum, _entry_lsn), entry)) = iter.next_back() { + .get_block_lsn_range(blknum, ..=lsn) + .iter() + .rev(); + for (_entry_lsn, entry) in iter { if let Some(img) = &entry.page_image { reconstruct_data.page_img = Some(img.clone()); need_image = false; @@ -279,13 +281,13 @@ impl Layer for InMemoryLayer { println!("segsizes {}: {}", k, v); } - for (k, v) in inner.page_versions.iter() { + for (blknum, lsn, pv) in inner.page_versions.ordered_page_version_iter(None) { println!( "blk {} at {}: {}/{}\n", - k.0, - k.1, - v.page_image.is_some(), - v.record.is_some() + blknum, + lsn, + pv.page_image.is_some(), + pv.record.is_some() ); } @@ -353,7 +355,7 @@ impl InMemoryLayer { incremental: false, inner: RwLock::new(InMemoryLayerInner { drop_lsn: None, - page_versions: BTreeMap::new(), + page_versions: PageVersions::default(), segsizes, writeable: true, }), @@ -403,7 +405,7 @@ impl InMemoryLayer { inner.check_writeable()?; - let old = inner.page_versions.insert((blknum, lsn), pv); + let old = inner.page_versions.append_or_update_last(blknum, lsn, pv); if old.is_some() { // We already had an entry for this LSN. That's odd.. @@ -448,7 +450,9 @@ impl InMemoryLayer { gapblknum, blknum ); - let old = inner.page_versions.insert((gapblknum, lsn), zeropv); + let old = inner + .page_versions + .append_or_update_last(gapblknum, lsn, zeropv); // We already had an entry for this LSN. That's odd.. if old.is_some() { @@ -550,7 +554,7 @@ impl InMemoryLayer { incremental: true, inner: RwLock::new(InMemoryLayerInner { drop_lsn: None, - page_versions: BTreeMap::new(), + page_versions: PageVersions::default(), segsizes, writeable: true, }), @@ -615,16 +619,9 @@ impl InMemoryLayer { } } - let mut before_page_versions = BTreeMap::new(); - let mut after_page_versions = BTreeMap::new(); - for ((blknum, lsn), pv) in inner.page_versions.iter() { - if *lsn > cutoff_lsn { - after_page_versions.insert((*blknum, *lsn), pv.clone()); - after_oldest_lsn.accum(min, *lsn); - } else { - before_page_versions.insert((*blknum, *lsn), pv.clone()); - } - } + let (before_page_versions, after_page_versions) = inner + .page_versions + .split_at(Lsn(cutoff_lsn.0 + 1), &mut after_oldest_lsn); let frozen = Arc::new(InMemoryLayer { conf: self.conf, @@ -654,7 +651,10 @@ impl InMemoryLayer { )?; let new_inner = new_open.inner.get_mut().unwrap(); - new_inner.page_versions.append(&mut after_page_versions); + // Ensure page_versions doesn't contain anything + // so we can just replace it + assert!(new_inner.page_versions.is_empty()); + new_inner.page_versions = after_page_versions; new_inner.segsizes.append(&mut after_segsizes); Some(Arc::new(new_open)) @@ -702,7 +702,7 @@ impl InMemoryLayer { self.start_lsn, drop_lsn, true, - inner.page_versions.iter(), + inner.page_versions.ordered_page_version_iter(None), inner.segsizes.clone(), )?; trace!( @@ -722,11 +722,7 @@ impl InMemoryLayer { before_segsizes.insert(*lsn, *size); } } - let mut before_page_versions = inner.page_versions.iter().filter(|tup| { - let ((_blknum, lsn), _pv) = tup; - - *lsn < end_lsn - }); + let mut before_page_versions = inner.page_versions.ordered_page_version_iter(Some(end_lsn)); let mut frozen_layers: Vec> = Vec::new(); diff --git a/pageserver/src/layered_repository/page_versions.rs b/pageserver/src/layered_repository/page_versions.rs new file mode 100644 index 0000000000..fe7df0caa9 --- /dev/null +++ b/pageserver/src/layered_repository/page_versions.rs @@ -0,0 +1,182 @@ +use 
std::{collections::HashMap, ops::RangeBounds, slice}; + +use zenith_utils::{accum::Accum, lsn::Lsn, vec_map::VecMap}; + +use super::storage_layer::PageVersion; + +const EMPTY_SLICE: &[(Lsn, PageVersion)] = &[]; + +#[derive(Debug, Default)] +pub struct PageVersions(HashMap>); + +impl PageVersions { + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + pub fn append_or_update_last( + &mut self, + blknum: u32, + lsn: Lsn, + page_version: PageVersion, + ) -> Option { + let map = self.0.entry(blknum).or_insert_with(VecMap::default); + map.append_or_update_last(lsn, page_version).unwrap() + } + + /// Get all [`PageVersion`]s in a block + pub fn get_block_slice(&self, blknum: u32) -> &[(Lsn, PageVersion)] { + self.0 + .get(&blknum) + .map(VecMap::as_slice) + .unwrap_or(EMPTY_SLICE) + } + + /// Get a range of [`PageVersions`] in a block + pub fn get_block_lsn_range>( + &self, + blknum: u32, + range: R, + ) -> &[(Lsn, PageVersion)] { + self.0 + .get(&blknum) + .map(|vec_map| vec_map.slice_range(range)) + .unwrap_or(EMPTY_SLICE) + } + + /// Split the page version map into two. + /// + /// Left contains everything up to and not including [`cutoff_lsn`]. + /// Right contains [`cutoff_lsn`] and everything after. + pub fn split_at(&self, cutoff_lsn: Lsn, after_oldest_lsn: &mut Accum) -> (Self, Self) { + let mut before_blocks = HashMap::new(); + let mut after_blocks = HashMap::new(); + + for (blknum, vec_map) in self.0.iter() { + let (before_versions, after_versions) = vec_map.split_at(&cutoff_lsn); + + if !before_versions.is_empty() { + let old = before_blocks.insert(*blknum, before_versions); + assert!(old.is_none()); + } + + if !after_versions.is_empty() { + let (first_lsn, _first_pv) = &after_versions.as_slice()[0]; + after_oldest_lsn.accum(std::cmp::min, *first_lsn); + + let old = after_blocks.insert(*blknum, after_versions); + assert!(old.is_none()); + } + } + + (Self(before_blocks), Self(after_blocks)) + } + + /// Iterate through [`PageVersion`]s in (block, lsn) order. 
+ /// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn` + pub fn ordered_page_version_iter(&self, cutoff_lsn: Option) -> OrderedPageVersionIter<'_> { + let mut ordered_blocks: Vec = self.0.keys().cloned().collect(); + ordered_blocks.sort_unstable(); + + let slice = ordered_blocks + .first() + .map(|&blknum| self.get_block_slice(blknum)) + .unwrap_or(EMPTY_SLICE); + + OrderedPageVersionIter { + page_versions: self, + ordered_blocks, + cur_block_idx: 0, + cutoff_lsn, + cur_slice_iter: slice.iter(), + } + } +} + +pub struct OrderedPageVersionIter<'a> { + page_versions: &'a PageVersions, + + ordered_blocks: Vec, + cur_block_idx: usize, + + cutoff_lsn: Option, + + cur_slice_iter: slice::Iter<'a, (Lsn, PageVersion)>, +} + +impl OrderedPageVersionIter<'_> { + fn is_lsn_before_cutoff(&self, lsn: &Lsn) -> bool { + if let Some(cutoff_lsn) = self.cutoff_lsn.as_ref() { + lsn < cutoff_lsn + } else { + true + } + } +} + +impl<'a> Iterator for OrderedPageVersionIter<'a> { + type Item = (u32, Lsn, &'a PageVersion); + + fn next(&mut self) -> Option { + loop { + if let Some((lsn, page_version)) = self.cur_slice_iter.next() { + if self.is_lsn_before_cutoff(lsn) { + let blknum = self.ordered_blocks[self.cur_block_idx]; + return Some((blknum, *lsn, page_version)); + } + } + + let next_block_idx = self.cur_block_idx + 1; + let blknum: u32 = *self.ordered_blocks.get(next_block_idx)?; + self.cur_block_idx = next_block_idx; + self.cur_slice_iter = self.page_versions.get_block_slice(blknum).iter(); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const EMPTY_PAGE_VERSION: PageVersion = PageVersion { + page_image: None, + record: None, + }; + + #[test] + fn test_ordered_iter() { + let mut page_versions = PageVersions::default(); + const BLOCKS: u32 = 1000; + const LSNS: u64 = 50; + + for blknum in 0..BLOCKS { + for lsn in 0..LSNS { + let old = page_versions.append_or_update_last(blknum, Lsn(lsn), EMPTY_PAGE_VERSION); + assert!(old.is_none()); + } + } + + let mut iter = page_versions.ordered_page_version_iter(None); + for blknum in 0..BLOCKS { + for lsn in 0..LSNS { + let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap(); + assert_eq!(actual_blknum, blknum); + assert_eq!(Lsn(lsn), actual_lsn); + } + } + assert!(iter.next().is_none()); + assert!(iter.next().is_none()); // should be robust against excessive next() calls + + const CUTOFF_LSN: Lsn = Lsn(30); + let mut iter = page_versions.ordered_page_version_iter(Some(CUTOFF_LSN)); + for blknum in 0..BLOCKS { + for lsn in 0..CUTOFF_LSN.0 { + let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap(); + assert_eq!(actual_blknum, blknum); + assert_eq!(Lsn(lsn), actual_lsn); + } + } + assert!(iter.next().is_none()); + assert!(iter.next().is_none()); // should be robust against excessive next() calls + } +} diff --git a/zenith_utils/src/lib.rs b/zenith_utils/src/lib.rs index ca26be5df2..96b3cf5066 100644 --- a/zenith_utils/src/lib.rs +++ b/zenith_utils/src/lib.rs @@ -8,6 +8,9 @@ pub mod lsn; /// SeqWait allows waiting for a future sequence number to arrive pub mod seqwait; +/// append only ordered map implemented with a Vec +pub mod vec_map; + // Async version of SeqWait. Currently unused. // pub mod seqwait_async; diff --git a/zenith_utils/src/vec_map.rs b/zenith_utils/src/vec_map.rs new file mode 100644 index 0000000000..4753010188 --- /dev/null +++ b/zenith_utils/src/vec_map.rs @@ -0,0 +1,239 @@ +use std::{cmp::Ordering, ops::RangeBounds}; + +/// Ordered map datastructure implemented in a Vec. 
+/// Append only - can only add keys that are larger than the +/// current max key. +#[derive(Clone, Debug)] +pub struct VecMap(Vec<(K, V)>); + +impl Default for VecMap { + fn default() -> Self { + VecMap(Default::default()) + } +} + +#[derive(Debug)] +pub struct InvalidKey; + +impl VecMap { + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + pub fn as_slice(&self) -> &[(K, V)] { + self.0.as_slice() + } + + /// This function may panic if given a range where the lower bound is + /// greater than the upper bound. + pub fn slice_range>(&self, range: R) -> &[(K, V)] { + use std::ops::Bound::*; + + let binary_search = |k: &K| self.0.binary_search_by_key(&k, extract_key); + + let start_idx = match range.start_bound() { + Unbounded => 0, + Included(k) => binary_search(k).unwrap_or_else(std::convert::identity), + Excluded(k) => match binary_search(k) { + Ok(idx) => idx + 1, + Err(idx) => idx, + }, + }; + + let end_idx = match range.end_bound() { + Unbounded => self.0.len(), + Included(k) => match binary_search(k) { + Ok(idx) => idx + 1, + Err(idx) => idx, + }, + Excluded(k) => binary_search(k).unwrap_or_else(std::convert::identity), + }; + + &self.0[start_idx..end_idx] + } + + /// Add a key value pair to the map. + /// If [`key`] is less than or equal to the current maximum key + /// the pair will not be added and InvalidKey error will be returned. + pub fn append(&mut self, key: K, value: V) -> Result<(), InvalidKey> { + if let Some((last_key, _last_value)) = self.0.last() { + if &key <= last_key { + return Err(InvalidKey); + } + } + + self.0.push((key, value)); + Ok(()) + } + + /// Update the maximum key value pair or add a new key value pair to the map. + /// If [`key`] is less than the current maximum key no updates or additions + /// will occur and InvalidKey error will be returned. + pub fn append_or_update_last(&mut self, key: K, mut value: V) -> Result, InvalidKey> { + if let Some((last_key, last_value)) = self.0.last_mut() { + match key.cmp(last_key) { + Ordering::Less => return Err(InvalidKey), + Ordering::Equal => { + std::mem::swap(last_value, &mut value); + return Ok(Some(value)); + } + Ordering::Greater => {} + } + } + + self.0.push((key, value)); + Ok(None) + } + + /// Split the map into two. + /// + /// The left map contains everything before [`cutoff`] (exclusive). + /// Right map contains [`cutoff`] and everything after (inclusive). 
+ pub fn split_at(&self, cutoff: &K) -> (Self, Self) + where + K: Clone, + V: Clone, + { + let split_idx = self + .0 + .binary_search_by_key(&cutoff, extract_key) + .unwrap_or_else(std::convert::identity); + + ( + VecMap(self.0[..split_idx].to_vec()), + VecMap(self.0[split_idx..].to_vec()), + ) + } +} + +fn extract_key(entry: &(K, V)) -> &K { + &entry.0 +} + +#[cfg(test)] +mod tests { + use std::{collections::BTreeMap, ops::Bound}; + + use super::VecMap; + + #[test] + fn unbounded_range() { + let mut vec = VecMap::default(); + vec.append(0, ()).unwrap(); + + assert_eq!(vec.slice_range(0..0), &[]); + } + + #[test] + #[should_panic] + fn invalid_ordering_range() { + let mut vec = VecMap::default(); + vec.append(0, ()).unwrap(); + + #[allow(clippy::reversed_empty_ranges)] + vec.slice_range(1..0); + } + + #[test] + fn range_tests() { + let mut vec = VecMap::default(); + vec.append(0, ()).unwrap(); + vec.append(2, ()).unwrap(); + vec.append(4, ()).unwrap(); + + assert_eq!(vec.slice_range(0..0), &[]); + assert_eq!(vec.slice_range(0..1), &[(0, ())]); + assert_eq!(vec.slice_range(0..2), &[(0, ())]); + assert_eq!(vec.slice_range(0..3), &[(0, ()), (2, ())]); + + assert_eq!(vec.slice_range(..0), &[]); + assert_eq!(vec.slice_range(..1), &[(0, ())]); + + assert_eq!(vec.slice_range(..3), &[(0, ()), (2, ())]); + assert_eq!(vec.slice_range(..3), &[(0, ()), (2, ())]); + + assert_eq!(vec.slice_range(0..=0), &[(0, ())]); + assert_eq!(vec.slice_range(0..=1), &[(0, ())]); + assert_eq!(vec.slice_range(0..=2), &[(0, ()), (2, ())]); + assert_eq!(vec.slice_range(0..=3), &[(0, ()), (2, ())]); + + assert_eq!(vec.slice_range(..=0), &[(0, ())]); + assert_eq!(vec.slice_range(..=1), &[(0, ())]); + assert_eq!(vec.slice_range(..=2), &[(0, ()), (2, ())]); + assert_eq!(vec.slice_range(..=3), &[(0, ()), (2, ())]); + } + + struct BoundIter { + min: i32, + max: i32, + + next: Option>, + } + + impl BoundIter { + fn new(min: i32, max: i32) -> Self { + Self { + min, + max, + + next: Some(Bound::Unbounded), + } + } + } + + impl Iterator for BoundIter { + type Item = Bound; + + fn next(&mut self) -> Option { + let cur = self.next?; + + self.next = match &cur { + Bound::Unbounded => Some(Bound::Included(self.min)), + Bound::Included(x) => { + if *x >= self.max { + Some(Bound::Excluded(self.min)) + } else { + Some(Bound::Included(x + 1)) + } + } + Bound::Excluded(x) => { + if *x >= self.max { + None + } else { + Some(Bound::Excluded(x + 1)) + } + } + }; + + Some(cur) + } + } + + #[test] + fn range_exhaustive() { + let map: BTreeMap = (1..=7).step_by(2).map(|x| (x, ())).collect(); + let mut vec = VecMap::default(); + for &key in map.keys() { + vec.append(key, ()).unwrap(); + } + + const RANGE_MIN: i32 = 0; + const RANGE_MAX: i32 = 8; + for lower_bound in BoundIter::new(RANGE_MIN, RANGE_MAX) { + let ub_min = match lower_bound { + Bound::Unbounded => RANGE_MIN, + Bound::Included(x) => x, + Bound::Excluded(x) => x + 1, + }; + for upper_bound in BoundIter::new(ub_min, RANGE_MAX) { + let map_range: Vec<(i32, ())> = map + .range((lower_bound, upper_bound)) + .map(|(&x, _)| (x, ())) + .collect(); + let vec_slice = vec.slice_range((lower_bound, upper_bound)); + + assert_eq!(map_range, vec_slice); + } + } + } +}
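
Not part of the patch itself: a minimal usage sketch of the new VecMap for reviewers, assuming only the zenith_utils crate added above. It illustrates the append-only contract (keys must strictly increase), the append_or_update_last path that PageVersions::append_or_update_last builds on, and the slice_range/split_at calls that back get_block_lsn_range and PageVersions::split_at.

// Illustrative only -- not part of the patch. Assumes the zenith_utils crate
// from this repository is available as a dependency.
use zenith_utils::vec_map::VecMap;

fn main() {
    let mut map: VecMap<u32, &str> = VecMap::default();

    // Keys may only be appended in strictly increasing order.
    map.append(1, "one").unwrap();
    map.append(3, "three").unwrap();
    assert!(map.append(2, "two").is_err()); // out of order: rejected with InvalidKey

    // append_or_update_last additionally allows replacing the value at the
    // current maximum key; the previous value is returned.
    let old = map.append_or_update_last(3, "THREE").unwrap();
    assert_eq!(old, Some("three"));

    // slice_range returns a sub-slice of entries, analogous to BTreeMap::range.
    assert_eq!(map.slice_range(1..=3), &[(1, "one"), (3, "THREE")]);

    // split_at: left gets keys below the cutoff, right gets the cutoff and above.
    let (left, right) = map.split_at(&3);
    assert_eq!(left.as_slice(), &[(1, "one")]);
    assert_eq!(right.as_slice(), &[(3, "THREE")]);
}

Compared to the BTreeMap<(u32, Lsn), PageVersion> it replaces, the flat sorted Vec keeps each block's versions contiguous in memory and locates ranges with a binary search, at the cost of only supporting appends at (or updates of) the tail entry, which matches how page versions are added here: per block, in increasing LSN order.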