pageserver - use VecMap for in-memory PageVersions

This commit is contained in:
Patrick Insinger
2021-10-08 00:08:03 -07:00
parent b3b8f18f61
commit 0579c8bbb9
6 changed files with 457 additions and 36 deletions

View File

@@ -57,6 +57,7 @@ mod image_layer;
mod inmemory_layer;
mod interval_tree;
mod layer_map;
mod page_versions;
mod storage_layer;
use delta_layer::DeltaLayer;

View File

@@ -380,7 +380,7 @@ impl DeltaLayer {
start_lsn: Lsn,
end_lsn: Lsn,
dropped: bool,
page_versions: impl Iterator<Item = (&'a (u32, Lsn), &'a PageVersion)>,
page_versions: impl Iterator<Item = (u32, Lsn, &'a PageVersion)>,
relsizes: BTreeMap<Lsn, u32>,
) -> Result<DeltaLayer> {
if seg.rel.is_blocky() {
@@ -416,11 +416,11 @@ impl DeltaLayer {
let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
for (key, page_version) in page_versions {
for (blknum, lsn, page_version) in page_versions {
let buf = PageVersion::ser(page_version)?;
let blob_range = page_version_writer.write_blob(&buf)?;
let old = inner.page_version_metas.insert(*key, blob_range);
let old = inner.page_version_metas.insert((blknum, lsn), blob_range);
assert!(old.is_none());
}

View File

@@ -24,6 +24,8 @@ use std::sync::{Arc, RwLock};
use zenith_utils::accum::Accum;
use zenith_utils::lsn::Lsn;
use super::page_versions::PageVersions;
pub struct InMemoryLayer {
conf: &'static PageServerConf,
tenantid: ZTenantId,
@@ -58,7 +60,7 @@ pub struct InMemoryLayerInner {
/// All versions of all pages in the layer are are kept here.
/// Indexed by block number and LSN.
///
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
page_versions: PageVersions,
///
/// `segsizes` tracks the size of the segment at different points in time.
@@ -172,13 +174,13 @@ impl Layer for InMemoryLayer {
{
let inner = self.inner.read().unwrap();
// Scan the BTreeMap backwards, starting from reconstruct_data.lsn.
let minkey = (blknum, Lsn(0));
let maxkey = (blknum, lsn);
let mut iter = inner
// Scan the page versions backwards, starting from `lsn`.
let iter = inner
.page_versions
.range((Included(&minkey), Included(&maxkey)));
while let Some(((_blknum, _entry_lsn), entry)) = iter.next_back() {
.get_block_lsn_range(blknum, ..=lsn)
.iter()
.rev();
for (_entry_lsn, entry) in iter {
if let Some(img) = &entry.page_image {
reconstruct_data.page_img = Some(img.clone());
need_image = false;
@@ -279,13 +281,13 @@ impl Layer for InMemoryLayer {
println!("segsizes {}: {}", k, v);
}
for (k, v) in inner.page_versions.iter() {
for (blknum, lsn, pv) in inner.page_versions.ordered_page_version_iter(None) {
println!(
"blk {} at {}: {}/{}\n",
k.0,
k.1,
v.page_image.is_some(),
v.record.is_some()
blknum,
lsn,
pv.page_image.is_some(),
pv.record.is_some()
);
}
@@ -353,7 +355,7 @@ impl InMemoryLayer {
incremental: false,
inner: RwLock::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: BTreeMap::new(),
page_versions: PageVersions::default(),
segsizes,
writeable: true,
}),
@@ -403,7 +405,7 @@ impl InMemoryLayer {
inner.check_writeable()?;
let old = inner.page_versions.insert((blknum, lsn), pv);
let old = inner.page_versions.append_or_update_last(blknum, lsn, pv);
if old.is_some() {
// We already had an entry for this LSN. That's odd..
@@ -448,7 +450,9 @@ impl InMemoryLayer {
gapblknum,
blknum
);
let old = inner.page_versions.insert((gapblknum, lsn), zeropv);
let old = inner
.page_versions
.append_or_update_last(gapblknum, lsn, zeropv);
// We already had an entry for this LSN. That's odd..
if old.is_some() {
@@ -550,7 +554,7 @@ impl InMemoryLayer {
incremental: true,
inner: RwLock::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: BTreeMap::new(),
page_versions: PageVersions::default(),
segsizes,
writeable: true,
}),
@@ -615,16 +619,9 @@ impl InMemoryLayer {
}
}
let mut before_page_versions = BTreeMap::new();
let mut after_page_versions = BTreeMap::new();
for ((blknum, lsn), pv) in inner.page_versions.iter() {
if *lsn > cutoff_lsn {
after_page_versions.insert((*blknum, *lsn), pv.clone());
after_oldest_lsn.accum(min, *lsn);
} else {
before_page_versions.insert((*blknum, *lsn), pv.clone());
}
}
let (before_page_versions, after_page_versions) = inner
.page_versions
.split_at(Lsn(cutoff_lsn.0 + 1), &mut after_oldest_lsn);
let frozen = Arc::new(InMemoryLayer {
conf: self.conf,
@@ -654,7 +651,10 @@ impl InMemoryLayer {
)?;
let new_inner = new_open.inner.get_mut().unwrap();
new_inner.page_versions.append(&mut after_page_versions);
// Ensure page_versions doesn't contain anything
// so we can just replace it
assert!(new_inner.page_versions.is_empty());
new_inner.page_versions = after_page_versions;
new_inner.segsizes.append(&mut after_segsizes);
Some(Arc::new(new_open))
@@ -702,7 +702,7 @@ impl InMemoryLayer {
self.start_lsn,
drop_lsn,
true,
inner.page_versions.iter(),
inner.page_versions.ordered_page_version_iter(None),
inner.segsizes.clone(),
)?;
trace!(
@@ -722,11 +722,7 @@ impl InMemoryLayer {
before_segsizes.insert(*lsn, *size);
}
}
let mut before_page_versions = inner.page_versions.iter().filter(|tup| {
let ((_blknum, lsn), _pv) = tup;
*lsn < end_lsn
});
let mut before_page_versions = inner.page_versions.ordered_page_version_iter(Some(end_lsn));
let mut frozen_layers: Vec<Arc<dyn Layer>> = Vec::new();

View File

@@ -0,0 +1,182 @@
use std::{collections::HashMap, ops::RangeBounds, slice};
use zenith_utils::{accum::Accum, lsn::Lsn, vec_map::VecMap};
use super::storage_layer::PageVersion;
const EMPTY_SLICE: &[(Lsn, PageVersion)] = &[];
#[derive(Debug, Default)]
pub struct PageVersions(HashMap<u32, VecMap<Lsn, PageVersion>>);
impl PageVersions {
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn append_or_update_last(
&mut self,
blknum: u32,
lsn: Lsn,
page_version: PageVersion,
) -> Option<PageVersion> {
let map = self.0.entry(blknum).or_insert_with(VecMap::default);
map.append_or_update_last(lsn, page_version).unwrap()
}
/// Get all [`PageVersion`]s in a block
pub fn get_block_slice(&self, blknum: u32) -> &[(Lsn, PageVersion)] {
self.0
.get(&blknum)
.map(VecMap::as_slice)
.unwrap_or(EMPTY_SLICE)
}
/// Get a range of [`PageVersions`] in a block
pub fn get_block_lsn_range<R: RangeBounds<Lsn>>(
&self,
blknum: u32,
range: R,
) -> &[(Lsn, PageVersion)] {
self.0
.get(&blknum)
.map(|vec_map| vec_map.slice_range(range))
.unwrap_or(EMPTY_SLICE)
}
/// Split the page version map into two.
///
/// Left contains everything up to and not including [`cutoff_lsn`].
/// Right contains [`cutoff_lsn`] and everything after.
pub fn split_at(&self, cutoff_lsn: Lsn, after_oldest_lsn: &mut Accum<Lsn>) -> (Self, Self) {
let mut before_blocks = HashMap::new();
let mut after_blocks = HashMap::new();
for (blknum, vec_map) in self.0.iter() {
let (before_versions, after_versions) = vec_map.split_at(&cutoff_lsn);
if !before_versions.is_empty() {
let old = before_blocks.insert(*blknum, before_versions);
assert!(old.is_none());
}
if !after_versions.is_empty() {
let (first_lsn, _first_pv) = &after_versions.as_slice()[0];
after_oldest_lsn.accum(std::cmp::min, *first_lsn);
let old = after_blocks.insert(*blknum, after_versions);
assert!(old.is_none());
}
}
(Self(before_blocks), Self(after_blocks))
}
/// Iterate through [`PageVersion`]s in (block, lsn) order.
/// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn`
pub fn ordered_page_version_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedPageVersionIter<'_> {
let mut ordered_blocks: Vec<u32> = self.0.keys().cloned().collect();
ordered_blocks.sort_unstable();
let slice = ordered_blocks
.first()
.map(|&blknum| self.get_block_slice(blknum))
.unwrap_or(EMPTY_SLICE);
OrderedPageVersionIter {
page_versions: self,
ordered_blocks,
cur_block_idx: 0,
cutoff_lsn,
cur_slice_iter: slice.iter(),
}
}
}
pub struct OrderedPageVersionIter<'a> {
page_versions: &'a PageVersions,
ordered_blocks: Vec<u32>,
cur_block_idx: usize,
cutoff_lsn: Option<Lsn>,
cur_slice_iter: slice::Iter<'a, (Lsn, PageVersion)>,
}
impl OrderedPageVersionIter<'_> {
fn is_lsn_before_cutoff(&self, lsn: &Lsn) -> bool {
if let Some(cutoff_lsn) = self.cutoff_lsn.as_ref() {
lsn < cutoff_lsn
} else {
true
}
}
}
impl<'a> Iterator for OrderedPageVersionIter<'a> {
type Item = (u32, Lsn, &'a PageVersion);
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some((lsn, page_version)) = self.cur_slice_iter.next() {
if self.is_lsn_before_cutoff(lsn) {
let blknum = self.ordered_blocks[self.cur_block_idx];
return Some((blknum, *lsn, page_version));
}
}
let next_block_idx = self.cur_block_idx + 1;
let blknum: u32 = *self.ordered_blocks.get(next_block_idx)?;
self.cur_block_idx = next_block_idx;
self.cur_slice_iter = self.page_versions.get_block_slice(blknum).iter();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
const EMPTY_PAGE_VERSION: PageVersion = PageVersion {
page_image: None,
record: None,
};
#[test]
fn test_ordered_iter() {
let mut page_versions = PageVersions::default();
const BLOCKS: u32 = 1000;
const LSNS: u64 = 50;
for blknum in 0..BLOCKS {
for lsn in 0..LSNS {
let old = page_versions.append_or_update_last(blknum, Lsn(lsn), EMPTY_PAGE_VERSION);
assert!(old.is_none());
}
}
let mut iter = page_versions.ordered_page_version_iter(None);
for blknum in 0..BLOCKS {
for lsn in 0..LSNS {
let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap();
assert_eq!(actual_blknum, blknum);
assert_eq!(Lsn(lsn), actual_lsn);
}
}
assert!(iter.next().is_none());
assert!(iter.next().is_none()); // should be robust against excessive next() calls
const CUTOFF_LSN: Lsn = Lsn(30);
let mut iter = page_versions.ordered_page_version_iter(Some(CUTOFF_LSN));
for blknum in 0..BLOCKS {
for lsn in 0..CUTOFF_LSN.0 {
let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap();
assert_eq!(actual_blknum, blknum);
assert_eq!(Lsn(lsn), actual_lsn);
}
}
assert!(iter.next().is_none());
assert!(iter.next().is_none()); // should be robust against excessive next() calls
}
}