From 2975c26de7ed7905e3ea413edd07a47e9a4a9a90 Mon Sep 17 00:00:00 2001 From: Patrick Insinger Date: Tue, 12 Oct 2021 22:40:05 -0700 Subject: [PATCH] pageserver - PageVersion heap first pass --- .../src/layered_repository/delta_layer.rs | 7 +- .../src/layered_repository/page_versions.rs | 122 ++++++++++++++---- 2 files changed, 103 insertions(+), 26 deletions(-) diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index e93eddb7e6..bdfff78125 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -350,7 +350,7 @@ impl DeltaLayer { /// data structure with two btreemaps as we do, so passing the btreemaps is currently /// expedient. #[allow(clippy::too_many_arguments)] - pub fn create<'a>( + pub fn create( conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: ZTenantId, @@ -358,7 +358,7 @@ impl DeltaLayer { start_lsn: Lsn, end_lsn: Lsn, dropped: bool, - page_versions: impl Iterator, + page_versions: impl Iterator, relsizes: VecMap, ) -> Result { if seg.rel.is_blocky() { @@ -393,7 +393,8 @@ impl DeltaLayer { let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER); for (blknum, lsn, page_version) in page_versions { - let buf = PageVersion::ser(page_version)?; + // TODO avoid deserializing and then reserializing + let buf = PageVersion::ser(&page_version)?; let blob_range = page_version_writer.write_blob(&buf)?; inner diff --git a/pageserver/src/layered_repository/page_versions.rs b/pageserver/src/layered_repository/page_versions.rs index 2d5ce13a56..0de5c9118c 100644 --- a/pageserver/src/layered_repository/page_versions.rs +++ b/pageserver/src/layered_repository/page_versions.rs @@ -1,13 +1,20 @@ -use std::{collections::HashMap, ops::RangeBounds, slice}; +use std::{ + collections::HashMap, + ops::{Range, RangeBounds}, + slice, +}; -use zenith_utils::{lsn::Lsn, vec_map::VecMap}; +use zenith_utils::{bin_ser::LeSer, lsn::Lsn, vec_map::VecMap}; use super::storage_layer::PageVersion; -const EMPTY_SLICE: &[(Lsn, PageVersion)] = &[]; +const EMPTY_SLICE: &[(Lsn, Range)] = &[]; #[derive(Debug, Default)] -pub struct PageVersions(HashMap>); +pub struct PageVersions { + heap: Vec, + ranges: HashMap>>, +} impl PageVersions { pub fn append_or_update_last( @@ -16,17 +23,43 @@ impl PageVersions { lsn: Lsn, page_version: PageVersion, ) -> Option { - let map = self.0.entry(blknum).or_insert_with(VecMap::default); - map.append_or_update_last(lsn, page_version).unwrap() + let mut new_bytes = PageVersion::ser(&page_version).unwrap(); + + let map = self.ranges.entry(blknum).or_insert_with(VecMap::default); + + if let Some((last_lsn, last_range)) = map.as_slice().last() { + if lsn == *last_lsn { + let old_bytes = &self.heap[last_range.clone()]; + if old_bytes == new_bytes { + return Some(page_version); + } + // TODO optimize for case when old_bytes.len() >= new_bytes.len() + } + } + + let new_range = self.heap.len()..self.heap.len() + new_bytes.len(); + self.heap.append(&mut new_bytes); + map.append_or_update_last(lsn, new_range) + .unwrap() + .map(|old_range| { + let old_bytes = &self.heap[old_range]; + PageVersion::des(old_bytes).unwrap() + }) } /// Get all [`PageVersion`]s in a block pub fn iter_block(&self, blknum: u32) -> BlockVersionIter<'_> { - self.0 + let range_iter = self + .ranges .get(&blknum) .map(VecMap::as_slice) .unwrap_or(EMPTY_SLICE) - .iter() + .iter(); + + BlockVersionIter { + heap: &self.heap, + range_iter, + } } /// Get a range of [`PageVersions`] in a block @@ -35,35 +68,78 @@ impl PageVersions { blknum: u32, range: R, ) -> BlockVersionIter<'_> { - self.0 + let range_iter = self + .ranges .get(&blknum) .map(|vec_map| vec_map.slice_range(range)) .unwrap_or(EMPTY_SLICE) - .iter() + .iter(); + + BlockVersionIter { + heap: &self.heap, + range_iter, + } } /// Iterate through [`PageVersion`]s in (block, lsn) order. /// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn` pub fn ordered_page_version_iter(&self, cutoff_lsn: Option) -> OrderedPageVersionIter<'_> { - let mut ordered_blocks: Vec = self.0.keys().cloned().collect(); + let mut ordered_blocks: Vec = self.ranges.keys().cloned().collect(); ordered_blocks.sort_unstable(); - let iter = ordered_blocks + let cur_block_iter = ordered_blocks .first() .map(|&blknum| self.iter_block(blknum)) - .unwrap_or_else(|| EMPTY_SLICE.iter()); + .unwrap_or_else(|| { + let empty_iter = EMPTY_SLICE.iter(); + BlockVersionIter { + heap: &self.heap, + range_iter: empty_iter, + } + }); OrderedPageVersionIter { page_versions: self, ordered_blocks, cur_block_idx: 0, cutoff_lsn, - cur_slice_iter: iter, + cur_block_iter, } } } -pub type BlockVersionIter<'a> = std::slice::Iter<'a, (Lsn, PageVersion)>; +pub struct BlockVersionIter<'a> { + heap: &'a Vec, + range_iter: slice::Iter<'a, (Lsn, Range)>, +} + +impl BlockVersionIter<'_> { + fn get_iter_result(&self, tuple: Option<&(Lsn, Range)>) -> Option<(Lsn, PageVersion)> { + let (lsn, range) = tuple?; + let range = range.clone(); + + let pv_bytes = &self.heap[range]; + let page_version = PageVersion::des(pv_bytes).unwrap(); + + Some((*lsn, page_version)) + } +} + +impl Iterator for BlockVersionIter<'_> { + type Item = (Lsn, PageVersion); + + fn next(&mut self) -> Option { + let tuple = self.range_iter.next(); + self.get_iter_result(tuple) + } +} + +impl DoubleEndedIterator for BlockVersionIter<'_> { + fn next_back(&mut self) -> Option { + let tuple = self.range_iter.next_back(); + self.get_iter_result(tuple) + } +} pub struct OrderedPageVersionIter<'a> { page_versions: &'a PageVersions, @@ -73,35 +149,35 @@ pub struct OrderedPageVersionIter<'a> { cutoff_lsn: Option, - cur_slice_iter: slice::Iter<'a, (Lsn, PageVersion)>, + cur_block_iter: BlockVersionIter<'a>, } impl OrderedPageVersionIter<'_> { - fn is_lsn_before_cutoff(&self, lsn: &Lsn) -> bool { + fn is_lsn_before_cutoff(&self, lsn: Lsn) -> bool { if let Some(cutoff_lsn) = self.cutoff_lsn.as_ref() { - lsn < cutoff_lsn + lsn < *cutoff_lsn } else { true } } } -impl<'a> Iterator for OrderedPageVersionIter<'a> { - type Item = (u32, Lsn, &'a PageVersion); +impl Iterator for OrderedPageVersionIter<'_> { + type Item = (u32, Lsn, PageVersion); fn next(&mut self) -> Option { loop { - if let Some((lsn, page_version)) = self.cur_slice_iter.next() { + if let Some((lsn, page_version)) = self.cur_block_iter.next() { if self.is_lsn_before_cutoff(lsn) { let blknum = self.ordered_blocks[self.cur_block_idx]; - return Some((blknum, *lsn, page_version)); + return Some((blknum, lsn, page_version)); } } let next_block_idx = self.cur_block_idx + 1; let blknum: u32 = *self.ordered_blocks.get(next_block_idx)?; self.cur_block_idx = next_block_idx; - self.cur_slice_iter = self.page_versions.iter_block(blknum); + self.cur_block_iter = self.page_versions.iter_block(blknum); } } }