pageserver - reserve while building delta metadata

Author: Patrick Insinger
Date:   2021-10-08 12:04:01 -07:00
parent 48a6bbe4c1
commit c4fb509aa2
4 changed files with 68 additions and 68 deletions


@@ -65,6 +65,7 @@ use zenith_utils::bin_ser::BeSer;
 use zenith_utils::lsn::Lsn;
 use super::blob::{read_blob, BlobRange};
+use super::page_versions::OrderedBlockIter;
 // Magic constant to identify a Zenith delta file
 pub const DELTA_FILE_MAGIC: u32 = 0x5A616E01;
@@ -374,7 +375,7 @@ impl DeltaLayer {
     /// data structure with two btreemaps as we do, so passing the btreemaps is currently
     /// expedient.
     #[allow(clippy::too_many_arguments)]
-    pub fn create<'a>(
+    pub fn create(
         conf: &'static PageServerConf,
         timelineid: ZTimelineId,
         tenantid: ZTenantId,
@@ -382,14 +383,14 @@ impl DeltaLayer {
         start_lsn: Lsn,
         end_lsn: Lsn,
         dropped: bool,
-        page_versions: impl Iterator<Item = (u32, Lsn, &'a PageVersion)>,
+        page_versions: OrderedBlockIter,
         relsizes: VecMap<Lsn, u32>,
     ) -> Result<DeltaLayer> {
         if seg.rel.is_blocky() {
             assert!(!relsizes.is_empty());
         }
-        let delta_layer = DeltaLayer {
+        let mut delta_layer = DeltaLayer {
             path_or_conf: PathOrConf::Conf(conf),
             timelineid,
             tenantid,
@@ -403,13 +404,14 @@ impl DeltaLayer {
                 relsizes,
             }),
         };
-        let mut inner = delta_layer.inner.lock().unwrap();
         // Write the in-memory btreemaps into a file
         let path = delta_layer
             .path()
             .expect("DeltaLayer is supposed to have a layer path on disk");
+        let inner = delta_layer.inner.get_mut().unwrap();
         // Note: This overwrites any existing file. There shouldn't be any.
         // FIXME: throw an error instead?
         let file = File::create(&path)?;
@@ -418,14 +420,18 @@ impl DeltaLayer {
         let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
-        for (blknum, lsn, page_version) in page_versions {
-            let buf = PageVersion::ser(page_version)?;
-            let blob_range = page_version_writer.write_blob(&buf)?;
+        for (blknum, history) in page_versions {
+            inner.page_version_metas.reserve(history.len());

-            inner
-                .page_version_metas
-                .append((blknum, lsn), blob_range)
-                .unwrap();
+            for (lsn, page_version) in history {
+                let buf = PageVersion::ser(page_version)?;
+                let blob_range = page_version_writer.write_blob(&buf)?;
+                inner
+                    .page_version_metas
+                    .append((blknum, *lsn), blob_range)
+                    .unwrap();
+            }
         }
         let book = page_version_writer.close()?;
@@ -462,8 +468,6 @@ impl DeltaLayer {
         trace!("saved {}", &path.display());
-        drop(inner);
         Ok(delta_layer)
     }
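The point of the DeltaLayer::create change above is that the writer now receives one complete per-block history at a time instead of a flat stream of (blknum, lsn, version) tuples, so it knows how many metadata entries a block will contribute before appending any of them and can size the storage once per block via the new reserve(history.len()) call. A minimal, self-contained sketch of that reserve-then-append pattern, using a plain Vec and made-up payload strings as stand-ins for the real VecMap and blob metadata:

// Toy stand-in for building `page_version_metas`: a flat, (blknum, lsn)-ordered
// list of metadata entries. `reserve` plus ordered pushes mirror the
// VecMap::reserve / VecMap::append calls in DeltaLayer::create.
fn build_metadata(histories: &[(u32, Vec<(u64, &'static str)>)]) -> Vec<((u32, u64), &'static str)> {
    let mut metas = Vec::new();
    for (blknum, history) in histories {
        // Size the backing storage once per block; the pushes below can then
        // not trigger a reallocation in the middle of a block's history.
        metas.reserve(history.len());
        for (lsn, meta) in history {
            metas.push(((*blknum, *lsn), *meta));
        }
    }
    metas
}

fn main() {
    let histories = vec![
        (1u32, vec![(10u64, "page image"), (20, "wal record")]),
        (2, vec![(15, "wal record")]),
    ];
    let metas = build_metadata(&histories);
    assert_eq!(metas.len(), 3);
    // The result is still ordered by (blknum, lsn), as the real VecMap requires.
    assert!(metas.windows(2).all(|w| w[0].0 < w[1].0));
}

With the reservation in place, the pushes inside the inner loop cannot force a reallocation partway through a block's history.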


@@ -280,14 +280,16 @@ impl Layer for InMemoryLayer {
             println!("segsizes {}: {}", k, v);
         }
-        for (blknum, lsn, pv) in inner.page_versions.ordered_page_version_iter(None) {
-            println!(
-                "blk {} at {}: {}/{}\n",
-                blknum,
-                lsn,
-                pv.page_image.is_some(),
-                pv.record.is_some()
-            );
+        for (blknum, history) in inner.page_versions.ordered_block_iter(None) {
+            for (lsn, pv) in history {
+                println!(
+                    "blk {} at {}: {}/{}\n",
+                    blknum,
+                    lsn,
+                    pv.page_image.is_some(),
+                    pv.record.is_some()
+                );
+            }
         }
         Ok(())
@@ -698,7 +700,7 @@ impl InMemoryLayer {
             self.start_lsn,
             drop_lsn,
             true,
-            inner.page_versions.ordered_page_version_iter(None),
+            inner.page_versions.ordered_block_iter(None),
             inner.segsizes.clone(),
         )?;
         trace!(
@@ -712,7 +714,7 @@ impl InMemoryLayer {
         let end_lsn = self.end_lsn.unwrap();
-        let mut before_page_versions = inner.page_versions.ordered_page_version_iter(Some(end_lsn));
+        let mut before_page_versions = inner.page_versions.ordered_block_iter(Some(end_lsn));
         let mut frozen_layers: Vec<Arc<dyn Layer>> = Vec::new();
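The InMemoryLayer changes above are mechanical: the dump loop becomes a nested loop over (blknum, history) pairs, and the drop/freeze paths simply hand the new block iterator to DeltaLayer::create, with Some(end_lsn) as the cutoff when freezing. The block-oriented shape is strictly more general than the old one; a caller that still wanted the flat (blknum, lsn, &PageVersion) stream could rebuild it with a flat_map, as in this sketch against simplified stand-in types (not the real Lsn and PageVersion):

// Simplified stand-ins; the real Lsn and PageVersion live in the pageserver crates.
type Lsn = u64;
struct PageVersion;

// Recover the old flat (blknum, lsn, &PageVersion) view from an iterator of
// (blknum, history) pairs, i.e. the shape OrderedBlockIter now yields.
fn flatten<'a>(
    blocks: impl Iterator<Item = (u32, &'a [(Lsn, PageVersion)])> + 'a,
) -> impl Iterator<Item = (u32, Lsn, &'a PageVersion)> + 'a {
    blocks.flat_map(|(blknum, history)| history.iter().map(move |(lsn, pv)| (blknum, *lsn, pv)))
}

fn main() {
    let history_a = [(10u64, PageVersion), (20, PageVersion)];
    let history_b = [(15u64, PageVersion)];
    let blocks = vec![(1u32, &history_a[..]), (2, &history_b[..])];

    let flat: Vec<(u32, Lsn)> = flatten(blocks.into_iter())
        .map(|(blknum, lsn, _pv)| (blknum, lsn))
        .collect();
    assert_eq!(flat, vec![(1, 10), (1, 20), (2, 15)]);
}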


@@ -1,4 +1,4 @@
-use std::{collections::HashMap, ops::RangeBounds, slice};
+use std::{collections::HashMap, ops::RangeBounds};
 use zenith_utils::{accum::Accum, lsn::Lsn, vec_map::VecMap};
@@ -72,65 +72,49 @@ impl PageVersions {
         (Self(before_blocks), Self(after_blocks))
     }

-    /// Iterate through [`PageVersion`]s in (block, lsn) order.
-    /// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn`
-    pub fn ordered_page_version_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedPageVersionIter<'_> {
+    /// Iterate through block-history pairs in block order.
+    /// If a [`cutoff_lsn`] is set, only include history with `lsn < cutoff_lsn`
+    pub fn ordered_block_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedBlockIter<'_> {
         let mut ordered_blocks: Vec<u32> = self.0.keys().cloned().collect();
         ordered_blocks.sort_unstable();

-        let slice = ordered_blocks
-            .first()
-            .map(|&blknum| self.get_block_slice(blknum))
-            .unwrap_or(EMPTY_SLICE);
-
-        OrderedPageVersionIter {
+        OrderedBlockIter {
             page_versions: self,
             ordered_blocks,
             cur_block_idx: 0,
             cutoff_lsn,
-            cur_slice_iter: slice.iter(),
         }
     }
 }

-pub struct OrderedPageVersionIter<'a> {
+pub struct OrderedBlockIter<'a> {
     page_versions: &'a PageVersions,
     ordered_blocks: Vec<u32>,
     cur_block_idx: usize,
     cutoff_lsn: Option<Lsn>,
-    cur_slice_iter: slice::Iter<'a, (Lsn, PageVersion)>,
 }

-impl OrderedPageVersionIter<'_> {
-    fn is_lsn_before_cutoff(&self, lsn: &Lsn) -> bool {
-        if let Some(cutoff_lsn) = self.cutoff_lsn.as_ref() {
-            lsn < cutoff_lsn
-        } else {
-            true
-        }
-    }
-}
-
-impl<'a> Iterator for OrderedPageVersionIter<'a> {
-    type Item = (u32, Lsn, &'a PageVersion);
+impl<'a> Iterator for OrderedBlockIter<'a> {
+    type Item = (u32, &'a [(Lsn, PageVersion)]);

     fn next(&mut self) -> Option<Self::Item> {
-        loop {
-            if let Some((lsn, page_version)) = self.cur_slice_iter.next() {
-                if self.is_lsn_before_cutoff(lsn) {
-                    let blknum = self.ordered_blocks[self.cur_block_idx];
-                    return Some((blknum, *lsn, page_version));
-                }
-            }
+        while self.cur_block_idx < self.ordered_blocks.len() {
+            let blknum = self.ordered_blocks[self.cur_block_idx];
+            self.cur_block_idx += 1;

-            let next_block_idx = self.cur_block_idx + 1;
-            let blknum: u32 = *self.ordered_blocks.get(next_block_idx)?;
-            self.cur_block_idx = next_block_idx;
-            self.cur_slice_iter = self.page_versions.get_block_slice(blknum).iter();
+            if let Some(cutoff_lsn) = self.cutoff_lsn.as_ref() {
+                let slice = self.page_versions.get_block_lsn_range(blknum, ..cutoff_lsn);
+                if !slice.is_empty() {
+                    return Some((blknum, slice));
+                }
+            } else {
+                return Some((blknum, self.page_versions.get_block_slice(blknum)));
+            }
         }
+        None
     }
 }
@@ -156,27 +140,33 @@ mod tests {
             }
         }
-        let mut iter = page_versions.ordered_page_version_iter(None);
+        let mut iter = page_versions.ordered_block_iter(None);
         for blknum in 0..BLOCKS {
+            let (actual_blknum, slice) = iter.next().unwrap();
+            assert_eq!(actual_blknum, blknum);
+            assert_eq!(slice.len(), LSNS as usize);
             for lsn in 0..LSNS {
-                let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap();
-                assert_eq!(actual_blknum, blknum);
-                assert_eq!(Lsn(lsn), actual_lsn);
+                assert_eq!(Lsn(lsn), slice[lsn as usize].0);
             }
         }
         assert!(iter.next().is_none());
         assert!(iter.next().is_none()); // should be robust against excessive next() calls

         const CUTOFF_LSN: Lsn = Lsn(30);
-        let mut iter = page_versions.ordered_page_version_iter(Some(CUTOFF_LSN));
+        let mut iter = page_versions.ordered_block_iter(Some(CUTOFF_LSN));
         for blknum in 0..BLOCKS {
+            let (actual_blknum, slice) = iter.next().unwrap();
+            assert_eq!(actual_blknum, blknum);
+            assert_eq!(slice.len(), CUTOFF_LSN.0 as usize);
             for lsn in 0..CUTOFF_LSN.0 {
-                let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap();
-                assert_eq!(actual_blknum, blknum);
-                assert_eq!(Lsn(lsn), actual_lsn);
+                assert_eq!(Lsn(lsn), slice[lsn as usize].0);
             }
         }
         assert!(iter.next().is_none());
         assert!(iter.next().is_none()); // should be robust against excessive next() calls
+
+        let mut iter = page_versions.ordered_block_iter(Some(Lsn(0)));
+        assert!(iter.next().is_none());
+        assert!(iter.next().is_none()); // should be robust against excessive next() calls
     }
 }
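The rewritten iterator above no longer keeps a per-element cursor (cur_slice_iter) or tests every version against the cutoff; it advances block by block and returns one contiguous, pre-filtered slice per block: get_block_lsn_range(blknum, ..cutoff_lsn) when a cutoff is given, the whole block slice otherwise, skipping blocks whose filtered history is empty. A condensed, self-contained rendition of that behaviour, with u64 standing in for Lsn, a HashMap of sorted per-block histories standing in for PageVersions, and partition_point playing the role of get_block_lsn_range:

use std::collections::HashMap;

type Lsn = u64;

/// Stand-in for PageVersions: per-block histories, each sorted by LSN.
struct Versions(HashMap<u32, Vec<(Lsn, String)>>);

impl Versions {
    /// Yield (blknum, history) pairs in ascending block order. With a cutoff,
    /// only versions with lsn < cutoff are kept, and blocks whose filtered
    /// history is empty are skipped entirely.
    fn ordered_block_iter<'a>(
        &'a self,
        cutoff_lsn: Option<Lsn>,
    ) -> impl Iterator<Item = (u32, &'a [(Lsn, String)])> + 'a {
        let mut blocks: Vec<u32> = self.0.keys().copied().collect();
        blocks.sort_unstable();
        blocks.into_iter().filter_map(move |blknum| {
            let history = self.0[&blknum].as_slice();
            let end = match cutoff_lsn {
                Some(cutoff) => history.partition_point(|(lsn, _)| *lsn < cutoff),
                None => history.len(),
            };
            (end > 0).then(|| (blknum, &history[..end]))
        })
    }
}

fn main() {
    let mut map = HashMap::new();
    map.insert(7u32, vec![(10u64, "a".to_string()), (30, "b".to_string())]);
    map.insert(3, vec![(5, "c".to_string())]);
    let versions = Versions(map);

    // Blocks come back in ascending order; no cutoff means full histories.
    let all: Vec<_> = versions
        .ordered_block_iter(None)
        .map(|(blknum, history)| (blknum, history.len()))
        .collect();
    assert_eq!(all, vec![(3, 1), (7, 2)]);

    // With a cutoff of 30, only versions with lsn < 30 survive.
    let cut: Vec<_> = versions
        .ordered_block_iter(Some(30))
        .map(|(blknum, history)| (blknum, history.len()))
        .collect();
    assert_eq!(cut, vec![(3, 1), (7, 1)]);

    // A cutoff of 0 filters everything out.
    assert!(versions.ordered_block_iter(Some(0)).next().is_none());
}

The last assertion mirrors the new Lsn(0) case added to the test above: a cutoff below every stored LSN yields an empty iterator rather than a stream of empty slices.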


@@ -124,6 +124,10 @@ impl<K: Ord, V> VecMap<K, V> {
         Ok(())
     }
+
+    pub fn reserve(&mut self, additional: usize) {
+        self.0.reserve(additional);
+    }
 }

 fn extract_key<K, V>(entry: &(K, V)) -> &K {
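The only new API the commit needs from zenith_utils is the reserve method above, which forwards straight to Vec::reserve on the vector backing the VecMap. A stripped-down model of such a vector-backed map, showing just how reserve interacts with ordered appends (the real VecMap has range lookups, splitting, and a richer append API than this hypothetical version):

/// Stripped-down model of an append-only, vector-backed sorted map; only the
/// reserve/append interaction used by DeltaLayer::create is shown here.
pub struct VecMap<K, V>(Vec<(K, V)>);

impl<K: Ord, V> VecMap<K, V> {
    pub fn new() -> Self {
        VecMap(Vec::new())
    }

    /// Append a key/value pair; keys must arrive in strictly ascending order.
    pub fn append(&mut self, key: K, value: V) -> Result<(), &'static str> {
        if let Some((last, _)) = self.0.last() {
            if *last >= key {
                return Err("append out of order");
            }
        }
        self.0.push((key, value));
        Ok(())
    }

    /// Make room for `additional` more entries up front, so a known-length
    /// batch of appends does not grow the vector one push at a time.
    pub fn reserve(&mut self, additional: usize) {
        self.0.reserve(additional);
    }
}

fn main() {
    let history = [(1u64, "page image"), (5, "wal record"), (9, "wal record")];

    let mut metas: VecMap<u64, &str> = VecMap::new();
    metas.reserve(history.len()); // sized once, as DeltaLayer::create now does per block
    let capacity_after_reserve = metas.0.capacity();

    for (lsn, kind) in history {
        metas.append(lsn, kind).unwrap();
    }
    // The appends fit in the reserved space; no reallocation happened mid-batch.
    assert_eq!(metas.0.capacity(), capacity_after_reserve);
}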