Compare commits

...

1 Commits

Author SHA1 Message Date
Patrick Insinger
b3de51296a pageserver - OrderedBlockIter 2021-10-09 00:49:59 -07:00
3 changed files with 48 additions and 88 deletions

View File

@@ -65,6 +65,7 @@ use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
use super::blob::{read_blob, BlobRange};
use super::page_versions::OrderedBlockIter;
// Magic constant to identify a Zenith delta file
pub const DELTA_FILE_MAGIC: u32 = 0x5A616E01;
@@ -374,7 +375,7 @@ impl DeltaLayer {
/// data structure with two btreemaps as we do, so passing the btreemaps is currently
/// expedient.
#[allow(clippy::too_many_arguments)]
pub fn create<'a>(
pub fn create(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
@@ -382,7 +383,7 @@ impl DeltaLayer {
start_lsn: Lsn,
end_lsn: Lsn,
dropped: bool,
page_versions: impl Iterator<Item = (u32, Lsn, &'a PageVersion)>,
page_versions: OrderedBlockIter,
relsizes: VecMap<Lsn, u32>,
) -> Result<DeltaLayer> {
if seg.rel.is_blocky() {
@@ -418,14 +419,20 @@ impl DeltaLayer {
let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
for (blknum, lsn, page_version) in page_versions {
let buf = PageVersion::ser(page_version)?;
let blob_range = page_version_writer.write_blob(&buf)?;
for (blknum, history) in page_versions {
for (lsn, page_version) in history.as_slice() {
if lsn >= &end_lsn {
continue;
}
inner
.page_version_metas
.append((blknum, lsn), blob_range)
.unwrap();
let buf = PageVersion::ser(page_version)?;
let blob_range = page_version_writer.write_blob(&buf)?;
inner
.page_version_metas
.append((blknum, *lsn), blob_range)
.unwrap();
}
}
let book = page_version_writer.close()?;

View File

@@ -280,14 +280,16 @@ impl Layer for InMemoryLayer {
println!("segsizes {}: {}", k, v);
}
for (blknum, lsn, pv) in inner.page_versions.ordered_page_version_iter(None) {
println!(
"blk {} at {}: {}/{}\n",
blknum,
lsn,
pv.page_image.is_some(),
pv.record.is_some()
);
for (blknum, history) in inner.page_versions.ordered_block_iter() {
for (lsn, pv) in history.as_slice() {
println!(
"blk {} at {}: {}/{}\n",
blknum,
lsn,
pv.page_image.is_some(),
pv.record.is_some()
);
}
}
Ok(())
@@ -698,7 +700,7 @@ impl InMemoryLayer {
self.start_lsn,
drop_lsn,
true,
inner.page_versions.ordered_page_version_iter(None),
inner.page_versions.ordered_block_iter(),
inner.segsizes.clone(),
)?;
trace!(
@@ -712,8 +714,6 @@ impl InMemoryLayer {
let end_lsn = self.end_lsn.unwrap();
let mut before_page_versions = inner.page_versions.ordered_page_version_iter(Some(end_lsn));
let mut frozen_layers: Vec<Arc<dyn Layer>> = Vec::new();
if self.start_lsn != end_lsn {
@@ -728,7 +728,7 @@ impl InMemoryLayer {
self.start_lsn,
end_lsn,
false,
before_page_versions,
inner.page_versions.ordered_block_iter(),
before_segsizes,
)?;
frozen_layers.push(Arc::new(delta_layer));
@@ -739,7 +739,10 @@ impl InMemoryLayer {
end_lsn
);
} else {
assert!(before_page_versions.next().is_none());
for (_blknum, history) in inner.page_versions.ordered_block_iter() {
let (lsn, _pv) = history.as_slice().first().unwrap();
assert!(lsn >= &end_lsn);
}
}
drop(inner);

View File

@@ -1,4 +1,4 @@
use std::{collections::HashMap, ops::RangeBounds, slice};
use std::{collections::HashMap, ops::RangeBounds};
use zenith_utils::{accum::Accum, lsn::Lsn, vec_map::VecMap};
@@ -24,14 +24,6 @@ impl PageVersions {
map.append_or_update_last(lsn, page_version).unwrap()
}
/// Get all [`PageVersion`]s in a block
pub fn get_block_slice(&self, blknum: u32) -> &[(Lsn, PageVersion)] {
self.0
.get(&blknum)
.map(VecMap::as_slice)
.unwrap_or(EMPTY_SLICE)
}
/// Get a range of [`PageVersions`] in a block
pub fn get_block_lsn_range<R: RangeBounds<Lsn>>(
&self,
@@ -72,65 +64,33 @@ impl PageVersions {
(Self(before_blocks), Self(after_blocks))
}
/// Iterate through [`PageVersion`]s in (block, lsn) order.
/// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn`
pub fn ordered_page_version_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedPageVersionIter<'_> {
/// Iterate through block-history pairs in block order.
pub fn ordered_block_iter(&self) -> OrderedBlockIter<'_> {
let mut ordered_blocks: Vec<u32> = self.0.keys().cloned().collect();
ordered_blocks.sort_unstable();
let slice = ordered_blocks
.first()
.map(|&blknum| self.get_block_slice(blknum))
.unwrap_or(EMPTY_SLICE);
OrderedPageVersionIter {
OrderedBlockIter {
page_versions: self,
ordered_blocks,
cur_block_idx: 0,
cutoff_lsn,
cur_slice_iter: slice.iter(),
}
}
}
pub struct OrderedPageVersionIter<'a> {
pub struct OrderedBlockIter<'a> {
page_versions: &'a PageVersions,
ordered_blocks: Vec<u32>,
cur_block_idx: usize,
cutoff_lsn: Option<Lsn>,
cur_slice_iter: slice::Iter<'a, (Lsn, PageVersion)>,
}
impl OrderedPageVersionIter<'_> {
fn is_lsn_before_cutoff(&self, lsn: &Lsn) -> bool {
if let Some(cutoff_lsn) = self.cutoff_lsn.as_ref() {
lsn < cutoff_lsn
} else {
true
}
}
}
impl<'a> Iterator for OrderedPageVersionIter<'a> {
type Item = (u32, Lsn, &'a PageVersion);
impl<'a> Iterator for OrderedBlockIter<'a> {
type Item = (u32, &'a VecMap<Lsn, PageVersion>);
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some((lsn, page_version)) = self.cur_slice_iter.next() {
if self.is_lsn_before_cutoff(lsn) {
let blknum = self.ordered_blocks[self.cur_block_idx];
return Some((blknum, *lsn, page_version));
}
}
let next_block_idx = self.cur_block_idx + 1;
let blknum: u32 = *self.ordered_blocks.get(next_block_idx)?;
self.cur_block_idx = next_block_idx;
self.cur_slice_iter = self.page_versions.get_block_slice(blknum).iter();
}
let blknum: u32 = *self.ordered_blocks.get(self.cur_block_idx)?;
self.cur_block_idx += 1;
Some((blknum, self.page_versions.0.get(&blknum).unwrap()))
}
}
@@ -156,24 +116,14 @@ mod tests {
}
}
let mut iter = page_versions.ordered_page_version_iter(None);
let mut iter = page_versions.ordered_block_iter();
for blknum in 0..BLOCKS {
let (actual_blknum, vec_map) = iter.next().unwrap();
let slice = vec_map.as_slice();
assert_eq!(actual_blknum, blknum);
assert_eq!(slice.len(), LSNS as usize);
for lsn in 0..LSNS {
let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap();
assert_eq!(actual_blknum, blknum);
assert_eq!(Lsn(lsn), actual_lsn);
}
}
assert!(iter.next().is_none());
assert!(iter.next().is_none()); // should be robust against excessive next() calls
const CUTOFF_LSN: Lsn = Lsn(30);
let mut iter = page_versions.ordered_page_version_iter(Some(CUTOFF_LSN));
for blknum in 0..BLOCKS {
for lsn in 0..CUTOFF_LSN.0 {
let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap();
assert_eq!(actual_blknum, blknum);
assert_eq!(Lsn(lsn), actual_lsn);
assert_eq!(Lsn(lsn), slice[lsn as usize].0);
}
}
assert!(iter.next().is_none());