From c4fb509aa23863ebf3c35d2d17dacd6feb03f2b0 Mon Sep 17 00:00:00 2001 From: Patrick Insinger Date: Fri, 8 Oct 2021 12:04:01 -0700 Subject: [PATCH] pageserver - reserve while building delta metadata --- .../src/layered_repository/delta_layer.rs | 30 ++++--- .../src/layered_repository/inmemory_layer.rs | 22 ++--- .../src/layered_repository/page_versions.rs | 80 ++++++++----------- zenith_utils/src/vec_map.rs | 4 + 4 files changed, 68 insertions(+), 68 deletions(-) diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index b96ea88920..945603df44 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -65,6 +65,7 @@ use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::Lsn; use super::blob::{read_blob, BlobRange}; +use super::page_versions::OrderedBlockIter; // Magic constant to identify a Zenith delta file pub const DELTA_FILE_MAGIC: u32 = 0x5A616E01; @@ -374,7 +375,7 @@ impl DeltaLayer { /// data structure with two btreemaps as we do, so passing the btreemaps is currently /// expedient. #[allow(clippy::too_many_arguments)] - pub fn create<'a>( + pub fn create( conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: ZTenantId, @@ -382,14 +383,14 @@ impl DeltaLayer { start_lsn: Lsn, end_lsn: Lsn, dropped: bool, - page_versions: impl Iterator, + page_versions: OrderedBlockIter, relsizes: VecMap, ) -> Result { if seg.rel.is_blocky() { assert!(!relsizes.is_empty()); } - let delta_layer = DeltaLayer { + let mut delta_layer = DeltaLayer { path_or_conf: PathOrConf::Conf(conf), timelineid, tenantid, @@ -403,13 +404,14 @@ impl DeltaLayer { relsizes, }), }; - let mut inner = delta_layer.inner.lock().unwrap(); // Write the in-memory btreemaps into a file let path = delta_layer .path() .expect("DeltaLayer is supposed to have a layer path on disk"); + let inner = delta_layer.inner.get_mut().unwrap(); + // Note: This overwrites any existing file. There shouldn't be any. // FIXME: throw an error instead? let file = File::create(&path)?; @@ -418,14 +420,18 @@ impl DeltaLayer { let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER); - for (blknum, lsn, page_version) in page_versions { - let buf = PageVersion::ser(page_version)?; - let blob_range = page_version_writer.write_blob(&buf)?; + for (blknum, history) in page_versions { + inner.page_version_metas.reserve(history.len()); - inner - .page_version_metas - .append((blknum, lsn), blob_range) - .unwrap(); + for (lsn, page_version) in history { + let buf = PageVersion::ser(page_version)?; + let blob_range = page_version_writer.write_blob(&buf)?; + + inner + .page_version_metas + .append((blknum, *lsn), blob_range) + .unwrap(); + } } let book = page_version_writer.close()?; @@ -462,8 +468,6 @@ impl DeltaLayer { trace!("saved {}", &path.display()); - drop(inner); - Ok(delta_layer) } diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 5f5307fec0..c58f7b5b22 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -280,14 +280,16 @@ impl Layer for InMemoryLayer { println!("segsizes {}: {}", k, v); } - for (blknum, lsn, pv) in inner.page_versions.ordered_page_version_iter(None) { - println!( - "blk {} at {}: {}/{}\n", - blknum, - lsn, - pv.page_image.is_some(), - pv.record.is_some() - ); + for (blknum, history) in inner.page_versions.ordered_block_iter(None) { + for (lsn, pv) in history { + println!( + "blk {} at {}: {}/{}\n", + blknum, + lsn, + pv.page_image.is_some(), + pv.record.is_some() + ); + } } Ok(()) @@ -698,7 +700,7 @@ impl InMemoryLayer { self.start_lsn, drop_lsn, true, - inner.page_versions.ordered_page_version_iter(None), + inner.page_versions.ordered_block_iter(None), inner.segsizes.clone(), )?; trace!( @@ -712,7 +714,7 @@ impl InMemoryLayer { let end_lsn = self.end_lsn.unwrap(); - let mut before_page_versions = inner.page_versions.ordered_page_version_iter(Some(end_lsn)); + let mut before_page_versions = inner.page_versions.ordered_block_iter(Some(end_lsn)); let mut frozen_layers: Vec> = Vec::new(); diff --git a/pageserver/src/layered_repository/page_versions.rs b/pageserver/src/layered_repository/page_versions.rs index fe7df0caa9..2c81a816c1 100644 --- a/pageserver/src/layered_repository/page_versions.rs +++ b/pageserver/src/layered_repository/page_versions.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, ops::RangeBounds, slice}; +use std::{collections::HashMap, ops::RangeBounds}; use zenith_utils::{accum::Accum, lsn::Lsn, vec_map::VecMap}; @@ -72,65 +72,49 @@ impl PageVersions { (Self(before_blocks), Self(after_blocks)) } - /// Iterate through [`PageVersion`]s in (block, lsn) order. - /// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn` - pub fn ordered_page_version_iter(&self, cutoff_lsn: Option) -> OrderedPageVersionIter<'_> { + /// Iterate through block-history pairs in block order. + /// If a [`cutoff_lsn`] is set, only include history with `lsn < cutoff_lsn` + pub fn ordered_block_iter(&self, cutoff_lsn: Option) -> OrderedBlockIter<'_> { let mut ordered_blocks: Vec = self.0.keys().cloned().collect(); ordered_blocks.sort_unstable(); - let slice = ordered_blocks - .first() - .map(|&blknum| self.get_block_slice(blknum)) - .unwrap_or(EMPTY_SLICE); - - OrderedPageVersionIter { + OrderedBlockIter { page_versions: self, ordered_blocks, cur_block_idx: 0, cutoff_lsn, - cur_slice_iter: slice.iter(), } } } -pub struct OrderedPageVersionIter<'a> { +pub struct OrderedBlockIter<'a> { page_versions: &'a PageVersions, ordered_blocks: Vec, cur_block_idx: usize, cutoff_lsn: Option, - - cur_slice_iter: slice::Iter<'a, (Lsn, PageVersion)>, } -impl OrderedPageVersionIter<'_> { - fn is_lsn_before_cutoff(&self, lsn: &Lsn) -> bool { - if let Some(cutoff_lsn) = self.cutoff_lsn.as_ref() { - lsn < cutoff_lsn - } else { - true - } - } -} - -impl<'a> Iterator for OrderedPageVersionIter<'a> { - type Item = (u32, Lsn, &'a PageVersion); +impl<'a> Iterator for OrderedBlockIter<'a> { + type Item = (u32, &'a [(Lsn, PageVersion)]); fn next(&mut self) -> Option { - loop { - if let Some((lsn, page_version)) = self.cur_slice_iter.next() { - if self.is_lsn_before_cutoff(lsn) { - let blknum = self.ordered_blocks[self.cur_block_idx]; - return Some((blknum, *lsn, page_version)); - } - } + while self.cur_block_idx < self.ordered_blocks.len() { + let blknum = self.ordered_blocks[self.cur_block_idx]; + self.cur_block_idx += 1; - let next_block_idx = self.cur_block_idx + 1; - let blknum: u32 = *self.ordered_blocks.get(next_block_idx)?; - self.cur_block_idx = next_block_idx; - self.cur_slice_iter = self.page_versions.get_block_slice(blknum).iter(); + if let Some(cutoff_lsn) = self.cutoff_lsn.as_ref() { + let slice = self.page_versions.get_block_lsn_range(blknum, ..cutoff_lsn); + if !slice.is_empty() { + return Some((blknum, slice)); + } + } else { + return Some((blknum, self.page_versions.get_block_slice(blknum))); + } } + + None } } @@ -156,27 +140,33 @@ mod tests { } } - let mut iter = page_versions.ordered_page_version_iter(None); + let mut iter = page_versions.ordered_block_iter(None); for blknum in 0..BLOCKS { + let (actual_blknum, slice) = iter.next().unwrap(); + assert_eq!(actual_blknum, blknum); + assert_eq!(slice.len(), LSNS as usize); for lsn in 0..LSNS { - let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap(); - assert_eq!(actual_blknum, blknum); - assert_eq!(Lsn(lsn), actual_lsn); + assert_eq!(Lsn(lsn), slice[lsn as usize].0); } } assert!(iter.next().is_none()); assert!(iter.next().is_none()); // should be robust against excessive next() calls const CUTOFF_LSN: Lsn = Lsn(30); - let mut iter = page_versions.ordered_page_version_iter(Some(CUTOFF_LSN)); + let mut iter = page_versions.ordered_block_iter(Some(CUTOFF_LSN)); for blknum in 0..BLOCKS { + let (actual_blknum, slice) = iter.next().unwrap(); + assert_eq!(actual_blknum, blknum); + assert_eq!(slice.len(), CUTOFF_LSN.0 as usize); for lsn in 0..CUTOFF_LSN.0 { - let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap(); - assert_eq!(actual_blknum, blknum); - assert_eq!(Lsn(lsn), actual_lsn); + assert_eq!(Lsn(lsn), slice[lsn as usize].0); } } assert!(iter.next().is_none()); assert!(iter.next().is_none()); // should be robust against excessive next() calls + + let mut iter = page_versions.ordered_block_iter(Some(Lsn(0))); + assert!(iter.next().is_none()); + assert!(iter.next().is_none()); // should be robust against excessive next() calls } } diff --git a/zenith_utils/src/vec_map.rs b/zenith_utils/src/vec_map.rs index 0fd33bf489..86b68dac50 100644 --- a/zenith_utils/src/vec_map.rs +++ b/zenith_utils/src/vec_map.rs @@ -124,6 +124,10 @@ impl VecMap { Ok(()) } + + pub fn reserve(&mut self, additional: usize) { + self.0.reserve(additional); + } } fn extract_key(entry: &(K, V)) -> &K {