Compare commits

...

2 Commits

Author SHA1 Message Date
Patrick Insinger
2975c26de7 pageserver - PageVersion heap first pass 2021-10-12 22:40:05 -07:00
Patrick Insinger
5f4f7d9762 pageserver - PageVersions switch to iterator 2021-10-12 21:44:21 -07:00
3 changed files with 109 additions and 29 deletions

View File

@@ -350,7 +350,7 @@ impl DeltaLayer {
/// data structure with two btreemaps as we do, so passing the btreemaps is currently
/// expedient.
#[allow(clippy::too_many_arguments)]
pub fn create<'a>(
pub fn create(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
@@ -358,7 +358,7 @@ impl DeltaLayer {
start_lsn: Lsn,
end_lsn: Lsn,
dropped: bool,
page_versions: impl Iterator<Item = (u32, Lsn, &'a PageVersion)>,
page_versions: impl Iterator<Item = (u32, Lsn, PageVersion)>,
relsizes: VecMap<Lsn, u32>,
) -> Result<DeltaLayer> {
if seg.rel.is_blocky() {
@@ -393,7 +393,8 @@ impl DeltaLayer {
let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
for (blknum, lsn, page_version) in page_versions {
let buf = PageVersion::ser(page_version)?;
// TODO avoid deserializing and then reserializing
let buf = PageVersion::ser(&page_version)?;
let blob_range = page_version_writer.write_blob(&buf)?;
inner

View File

@@ -157,8 +157,7 @@ impl Layer for InMemoryLayer {
// Scan the page versions backwards, starting from `lsn`.
let iter = inner
.page_versions
.get_block_lsn_range(blknum, ..=lsn)
.iter()
.iter_block_lsn_range(blknum, ..=lsn)
.rev();
for (_entry_lsn, entry) in iter {
if let Some(img) = &entry.page_image {

View File

@@ -1,13 +1,20 @@
use std::{collections::HashMap, ops::RangeBounds, slice};
use std::{
collections::HashMap,
ops::{Range, RangeBounds},
slice,
};
use zenith_utils::{lsn::Lsn, vec_map::VecMap};
use zenith_utils::{bin_ser::LeSer, lsn::Lsn, vec_map::VecMap};
use super::storage_layer::PageVersion;
const EMPTY_SLICE: &[(Lsn, PageVersion)] = &[];
const EMPTY_SLICE: &[(Lsn, Range<usize>)] = &[];
#[derive(Debug, Default)]
pub struct PageVersions(HashMap<u32, VecMap<Lsn, PageVersion>>);
pub struct PageVersions {
heap: Vec<u8>,
ranges: HashMap<u32, VecMap<Lsn, Range<usize>>>,
}
impl PageVersions {
pub fn append_or_update_last(
@@ -16,51 +23,124 @@ impl PageVersions {
lsn: Lsn,
page_version: PageVersion,
) -> Option<PageVersion> {
let map = self.0.entry(blknum).or_insert_with(VecMap::default);
map.append_or_update_last(lsn, page_version).unwrap()
let mut new_bytes = PageVersion::ser(&page_version).unwrap();
let map = self.ranges.entry(blknum).or_insert_with(VecMap::default);
if let Some((last_lsn, last_range)) = map.as_slice().last() {
if lsn == *last_lsn {
let old_bytes = &self.heap[last_range.clone()];
if old_bytes == new_bytes {
return Some(page_version);
}
// TODO optimize for case when old_bytes.len() >= new_bytes.len()
}
}
let new_range = self.heap.len()..self.heap.len() + new_bytes.len();
self.heap.append(&mut new_bytes);
map.append_or_update_last(lsn, new_range)
.unwrap()
.map(|old_range| {
let old_bytes = &self.heap[old_range];
PageVersion::des(old_bytes).unwrap()
})
}
/// Get all [`PageVersion`]s in a block
pub fn get_block_slice(&self, blknum: u32) -> &[(Lsn, PageVersion)] {
self.0
pub fn iter_block(&self, blknum: u32) -> BlockVersionIter<'_> {
let range_iter = self
.ranges
.get(&blknum)
.map(VecMap::as_slice)
.unwrap_or(EMPTY_SLICE)
.iter();
BlockVersionIter {
heap: &self.heap,
range_iter,
}
}
/// Get a range of [`PageVersions`] in a block
pub fn get_block_lsn_range<R: RangeBounds<Lsn>>(
pub fn iter_block_lsn_range<R: RangeBounds<Lsn>>(
&self,
blknum: u32,
range: R,
) -> &[(Lsn, PageVersion)] {
self.0
) -> BlockVersionIter<'_> {
let range_iter = self
.ranges
.get(&blknum)
.map(|vec_map| vec_map.slice_range(range))
.unwrap_or(EMPTY_SLICE)
.iter();
BlockVersionIter {
heap: &self.heap,
range_iter,
}
}
/// Iterate through [`PageVersion`]s in (block, lsn) order.
/// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn`
pub fn ordered_page_version_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedPageVersionIter<'_> {
let mut ordered_blocks: Vec<u32> = self.0.keys().cloned().collect();
let mut ordered_blocks: Vec<u32> = self.ranges.keys().cloned().collect();
ordered_blocks.sort_unstable();
let slice = ordered_blocks
let cur_block_iter = ordered_blocks
.first()
.map(|&blknum| self.get_block_slice(blknum))
.unwrap_or(EMPTY_SLICE);
.map(|&blknum| self.iter_block(blknum))
.unwrap_or_else(|| {
let empty_iter = EMPTY_SLICE.iter();
BlockVersionIter {
heap: &self.heap,
range_iter: empty_iter,
}
});
OrderedPageVersionIter {
page_versions: self,
ordered_blocks,
cur_block_idx: 0,
cutoff_lsn,
cur_slice_iter: slice.iter(),
cur_block_iter,
}
}
}
pub struct BlockVersionIter<'a> {
heap: &'a Vec<u8>,
range_iter: slice::Iter<'a, (Lsn, Range<usize>)>,
}
impl BlockVersionIter<'_> {
fn get_iter_result(&self, tuple: Option<&(Lsn, Range<usize>)>) -> Option<(Lsn, PageVersion)> {
let (lsn, range) = tuple?;
let range = range.clone();
let pv_bytes = &self.heap[range];
let page_version = PageVersion::des(pv_bytes).unwrap();
Some((*lsn, page_version))
}
}
impl Iterator for BlockVersionIter<'_> {
type Item = (Lsn, PageVersion);
fn next(&mut self) -> Option<Self::Item> {
let tuple = self.range_iter.next();
self.get_iter_result(tuple)
}
}
impl DoubleEndedIterator for BlockVersionIter<'_> {
fn next_back(&mut self) -> Option<Self::Item> {
let tuple = self.range_iter.next_back();
self.get_iter_result(tuple)
}
}
pub struct OrderedPageVersionIter<'a> {
page_versions: &'a PageVersions,
@@ -69,35 +149,35 @@ pub struct OrderedPageVersionIter<'a> {
cutoff_lsn: Option<Lsn>,
cur_slice_iter: slice::Iter<'a, (Lsn, PageVersion)>,
cur_block_iter: BlockVersionIter<'a>,
}
impl OrderedPageVersionIter<'_> {
fn is_lsn_before_cutoff(&self, lsn: &Lsn) -> bool {
fn is_lsn_before_cutoff(&self, lsn: Lsn) -> bool {
if let Some(cutoff_lsn) = self.cutoff_lsn.as_ref() {
lsn < cutoff_lsn
lsn < *cutoff_lsn
} else {
true
}
}
}
impl<'a> Iterator for OrderedPageVersionIter<'a> {
type Item = (u32, Lsn, &'a PageVersion);
impl Iterator for OrderedPageVersionIter<'_> {
type Item = (u32, Lsn, PageVersion);
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some((lsn, page_version)) = self.cur_slice_iter.next() {
if let Some((lsn, page_version)) = self.cur_block_iter.next() {
if self.is_lsn_before_cutoff(lsn) {
let blknum = self.ordered_blocks[self.cur_block_idx];
return Some((blknum, *lsn, page_version));
return Some((blknum, lsn, page_version));
}
}
let next_block_idx = self.cur_block_idx + 1;
let blknum: u32 = *self.ordered_blocks.get(next_block_idx)?;
self.cur_block_idx = next_block_idx;
self.cur_slice_iter = self.page_versions.get_block_slice(blknum).iter();
self.cur_block_iter = self.page_versions.iter_block(blknum);
}
}
}