mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-21 04:12:55 +00:00
Compare commits
4 Commits
proxy-test
...
vecmap-del
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c4fb509aa2 | ||
|
|
48a6bbe4c1 | ||
|
|
9a687eae28 | ||
|
|
0579c8bbb9 |
@@ -57,6 +57,7 @@ mod image_layer;
|
||||
mod inmemory_layer;
|
||||
mod interval_tree;
|
||||
mod layer_map;
|
||||
mod page_versions;
|
||||
mod storage_layer;
|
||||
|
||||
use delta_layer::DeltaLayer;
|
||||
|
||||
@@ -48,7 +48,7 @@ use crate::{ZTenantId, ZTimelineId};
|
||||
use anyhow::{bail, ensure, Result};
|
||||
use log::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::BTreeMap;
|
||||
use zenith_utils::vec_map::VecMap;
|
||||
// avoid binding to Write (conflicts with std::io::Write)
|
||||
// while being able to use std::fmt::Write's methods
|
||||
use std::fmt::Write as _;
|
||||
@@ -65,6 +65,7 @@ use zenith_utils::bin_ser::BeSer;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
use super::blob::{read_blob, BlobRange};
|
||||
use super::page_versions::OrderedBlockIter;
|
||||
|
||||
// Magic constant to identify a Zenith delta file
|
||||
pub const DELTA_FILE_MAGIC: u32 = 0x5A616E01;
|
||||
@@ -141,10 +142,10 @@ pub struct DeltaLayerInner {
|
||||
|
||||
/// All versions of all pages in the file are are kept here.
|
||||
/// Indexed by block number and LSN.
|
||||
page_version_metas: BTreeMap<(u32, Lsn), BlobRange>,
|
||||
page_version_metas: VecMap<(u32, Lsn), BlobRange>,
|
||||
|
||||
/// `relsizes` tracks the size of the relation at different points in time.
|
||||
relsizes: BTreeMap<Lsn, u32>,
|
||||
relsizes: VecMap<Lsn, u32>,
|
||||
}
|
||||
|
||||
impl Layer for DeltaLayer {
|
||||
@@ -215,10 +216,12 @@ impl Layer for DeltaLayer {
|
||||
// Scan the metadata BTreeMap backwards, starting from the given entry.
|
||||
let minkey = (blknum, Lsn(0));
|
||||
let maxkey = (blknum, lsn);
|
||||
let mut iter = inner
|
||||
let iter = inner
|
||||
.page_version_metas
|
||||
.range((Included(&minkey), Included(&maxkey)));
|
||||
while let Some(((_blknum, _entry_lsn), blob_range)) = iter.next_back() {
|
||||
.slice_range((Included(&minkey), Included(&maxkey)))
|
||||
.iter()
|
||||
.rev();
|
||||
for ((_blknum, _lsn), blob_range) in iter {
|
||||
let pv = PageVersion::des(&read_blob(&page_version_reader, blob_range)?)?;
|
||||
|
||||
if let Some(img) = pv.page_image {
|
||||
@@ -262,15 +265,15 @@ impl Layer for DeltaLayer {
|
||||
|
||||
// Scan the BTreeMap backwards, starting from the given entry.
|
||||
let inner = self.load()?;
|
||||
let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn)));
|
||||
let slice = inner
|
||||
.relsizes
|
||||
.slice_range((Included(&Lsn(0)), Included(&lsn)));
|
||||
|
||||
let result;
|
||||
if let Some((_entry_lsn, entry)) = iter.next_back() {
|
||||
result = *entry;
|
||||
if let Some((_entry_lsn, entry)) = slice.last() {
|
||||
Ok(*entry)
|
||||
} else {
|
||||
bail!("could not find seg size in delta layer");
|
||||
Err(anyhow::anyhow!("could not find seg size in delta layer"))
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Does this segment exist at given LSN?
|
||||
@@ -290,8 +293,8 @@ impl Layer for DeltaLayer {
|
||||
///
|
||||
fn unload(&self) -> Result<()> {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
inner.page_version_metas = BTreeMap::new();
|
||||
inner.relsizes = BTreeMap::new();
|
||||
inner.page_version_metas = VecMap::default();
|
||||
inner.relsizes = VecMap::default();
|
||||
inner.loaded = false;
|
||||
Ok(())
|
||||
}
|
||||
@@ -317,13 +320,13 @@ impl Layer for DeltaLayer {
|
||||
|
||||
println!("--- relsizes ---");
|
||||
let inner = self.load()?;
|
||||
for (k, v) in inner.relsizes.iter() {
|
||||
for (k, v) in inner.relsizes.as_slice() {
|
||||
println!(" {}: {}", k, v);
|
||||
}
|
||||
println!("--- page versions ---");
|
||||
let (_path, book) = self.open_book()?;
|
||||
let chapter = book.chapter_reader(PAGE_VERSIONS_CHAPTER)?;
|
||||
for ((blk, lsn), blob_range) in inner.page_version_metas.iter() {
|
||||
for ((blk, lsn), blob_range) in inner.page_version_metas.as_slice() {
|
||||
let mut desc = String::new();
|
||||
|
||||
let buf = read_blob(&chapter, blob_range)?;
|
||||
@@ -372,7 +375,7 @@ impl DeltaLayer {
|
||||
/// data structure with two btreemaps as we do, so passing the btreemaps is currently
|
||||
/// expedient.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn create<'a>(
|
||||
pub fn create(
|
||||
conf: &'static PageServerConf,
|
||||
timelineid: ZTimelineId,
|
||||
tenantid: ZTenantId,
|
||||
@@ -380,14 +383,14 @@ impl DeltaLayer {
|
||||
start_lsn: Lsn,
|
||||
end_lsn: Lsn,
|
||||
dropped: bool,
|
||||
page_versions: impl Iterator<Item = (&'a (u32, Lsn), &'a PageVersion)>,
|
||||
relsizes: BTreeMap<Lsn, u32>,
|
||||
page_versions: OrderedBlockIter,
|
||||
relsizes: VecMap<Lsn, u32>,
|
||||
) -> Result<DeltaLayer> {
|
||||
if seg.rel.is_blocky() {
|
||||
assert!(!relsizes.is_empty());
|
||||
}
|
||||
|
||||
let delta_layer = DeltaLayer {
|
||||
let mut delta_layer = DeltaLayer {
|
||||
path_or_conf: PathOrConf::Conf(conf),
|
||||
timelineid,
|
||||
tenantid,
|
||||
@@ -397,17 +400,18 @@ impl DeltaLayer {
|
||||
dropped,
|
||||
inner: Mutex::new(DeltaLayerInner {
|
||||
loaded: true,
|
||||
page_version_metas: BTreeMap::new(),
|
||||
page_version_metas: VecMap::default(),
|
||||
relsizes,
|
||||
}),
|
||||
};
|
||||
let mut inner = delta_layer.inner.lock().unwrap();
|
||||
|
||||
// Write the in-memory btreemaps into a file
|
||||
let path = delta_layer
|
||||
.path()
|
||||
.expect("DeltaLayer is supposed to have a layer path on disk");
|
||||
|
||||
let inner = delta_layer.inner.get_mut().unwrap();
|
||||
|
||||
// Note: This overwrites any existing file. There shouldn't be any.
|
||||
// FIXME: throw an error instead?
|
||||
let file = File::create(&path)?;
|
||||
@@ -416,26 +420,31 @@ impl DeltaLayer {
|
||||
|
||||
let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
|
||||
|
||||
for (key, page_version) in page_versions {
|
||||
let buf = PageVersion::ser(page_version)?;
|
||||
let blob_range = page_version_writer.write_blob(&buf)?;
|
||||
for (blknum, history) in page_versions {
|
||||
inner.page_version_metas.reserve(history.len());
|
||||
|
||||
let old = inner.page_version_metas.insert(*key, blob_range);
|
||||
for (lsn, page_version) in history {
|
||||
let buf = PageVersion::ser(page_version)?;
|
||||
let blob_range = page_version_writer.write_blob(&buf)?;
|
||||
|
||||
assert!(old.is_none());
|
||||
inner
|
||||
.page_version_metas
|
||||
.append((blknum, *lsn), blob_range)
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
let book = page_version_writer.close()?;
|
||||
|
||||
// Write out page versions
|
||||
let mut chapter = book.new_chapter(PAGE_VERSION_METAS_CHAPTER);
|
||||
let buf = BTreeMap::ser(&inner.page_version_metas)?;
|
||||
let buf = VecMap::ser(&inner.page_version_metas)?;
|
||||
chapter.write_all(&buf)?;
|
||||
let book = chapter.close()?;
|
||||
|
||||
// and relsizes to separate chapter
|
||||
let mut chapter = book.new_chapter(REL_SIZES_CHAPTER);
|
||||
let buf = BTreeMap::ser(&inner.relsizes)?;
|
||||
let buf = VecMap::ser(&inner.relsizes)?;
|
||||
chapter.write_all(&buf)?;
|
||||
let book = chapter.close()?;
|
||||
|
||||
@@ -459,8 +468,6 @@ impl DeltaLayer {
|
||||
|
||||
trace!("saved {}", &path.display());
|
||||
|
||||
drop(inner);
|
||||
|
||||
Ok(delta_layer)
|
||||
}
|
||||
|
||||
@@ -522,10 +529,10 @@ impl DeltaLayer {
|
||||
}
|
||||
|
||||
let chapter = book.read_chapter(PAGE_VERSION_METAS_CHAPTER)?;
|
||||
let page_version_metas = BTreeMap::des(&chapter)?;
|
||||
let page_version_metas = VecMap::des(&chapter)?;
|
||||
|
||||
let chapter = book.read_chapter(REL_SIZES_CHAPTER)?;
|
||||
let relsizes = BTreeMap::des(&chapter)?;
|
||||
let relsizes = VecMap::des(&chapter)?;
|
||||
|
||||
debug!("loaded from {}", &path.display());
|
||||
|
||||
@@ -555,8 +562,8 @@ impl DeltaLayer {
|
||||
dropped: filename.dropped,
|
||||
inner: Mutex::new(DeltaLayerInner {
|
||||
loaded: false,
|
||||
page_version_metas: BTreeMap::new(),
|
||||
relsizes: BTreeMap::new(),
|
||||
page_version_metas: VecMap::default(),
|
||||
relsizes: VecMap::default(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -578,8 +585,8 @@ impl DeltaLayer {
|
||||
dropped: summary.dropped,
|
||||
inner: Mutex::new(DeltaLayerInner {
|
||||
loaded: false,
|
||||
page_version_metas: BTreeMap::new(),
|
||||
relsizes: BTreeMap::new(),
|
||||
page_version_metas: VecMap::default(),
|
||||
relsizes: VecMap::default(),
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -16,14 +16,15 @@ use anyhow::{bail, ensure, Result};
|
||||
use bytes::Bytes;
|
||||
use log::*;
|
||||
use std::cmp::min;
|
||||
use std::collections::BTreeMap;
|
||||
use std::ops::Bound::Included;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use zenith_utils::vec_map::VecMap;
|
||||
|
||||
use zenith_utils::accum::Accum;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
use super::page_versions::PageVersions;
|
||||
|
||||
pub struct InMemoryLayer {
|
||||
conf: &'static PageServerConf,
|
||||
tenantid: ZTenantId,
|
||||
@@ -58,7 +59,7 @@ pub struct InMemoryLayerInner {
|
||||
/// All versions of all pages in the layer are are kept here.
|
||||
/// Indexed by block number and LSN.
|
||||
///
|
||||
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
|
||||
page_versions: PageVersions,
|
||||
|
||||
///
|
||||
/// `segsizes` tracks the size of the segment at different points in time.
|
||||
@@ -67,7 +68,7 @@ pub struct InMemoryLayerInner {
|
||||
/// so that determining the size never depends on the predecessor layer. For
|
||||
/// a non-blocky rel, 'segsizes' is not used and is always empty.
|
||||
///
|
||||
segsizes: BTreeMap<Lsn, u32>,
|
||||
segsizes: VecMap<Lsn, u32>,
|
||||
|
||||
/// Writes are only allowed when true.
|
||||
/// Set to false when this layer is in the process of being replaced.
|
||||
@@ -85,10 +86,10 @@ impl InMemoryLayerInner {
|
||||
|
||||
fn get_seg_size(&self, lsn: Lsn) -> u32 {
|
||||
// Scan the BTreeMap backwards, starting from the given entry.
|
||||
let mut iter = self.segsizes.range((Included(&Lsn(0)), Included(&lsn)));
|
||||
let slice = self.segsizes.slice_range(..=lsn);
|
||||
|
||||
// We make sure there is always at least one entry
|
||||
if let Some((_entry_lsn, entry)) = iter.next_back() {
|
||||
if let Some((_entry_lsn, entry)) = slice.last() {
|
||||
*entry
|
||||
} else {
|
||||
panic!("could not find seg size in in-memory layer");
|
||||
@@ -172,13 +173,13 @@ impl Layer for InMemoryLayer {
|
||||
{
|
||||
let inner = self.inner.read().unwrap();
|
||||
|
||||
// Scan the BTreeMap backwards, starting from reconstruct_data.lsn.
|
||||
let minkey = (blknum, Lsn(0));
|
||||
let maxkey = (blknum, lsn);
|
||||
let mut iter = inner
|
||||
// Scan the page versions backwards, starting from `lsn`.
|
||||
let iter = inner
|
||||
.page_versions
|
||||
.range((Included(&minkey), Included(&maxkey)));
|
||||
while let Some(((_blknum, _entry_lsn), entry)) = iter.next_back() {
|
||||
.get_block_lsn_range(blknum, ..=lsn)
|
||||
.iter()
|
||||
.rev();
|
||||
for (_entry_lsn, entry) in iter {
|
||||
if let Some(img) = &entry.page_image {
|
||||
reconstruct_data.page_img = Some(img.clone());
|
||||
need_image = false;
|
||||
@@ -275,18 +276,20 @@ impl Layer for InMemoryLayer {
|
||||
self.timelineid, self.seg, self.start_lsn, end_str
|
||||
);
|
||||
|
||||
for (k, v) in inner.segsizes.iter() {
|
||||
for (k, v) in inner.segsizes.as_slice() {
|
||||
println!("segsizes {}: {}", k, v);
|
||||
}
|
||||
|
||||
for (k, v) in inner.page_versions.iter() {
|
||||
println!(
|
||||
"blk {} at {}: {}/{}\n",
|
||||
k.0,
|
||||
k.1,
|
||||
v.page_image.is_some(),
|
||||
v.record.is_some()
|
||||
);
|
||||
for (blknum, history) in inner.page_versions.ordered_block_iter(None) {
|
||||
for (lsn, pv) in history {
|
||||
println!(
|
||||
"blk {} at {}: {}/{}\n",
|
||||
blknum,
|
||||
lsn,
|
||||
pv.page_image.is_some(),
|
||||
pv.record.is_some()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -337,9 +340,9 @@ impl InMemoryLayer {
|
||||
);
|
||||
|
||||
// The segment is initially empty, so initialize 'segsizes' with 0.
|
||||
let mut segsizes = BTreeMap::new();
|
||||
let mut segsizes = VecMap::default();
|
||||
if seg.rel.is_blocky() {
|
||||
segsizes.insert(start_lsn, 0);
|
||||
segsizes.append(start_lsn, 0).unwrap();
|
||||
}
|
||||
|
||||
Ok(InMemoryLayer {
|
||||
@@ -353,7 +356,7 @@ impl InMemoryLayer {
|
||||
incremental: false,
|
||||
inner: RwLock::new(InMemoryLayerInner {
|
||||
drop_lsn: None,
|
||||
page_versions: BTreeMap::new(),
|
||||
page_versions: PageVersions::default(),
|
||||
segsizes,
|
||||
writeable: true,
|
||||
}),
|
||||
@@ -403,7 +406,7 @@ impl InMemoryLayer {
|
||||
|
||||
inner.check_writeable()?;
|
||||
|
||||
let old = inner.page_versions.insert((blknum, lsn), pv);
|
||||
let old = inner.page_versions.append_or_update_last(blknum, lsn, pv);
|
||||
|
||||
if old.is_some() {
|
||||
// We already had an entry for this LSN. That's odd..
|
||||
@@ -448,7 +451,9 @@ impl InMemoryLayer {
|
||||
gapblknum,
|
||||
blknum
|
||||
);
|
||||
let old = inner.page_versions.insert((gapblknum, lsn), zeropv);
|
||||
let old = inner
|
||||
.page_versions
|
||||
.append_or_update_last(gapblknum, lsn, zeropv);
|
||||
// We already had an entry for this LSN. That's odd..
|
||||
|
||||
if old.is_some() {
|
||||
@@ -459,7 +464,7 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
inner.segsizes.insert(lsn, newsize);
|
||||
inner.segsizes.append_or_update_last(lsn, newsize).unwrap();
|
||||
return Ok(newsize - oldsize);
|
||||
}
|
||||
}
|
||||
@@ -481,7 +486,7 @@ impl InMemoryLayer {
|
||||
let oldsize = inner.get_seg_size(lsn);
|
||||
assert!(segsize < oldsize);
|
||||
|
||||
let old = inner.segsizes.insert(lsn, segsize);
|
||||
let old = inner.segsizes.append_or_update_last(lsn, segsize).unwrap();
|
||||
|
||||
if old.is_some() {
|
||||
// We already had an entry for this LSN. That's odd..
|
||||
@@ -533,10 +538,10 @@ impl InMemoryLayer {
|
||||
);
|
||||
|
||||
// Copy the segment size at the start LSN from the predecessor layer.
|
||||
let mut segsizes = BTreeMap::new();
|
||||
let mut segsizes = VecMap::default();
|
||||
if seg.rel.is_blocky() {
|
||||
let size = src.get_seg_size(start_lsn)?;
|
||||
segsizes.insert(start_lsn, size);
|
||||
segsizes.append(start_lsn, size).unwrap();
|
||||
}
|
||||
|
||||
Ok(InMemoryLayer {
|
||||
@@ -550,7 +555,7 @@ impl InMemoryLayer {
|
||||
incremental: true,
|
||||
inner: RwLock::new(InMemoryLayerInner {
|
||||
drop_lsn: None,
|
||||
page_versions: BTreeMap::new(),
|
||||
page_versions: PageVersions::default(),
|
||||
segsizes,
|
||||
writeable: true,
|
||||
}),
|
||||
@@ -603,28 +608,18 @@ impl InMemoryLayer {
|
||||
|
||||
// Divide all the page versions into old and new
|
||||
// at the 'cutoff_lsn' point.
|
||||
let mut before_segsizes = BTreeMap::new();
|
||||
let mut after_segsizes = BTreeMap::new();
|
||||
let mut after_oldest_lsn: Accum<Lsn> = Accum(None);
|
||||
for (lsn, size) in inner.segsizes.iter() {
|
||||
if *lsn > cutoff_lsn {
|
||||
after_segsizes.insert(*lsn, *size);
|
||||
after_oldest_lsn.accum(min, *lsn);
|
||||
} else {
|
||||
before_segsizes.insert(*lsn, *size);
|
||||
}
|
||||
|
||||
let cutoff_lsn_exclusive = Lsn(cutoff_lsn.0 + 1);
|
||||
|
||||
let (before_segsizes, mut after_segsizes) = inner.segsizes.split_at(&cutoff_lsn_exclusive);
|
||||
if let Some((lsn, _size)) = after_segsizes.as_slice().first() {
|
||||
after_oldest_lsn.accum(min, *lsn);
|
||||
}
|
||||
|
||||
let mut before_page_versions = BTreeMap::new();
|
||||
let mut after_page_versions = BTreeMap::new();
|
||||
for ((blknum, lsn), pv) in inner.page_versions.iter() {
|
||||
if *lsn > cutoff_lsn {
|
||||
after_page_versions.insert((*blknum, *lsn), pv.clone());
|
||||
after_oldest_lsn.accum(min, *lsn);
|
||||
} else {
|
||||
before_page_versions.insert((*blknum, *lsn), pv.clone());
|
||||
}
|
||||
}
|
||||
let (before_page_versions, after_page_versions) = inner
|
||||
.page_versions
|
||||
.split_at(cutoff_lsn_exclusive, &mut after_oldest_lsn);
|
||||
|
||||
let frozen = Arc::new(InMemoryLayer {
|
||||
conf: self.conf,
|
||||
@@ -654,8 +649,11 @@ impl InMemoryLayer {
|
||||
)?;
|
||||
|
||||
let new_inner = new_open.inner.get_mut().unwrap();
|
||||
new_inner.page_versions.append(&mut after_page_versions);
|
||||
new_inner.segsizes.append(&mut after_segsizes);
|
||||
// Ensure page_versions doesn't contain anything
|
||||
// so we can just replace it
|
||||
assert!(new_inner.page_versions.is_empty());
|
||||
new_inner.page_versions = after_page_versions;
|
||||
new_inner.segsizes.extend(&mut after_segsizes).unwrap();
|
||||
|
||||
Some(Arc::new(new_open))
|
||||
} else {
|
||||
@@ -702,7 +700,7 @@ impl InMemoryLayer {
|
||||
self.start_lsn,
|
||||
drop_lsn,
|
||||
true,
|
||||
inner.page_versions.iter(),
|
||||
inner.page_versions.ordered_block_iter(None),
|
||||
inner.segsizes.clone(),
|
||||
)?;
|
||||
trace!(
|
||||
@@ -716,21 +714,13 @@ impl InMemoryLayer {
|
||||
|
||||
let end_lsn = self.end_lsn.unwrap();
|
||||
|
||||
let mut before_segsizes = BTreeMap::new();
|
||||
for (lsn, size) in inner.segsizes.iter() {
|
||||
if *lsn <= end_lsn {
|
||||
before_segsizes.insert(*lsn, *size);
|
||||
}
|
||||
}
|
||||
let mut before_page_versions = inner.page_versions.iter().filter(|tup| {
|
||||
let ((_blknum, lsn), _pv) = tup;
|
||||
|
||||
*lsn < end_lsn
|
||||
});
|
||||
let mut before_page_versions = inner.page_versions.ordered_block_iter(Some(end_lsn));
|
||||
|
||||
let mut frozen_layers: Vec<Arc<dyn Layer>> = Vec::new();
|
||||
|
||||
if self.start_lsn != end_lsn {
|
||||
let (before_segsizes, _after_segsizes) = inner.segsizes.split_at(&Lsn(end_lsn.0 + 1));
|
||||
|
||||
// Write the page versions before the cutoff to disk.
|
||||
let delta_layer = DeltaLayer::create(
|
||||
self.conf,
|
||||
|
||||
172
pageserver/src/layered_repository/page_versions.rs
Normal file
172
pageserver/src/layered_repository/page_versions.rs
Normal file
@@ -0,0 +1,172 @@
|
||||
use std::{collections::HashMap, ops::RangeBounds};
|
||||
|
||||
use zenith_utils::{accum::Accum, lsn::Lsn, vec_map::VecMap};
|
||||
|
||||
use super::storage_layer::PageVersion;
|
||||
|
||||
const EMPTY_SLICE: &[(Lsn, PageVersion)] = &[];
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct PageVersions(HashMap<u32, VecMap<Lsn, PageVersion>>);
|
||||
|
||||
impl PageVersions {
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.0.is_empty()
|
||||
}
|
||||
|
||||
pub fn append_or_update_last(
|
||||
&mut self,
|
||||
blknum: u32,
|
||||
lsn: Lsn,
|
||||
page_version: PageVersion,
|
||||
) -> Option<PageVersion> {
|
||||
let map = self.0.entry(blknum).or_insert_with(VecMap::default);
|
||||
map.append_or_update_last(lsn, page_version).unwrap()
|
||||
}
|
||||
|
||||
/// Get all [`PageVersion`]s in a block
|
||||
pub fn get_block_slice(&self, blknum: u32) -> &[(Lsn, PageVersion)] {
|
||||
self.0
|
||||
.get(&blknum)
|
||||
.map(VecMap::as_slice)
|
||||
.unwrap_or(EMPTY_SLICE)
|
||||
}
|
||||
|
||||
/// Get a range of [`PageVersions`] in a block
|
||||
pub fn get_block_lsn_range<R: RangeBounds<Lsn>>(
|
||||
&self,
|
||||
blknum: u32,
|
||||
range: R,
|
||||
) -> &[(Lsn, PageVersion)] {
|
||||
self.0
|
||||
.get(&blknum)
|
||||
.map(|vec_map| vec_map.slice_range(range))
|
||||
.unwrap_or(EMPTY_SLICE)
|
||||
}
|
||||
|
||||
/// Split the page version map into two.
|
||||
///
|
||||
/// Left contains everything up to and not including [`cutoff_lsn`].
|
||||
/// Right contains [`cutoff_lsn`] and everything after.
|
||||
pub fn split_at(&self, cutoff_lsn: Lsn, after_oldest_lsn: &mut Accum<Lsn>) -> (Self, Self) {
|
||||
let mut before_blocks = HashMap::new();
|
||||
let mut after_blocks = HashMap::new();
|
||||
|
||||
for (blknum, vec_map) in self.0.iter() {
|
||||
let (before_versions, after_versions) = vec_map.split_at(&cutoff_lsn);
|
||||
|
||||
if !before_versions.is_empty() {
|
||||
let old = before_blocks.insert(*blknum, before_versions);
|
||||
assert!(old.is_none());
|
||||
}
|
||||
|
||||
if !after_versions.is_empty() {
|
||||
let (first_lsn, _first_pv) = &after_versions.as_slice()[0];
|
||||
after_oldest_lsn.accum(std::cmp::min, *first_lsn);
|
||||
|
||||
let old = after_blocks.insert(*blknum, after_versions);
|
||||
assert!(old.is_none());
|
||||
}
|
||||
}
|
||||
|
||||
(Self(before_blocks), Self(after_blocks))
|
||||
}
|
||||
|
||||
/// Iterate through block-history pairs in block order.
|
||||
/// If a [`cutoff_lsn`] is set, only include history with `lsn < cutoff_lsn`
|
||||
pub fn ordered_block_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedBlockIter<'_> {
|
||||
let mut ordered_blocks: Vec<u32> = self.0.keys().cloned().collect();
|
||||
ordered_blocks.sort_unstable();
|
||||
|
||||
OrderedBlockIter {
|
||||
page_versions: self,
|
||||
ordered_blocks,
|
||||
cur_block_idx: 0,
|
||||
cutoff_lsn,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct OrderedBlockIter<'a> {
|
||||
page_versions: &'a PageVersions,
|
||||
|
||||
ordered_blocks: Vec<u32>,
|
||||
cur_block_idx: usize,
|
||||
|
||||
cutoff_lsn: Option<Lsn>,
|
||||
}
|
||||
|
||||
impl<'a> Iterator for OrderedBlockIter<'a> {
|
||||
type Item = (u32, &'a [(Lsn, PageVersion)]);
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
while self.cur_block_idx < self.ordered_blocks.len() {
|
||||
let blknum = self.ordered_blocks[self.cur_block_idx];
|
||||
self.cur_block_idx += 1;
|
||||
|
||||
if let Some(cutoff_lsn) = self.cutoff_lsn.as_ref() {
|
||||
let slice = self.page_versions.get_block_lsn_range(blknum, ..cutoff_lsn);
|
||||
if !slice.is_empty() {
|
||||
return Some((blknum, slice));
|
||||
}
|
||||
} else {
|
||||
return Some((blknum, self.page_versions.get_block_slice(blknum)));
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
const EMPTY_PAGE_VERSION: PageVersion = PageVersion {
|
||||
page_image: None,
|
||||
record: None,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn test_ordered_iter() {
|
||||
let mut page_versions = PageVersions::default();
|
||||
const BLOCKS: u32 = 1000;
|
||||
const LSNS: u64 = 50;
|
||||
|
||||
for blknum in 0..BLOCKS {
|
||||
for lsn in 0..LSNS {
|
||||
let old = page_versions.append_or_update_last(blknum, Lsn(lsn), EMPTY_PAGE_VERSION);
|
||||
assert!(old.is_none());
|
||||
}
|
||||
}
|
||||
|
||||
let mut iter = page_versions.ordered_block_iter(None);
|
||||
for blknum in 0..BLOCKS {
|
||||
let (actual_blknum, slice) = iter.next().unwrap();
|
||||
assert_eq!(actual_blknum, blknum);
|
||||
assert_eq!(slice.len(), LSNS as usize);
|
||||
for lsn in 0..LSNS {
|
||||
assert_eq!(Lsn(lsn), slice[lsn as usize].0);
|
||||
}
|
||||
}
|
||||
assert!(iter.next().is_none());
|
||||
assert!(iter.next().is_none()); // should be robust against excessive next() calls
|
||||
|
||||
const CUTOFF_LSN: Lsn = Lsn(30);
|
||||
let mut iter = page_versions.ordered_block_iter(Some(CUTOFF_LSN));
|
||||
for blknum in 0..BLOCKS {
|
||||
let (actual_blknum, slice) = iter.next().unwrap();
|
||||
assert_eq!(actual_blknum, blknum);
|
||||
assert_eq!(slice.len(), CUTOFF_LSN.0 as usize);
|
||||
for lsn in 0..CUTOFF_LSN.0 {
|
||||
assert_eq!(Lsn(lsn), slice[lsn as usize].0);
|
||||
}
|
||||
}
|
||||
assert!(iter.next().is_none());
|
||||
assert!(iter.next().is_none()); // should be robust against excessive next() calls
|
||||
|
||||
let mut iter = page_versions.ordered_block_iter(Some(Lsn(0)));
|
||||
assert!(iter.next().is_none());
|
||||
assert!(iter.next().is_none()); // should be robust against excessive next() calls
|
||||
}
|
||||
}
|
||||
@@ -8,6 +8,9 @@ pub mod lsn;
|
||||
/// SeqWait allows waiting for a future sequence number to arrive
|
||||
pub mod seqwait;
|
||||
|
||||
/// append only ordered map implemented with a Vec
|
||||
pub mod vec_map;
|
||||
|
||||
// Async version of SeqWait. Currently unused.
|
||||
// pub mod seqwait_async;
|
||||
|
||||
|
||||
297
zenith_utils/src/vec_map.rs
Normal file
297
zenith_utils/src/vec_map.rs
Normal file
@@ -0,0 +1,297 @@
|
||||
use std::{cmp::Ordering, ops::RangeBounds};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Ordered map datastructure implemented in a Vec.
|
||||
/// Append only - can only add keys that are larger than the
|
||||
/// current max key.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct VecMap<K, V>(Vec<(K, V)>);
|
||||
|
||||
impl<K, V> Default for VecMap<K, V> {
|
||||
fn default() -> Self {
|
||||
VecMap(Default::default())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct InvalidKey;
|
||||
|
||||
impl<K: Ord, V> VecMap<K, V> {
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.0.is_empty()
|
||||
}
|
||||
|
||||
pub fn as_slice(&self) -> &[(K, V)] {
|
||||
self.0.as_slice()
|
||||
}
|
||||
|
||||
/// This function may panic if given a range where the lower bound is
|
||||
/// greater than the upper bound.
|
||||
pub fn slice_range<R: RangeBounds<K>>(&self, range: R) -> &[(K, V)] {
|
||||
use std::ops::Bound::*;
|
||||
|
||||
let binary_search = |k: &K| self.0.binary_search_by_key(&k, extract_key);
|
||||
|
||||
let start_idx = match range.start_bound() {
|
||||
Unbounded => 0,
|
||||
Included(k) => binary_search(k).unwrap_or_else(std::convert::identity),
|
||||
Excluded(k) => match binary_search(k) {
|
||||
Ok(idx) => idx + 1,
|
||||
Err(idx) => idx,
|
||||
},
|
||||
};
|
||||
|
||||
let end_idx = match range.end_bound() {
|
||||
Unbounded => self.0.len(),
|
||||
Included(k) => match binary_search(k) {
|
||||
Ok(idx) => idx + 1,
|
||||
Err(idx) => idx,
|
||||
},
|
||||
Excluded(k) => binary_search(k).unwrap_or_else(std::convert::identity),
|
||||
};
|
||||
|
||||
&self.0[start_idx..end_idx]
|
||||
}
|
||||
|
||||
/// Add a key value pair to the map.
|
||||
/// If [`key`] is less than or equal to the current maximum key
|
||||
/// the pair will not be added and InvalidKey error will be returned.
|
||||
pub fn append(&mut self, key: K, value: V) -> Result<(), InvalidKey> {
|
||||
if let Some((last_key, _last_value)) = self.0.last() {
|
||||
if &key <= last_key {
|
||||
return Err(InvalidKey);
|
||||
}
|
||||
}
|
||||
|
||||
self.0.push((key, value));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update the maximum key value pair or add a new key value pair to the map.
|
||||
/// If [`key`] is less than the current maximum key no updates or additions
|
||||
/// will occur and InvalidKey error will be returned.
|
||||
pub fn append_or_update_last(&mut self, key: K, mut value: V) -> Result<Option<V>, InvalidKey> {
|
||||
if let Some((last_key, last_value)) = self.0.last_mut() {
|
||||
match key.cmp(last_key) {
|
||||
Ordering::Less => return Err(InvalidKey),
|
||||
Ordering::Equal => {
|
||||
std::mem::swap(last_value, &mut value);
|
||||
return Ok(Some(value));
|
||||
}
|
||||
Ordering::Greater => {}
|
||||
}
|
||||
}
|
||||
|
||||
self.0.push((key, value));
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Split the map into two.
|
||||
///
|
||||
/// The left map contains everything before [`cutoff`] (exclusive).
|
||||
/// Right map contains [`cutoff`] and everything after (inclusive).
|
||||
pub fn split_at(&self, cutoff: &K) -> (Self, Self)
|
||||
where
|
||||
K: Clone,
|
||||
V: Clone,
|
||||
{
|
||||
let split_idx = self
|
||||
.0
|
||||
.binary_search_by_key(&cutoff, extract_key)
|
||||
.unwrap_or_else(std::convert::identity);
|
||||
|
||||
(
|
||||
VecMap(self.0[..split_idx].to_vec()),
|
||||
VecMap(self.0[split_idx..].to_vec()),
|
||||
)
|
||||
}
|
||||
|
||||
/// Move items from [`other`] to the end of [`self`], leaving [`other`] empty.
|
||||
/// If any keys in [`other`] is less than or equal to any key in [`self`],
|
||||
/// [`InvalidKey`] error will be returned and no mutation will occur.
|
||||
pub fn extend(&mut self, other: &mut Self) -> Result<(), InvalidKey> {
|
||||
let self_last_opt = self.0.last().map(extract_key);
|
||||
let other_first_opt = other.0.last().map(extract_key);
|
||||
|
||||
if let (Some(self_last), Some(other_first)) = (self_last_opt, other_first_opt) {
|
||||
if self_last >= other_first {
|
||||
return Err(InvalidKey);
|
||||
}
|
||||
}
|
||||
|
||||
self.0.append(&mut other.0);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn reserve(&mut self, additional: usize) {
|
||||
self.0.reserve(additional);
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_key<K, V>(entry: &(K, V)) -> &K {
|
||||
&entry.0
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{collections::BTreeMap, ops::Bound};
|
||||
|
||||
use super::VecMap;
|
||||
|
||||
#[test]
|
||||
fn unbounded_range() {
|
||||
let mut vec = VecMap::default();
|
||||
vec.append(0, ()).unwrap();
|
||||
|
||||
assert_eq!(vec.slice_range(0..0), &[]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn invalid_ordering_range() {
|
||||
let mut vec = VecMap::default();
|
||||
vec.append(0, ()).unwrap();
|
||||
|
||||
#[allow(clippy::reversed_empty_ranges)]
|
||||
vec.slice_range(1..0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn range_tests() {
|
||||
let mut vec = VecMap::default();
|
||||
vec.append(0, ()).unwrap();
|
||||
vec.append(2, ()).unwrap();
|
||||
vec.append(4, ()).unwrap();
|
||||
|
||||
assert_eq!(vec.slice_range(0..0), &[]);
|
||||
assert_eq!(vec.slice_range(0..1), &[(0, ())]);
|
||||
assert_eq!(vec.slice_range(0..2), &[(0, ())]);
|
||||
assert_eq!(vec.slice_range(0..3), &[(0, ()), (2, ())]);
|
||||
|
||||
assert_eq!(vec.slice_range(..0), &[]);
|
||||
assert_eq!(vec.slice_range(..1), &[(0, ())]);
|
||||
|
||||
assert_eq!(vec.slice_range(..3), &[(0, ()), (2, ())]);
|
||||
assert_eq!(vec.slice_range(..3), &[(0, ()), (2, ())]);
|
||||
|
||||
assert_eq!(vec.slice_range(0..=0), &[(0, ())]);
|
||||
assert_eq!(vec.slice_range(0..=1), &[(0, ())]);
|
||||
assert_eq!(vec.slice_range(0..=2), &[(0, ()), (2, ())]);
|
||||
assert_eq!(vec.slice_range(0..=3), &[(0, ()), (2, ())]);
|
||||
|
||||
assert_eq!(vec.slice_range(..=0), &[(0, ())]);
|
||||
assert_eq!(vec.slice_range(..=1), &[(0, ())]);
|
||||
assert_eq!(vec.slice_range(..=2), &[(0, ()), (2, ())]);
|
||||
assert_eq!(vec.slice_range(..=3), &[(0, ()), (2, ())]);
|
||||
}
|
||||
|
||||
struct BoundIter {
|
||||
min: i32,
|
||||
max: i32,
|
||||
|
||||
next: Option<Bound<i32>>,
|
||||
}
|
||||
|
||||
impl BoundIter {
|
||||
fn new(min: i32, max: i32) -> Self {
|
||||
Self {
|
||||
min,
|
||||
max,
|
||||
|
||||
next: Some(Bound::Unbounded),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for BoundIter {
|
||||
type Item = Bound<i32>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let cur = self.next?;
|
||||
|
||||
self.next = match &cur {
|
||||
Bound::Unbounded => Some(Bound::Included(self.min)),
|
||||
Bound::Included(x) => {
|
||||
if *x >= self.max {
|
||||
Some(Bound::Excluded(self.min))
|
||||
} else {
|
||||
Some(Bound::Included(x + 1))
|
||||
}
|
||||
}
|
||||
Bound::Excluded(x) => {
|
||||
if *x >= self.max {
|
||||
None
|
||||
} else {
|
||||
Some(Bound::Excluded(x + 1))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Some(cur)
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn range_exhaustive() {
|
||||
let map: BTreeMap<i32, ()> = (1..=7).step_by(2).map(|x| (x, ())).collect();
|
||||
let mut vec = VecMap::default();
|
||||
for &key in map.keys() {
|
||||
vec.append(key, ()).unwrap();
|
||||
}
|
||||
|
||||
const RANGE_MIN: i32 = 0;
|
||||
const RANGE_MAX: i32 = 8;
|
||||
for lower_bound in BoundIter::new(RANGE_MIN, RANGE_MAX) {
|
||||
let ub_min = match lower_bound {
|
||||
Bound::Unbounded => RANGE_MIN,
|
||||
Bound::Included(x) => x,
|
||||
Bound::Excluded(x) => x + 1,
|
||||
};
|
||||
for upper_bound in BoundIter::new(ub_min, RANGE_MAX) {
|
||||
let map_range: Vec<(i32, ())> = map
|
||||
.range((lower_bound, upper_bound))
|
||||
.map(|(&x, _)| (x, ()))
|
||||
.collect();
|
||||
let vec_slice = vec.slice_range((lower_bound, upper_bound));
|
||||
|
||||
assert_eq!(map_range, vec_slice);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn extend() {
|
||||
let mut left = VecMap::default();
|
||||
left.append(0, ()).unwrap();
|
||||
assert_eq!(left.as_slice(), &[(0, ())]);
|
||||
|
||||
let mut empty = VecMap::default();
|
||||
left.extend(&mut empty).unwrap();
|
||||
assert_eq!(left.as_slice(), &[(0, ())]);
|
||||
assert_eq!(empty.as_slice(), &[]);
|
||||
|
||||
let mut right = VecMap::default();
|
||||
right.append(1, ()).unwrap();
|
||||
|
||||
left.extend(&mut right).unwrap();
|
||||
|
||||
assert_eq!(left.as_slice(), &[(0, ()), (1, ())]);
|
||||
assert_eq!(right.as_slice(), &[]);
|
||||
|
||||
let mut zero_map = VecMap::default();
|
||||
zero_map.append(0, ()).unwrap();
|
||||
|
||||
left.extend(&mut zero_map).unwrap_err();
|
||||
assert_eq!(left.as_slice(), &[(0, ()), (1, ())]);
|
||||
assert_eq!(zero_map.as_slice(), &[(0, ())]);
|
||||
|
||||
let mut one_map = VecMap::default();
|
||||
one_map.append(1, ()).unwrap();
|
||||
|
||||
left.extend(&mut one_map).unwrap_err();
|
||||
assert_eq!(left.as_slice(), &[(0, ()), (1, ())]);
|
||||
assert_eq!(one_map.as_slice(), &[(1, ())]);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user