mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 01:20:38 +00:00
pageserver - PageVersion heap first pass
This commit is contained in:
@@ -350,7 +350,7 @@ impl DeltaLayer {
|
||||
/// data structure with two btreemaps as we do, so passing the btreemaps is currently
|
||||
/// expedient.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn create<'a>(
|
||||
pub fn create(
|
||||
conf: &'static PageServerConf,
|
||||
timelineid: ZTimelineId,
|
||||
tenantid: ZTenantId,
|
||||
@@ -358,7 +358,7 @@ impl DeltaLayer {
|
||||
start_lsn: Lsn,
|
||||
end_lsn: Lsn,
|
||||
dropped: bool,
|
||||
page_versions: impl Iterator<Item = (u32, Lsn, &'a PageVersion)>,
|
||||
page_versions: impl Iterator<Item = (u32, Lsn, PageVersion)>,
|
||||
relsizes: VecMap<Lsn, u32>,
|
||||
) -> Result<DeltaLayer> {
|
||||
if seg.rel.is_blocky() {
|
||||
@@ -393,7 +393,8 @@ impl DeltaLayer {
|
||||
let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
|
||||
|
||||
for (blknum, lsn, page_version) in page_versions {
|
||||
let buf = PageVersion::ser(page_version)?;
|
||||
// TODO avoid deserializing and then reserializing
|
||||
let buf = PageVersion::ser(&page_version)?;
|
||||
let blob_range = page_version_writer.write_blob(&buf)?;
|
||||
|
||||
inner
|
||||
|
||||
@@ -1,13 +1,20 @@
|
||||
use std::{collections::HashMap, ops::RangeBounds, slice};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
ops::{Range, RangeBounds},
|
||||
slice,
|
||||
};
|
||||
|
||||
use zenith_utils::{lsn::Lsn, vec_map::VecMap};
|
||||
use zenith_utils::{bin_ser::LeSer, lsn::Lsn, vec_map::VecMap};
|
||||
|
||||
use super::storage_layer::PageVersion;
|
||||
|
||||
const EMPTY_SLICE: &[(Lsn, PageVersion)] = &[];
|
||||
const EMPTY_SLICE: &[(Lsn, Range<usize>)] = &[];
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct PageVersions(HashMap<u32, VecMap<Lsn, PageVersion>>);
|
||||
/// Per-block page versions kept in serialized form.
///
/// Each `PageVersion` is serialized and appended to one shared byte buffer;
/// the per-block maps only record where in that buffer each version lives.
pub struct PageVersions {
|
||||
/// Backing buffer holding the serialized `PageVersion` bytes, appended to
/// by `append_or_update_last`.
heap: Vec<u8>,
|
||||
/// Block number -> (LSN -> byte range into `heap` of that version).
ranges: HashMap<u32, VecMap<Lsn, Range<usize>>>,
|
||||
}
|
||||
|
||||
impl PageVersions {
|
||||
pub fn append_or_update_last(
|
||||
@@ -16,17 +23,43 @@ impl PageVersions {
|
||||
lsn: Lsn,
|
||||
page_version: PageVersion,
|
||||
) -> Option<PageVersion> {
|
||||
let map = self.0.entry(blknum).or_insert_with(VecMap::default);
|
||||
map.append_or_update_last(lsn, page_version).unwrap()
|
||||
let mut new_bytes = PageVersion::ser(&page_version).unwrap();
|
||||
|
||||
let map = self.ranges.entry(blknum).or_insert_with(VecMap::default);
|
||||
|
||||
if let Some((last_lsn, last_range)) = map.as_slice().last() {
|
||||
if lsn == *last_lsn {
|
||||
let old_bytes = &self.heap[last_range.clone()];
|
||||
if old_bytes == new_bytes {
|
||||
return Some(page_version);
|
||||
}
|
||||
// TODO optimize for case when old_bytes.len() >= new_bytes.len()
|
||||
}
|
||||
}
|
||||
|
||||
let new_range = self.heap.len()..self.heap.len() + new_bytes.len();
|
||||
self.heap.append(&mut new_bytes);
|
||||
map.append_or_update_last(lsn, new_range)
|
||||
.unwrap()
|
||||
.map(|old_range| {
|
||||
let old_bytes = &self.heap[old_range];
|
||||
PageVersion::des(old_bytes).unwrap()
|
||||
})
|
||||
}
|
||||
|
||||
/// Get all [`PageVersion`]s in a block
|
||||
pub fn iter_block(&self, blknum: u32) -> BlockVersionIter<'_> {
|
||||
self.0
|
||||
let range_iter = self
|
||||
.ranges
|
||||
.get(&blknum)
|
||||
.map(VecMap::as_slice)
|
||||
.unwrap_or(EMPTY_SLICE)
|
||||
.iter()
|
||||
.iter();
|
||||
|
||||
BlockVersionIter {
|
||||
heap: &self.heap,
|
||||
range_iter,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a range of [`PageVersion`]s in a block
|
||||
@@ -35,35 +68,78 @@ impl PageVersions {
|
||||
blknum: u32,
|
||||
range: R,
|
||||
) -> BlockVersionIter<'_> {
|
||||
self.0
|
||||
let range_iter = self
|
||||
.ranges
|
||||
.get(&blknum)
|
||||
.map(|vec_map| vec_map.slice_range(range))
|
||||
.unwrap_or(EMPTY_SLICE)
|
||||
.iter()
|
||||
.iter();
|
||||
|
||||
BlockVersionIter {
|
||||
heap: &self.heap,
|
||||
range_iter,
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterate through [`PageVersion`]s in (block, lsn) order.
|
||||
/// If `cutoff_lsn` is set, only show versions with `lsn < cutoff_lsn`
|
||||
pub fn ordered_page_version_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedPageVersionIter<'_> {
|
||||
let mut ordered_blocks: Vec<u32> = self.0.keys().cloned().collect();
|
||||
let mut ordered_blocks: Vec<u32> = self.ranges.keys().cloned().collect();
|
||||
ordered_blocks.sort_unstable();
|
||||
|
||||
let iter = ordered_blocks
|
||||
let cur_block_iter = ordered_blocks
|
||||
.first()
|
||||
.map(|&blknum| self.iter_block(blknum))
|
||||
.unwrap_or_else(|| EMPTY_SLICE.iter());
|
||||
.unwrap_or_else(|| {
|
||||
let empty_iter = EMPTY_SLICE.iter();
|
||||
BlockVersionIter {
|
||||
heap: &self.heap,
|
||||
range_iter: empty_iter,
|
||||
}
|
||||
});
|
||||
|
||||
OrderedPageVersionIter {
|
||||
page_versions: self,
|
||||
ordered_blocks,
|
||||
cur_block_idx: 0,
|
||||
cutoff_lsn,
|
||||
cur_slice_iter: iter,
|
||||
cur_block_iter,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type BlockVersionIter<'a> = std::slice::Iter<'a, (Lsn, PageVersion)>;
|
||||
/// Iterator over a slice of `(Lsn, byte range)` entries that yields owned
/// `(Lsn, PageVersion)` pairs by deserializing each range out of the heap.
pub struct BlockVersionIter<'a> {
|
||||
/// Byte buffer that the ranges in `range_iter` index into.
heap: &'a Vec<u8>,
|
||||
/// Underlying iterator over `(lsn, range)` entries.
range_iter: slice::Iter<'a, (Lsn, Range<usize>)>,
|
||||
}
|
||||
|
||||
// Helper shared by the forward and backward iteration impls.
impl BlockVersionIter<'_> {
|
||||
/// Converts an optional `(lsn, range)` entry into an owned
/// `(Lsn, PageVersion)`.
///
/// Returns `None` when `tuple` is `None`.
///
/// # Panics
///
/// Panics if `range` is out of bounds for the heap, or if the bytes in
/// that range fail to deserialize (the result is `unwrap`ped).
fn get_iter_result(&self, tuple: Option<&(Lsn, Range<usize>)>) -> Option<(Lsn, PageVersion)> {
|
||||
// Propagate end-of-iteration from the underlying slice iterator.
let (lsn, range) = tuple?;
|
||||
// `Range<usize>` is not `Copy`; clone it so it can be used as an index by value.
let range = range.clone();
|
||||
|
||||
let pv_bytes = &self.heap[range];
|
||||
let page_version = PageVersion::des(pv_bytes).unwrap();
|
||||
|
||||
Some((*lsn, page_version))
|
||||
}
|
||||
}
|
||||
|
||||
// Forward iteration: yields entries in the order of the underlying slice.
impl Iterator for BlockVersionIter<'_> {
|
||||
/// Items are owned: each `PageVersion` is deserialized on demand.
type Item = (Lsn, PageVersion);
|
||||
|
||||
/// Advances the underlying range iterator and deserializes the entry it
/// returns, via `get_iter_result`.
fn next(&mut self) -> Option<Self::Item> {
|
||||
let tuple = self.range_iter.next();
|
||||
self.get_iter_result(tuple)
|
||||
}
|
||||
}
|
||||
|
||||
// Backward iteration: mirrors `next`, taking entries from the end of the slice.
impl DoubleEndedIterator for BlockVersionIter<'_> {
|
||||
/// Takes the last remaining entry from the underlying range iterator and
/// deserializes it via `get_iter_result`.
fn next_back(&mut self) -> Option<Self::Item> {
|
||||
let tuple = self.range_iter.next_back();
|
||||
self.get_iter_result(tuple)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct OrderedPageVersionIter<'a> {
|
||||
page_versions: &'a PageVersions,
|
||||
@@ -73,35 +149,35 @@ pub struct OrderedPageVersionIter<'a> {
|
||||
|
||||
cutoff_lsn: Option<Lsn>,
|
||||
|
||||
cur_slice_iter: slice::Iter<'a, (Lsn, PageVersion)>,
|
||||
cur_block_iter: BlockVersionIter<'a>,
|
||||
}
|
||||
|
||||
impl OrderedPageVersionIter<'_> {
|
||||
fn is_lsn_before_cutoff(&self, lsn: &Lsn) -> bool {
|
||||
fn is_lsn_before_cutoff(&self, lsn: Lsn) -> bool {
|
||||
if let Some(cutoff_lsn) = self.cutoff_lsn.as_ref() {
|
||||
lsn < cutoff_lsn
|
||||
lsn < *cutoff_lsn
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Iterator for OrderedPageVersionIter<'a> {
|
||||
type Item = (u32, Lsn, &'a PageVersion);
|
||||
impl Iterator for OrderedPageVersionIter<'_> {
|
||||
type Item = (u32, Lsn, PageVersion);
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
loop {
|
||||
if let Some((lsn, page_version)) = self.cur_slice_iter.next() {
|
||||
if let Some((lsn, page_version)) = self.cur_block_iter.next() {
|
||||
if self.is_lsn_before_cutoff(lsn) {
|
||||
let blknum = self.ordered_blocks[self.cur_block_idx];
|
||||
return Some((blknum, *lsn, page_version));
|
||||
return Some((blknum, lsn, page_version));
|
||||
}
|
||||
}
|
||||
|
||||
let next_block_idx = self.cur_block_idx + 1;
|
||||
let blknum: u32 = *self.ordered_blocks.get(next_block_idx)?;
|
||||
self.cur_block_idx = next_block_idx;
|
||||
self.cur_slice_iter = self.page_versions.iter_block(blknum);
|
||||
self.cur_block_iter = self.page_versions.iter_block(blknum);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user