Mirror of https://github.com/neondatabase/neon.git (synced 2026-02-12 07:00:36 +00:00)

Compare commits: conrad/pro...lsn-vec (10 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 3e1e934a21 | |
| | 2ccc8f3e36 | |
| | 77722b4c9d | |
| | 7ea2251f02 | |
| | 896a172d46 | |
| | b7c3f02ed1 | |
| | 98f58fa2ab | |
| | adf4ac0ef7 | |
| | 9dd1393f0e | |
| | 935984018d | |
@@ -48,7 +48,6 @@ use crate::{ZTenantId, ZTimelineId};
 use anyhow::{bail, Result};
 use log::*;
 use serde::{Deserialize, Serialize};
-use std::collections::BTreeMap;
 // avoid binding to Write (conflicts with std::io::Write)
 // while being able to use std::fmt::Write's methods
 use std::fmt::Write as _;
@@ -63,6 +62,7 @@ use bookfile::{Book, BookWriter};
 
 use zenith_utils::bin_ser::BeSer;
 use zenith_utils::lsn::Lsn;
+use zenith_utils::ordered_vec::OrderedVec;
 
 use super::blob::{read_blob, BlobRange};
 
@@ -144,10 +144,10 @@ pub struct DeltaLayerInner {
 
     /// All versions of all pages in the file are are kept here.
    /// Indexed by block number and LSN.
-    page_version_metas: BTreeMap<(u32, Lsn), BlobRange>,
+    page_version_metas: OrderedVec<(u32, Lsn), BlobRange>,
 
    /// `relsizes` tracks the size of the relation at different points in time.
-    relsizes: BTreeMap<Lsn, u32>,
+    relsizes: OrderedVec<Lsn, u32>,
 }
 
 impl Layer for DeltaLayer {
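The hunk above is the core change of this branch: `DeltaLayerInner` moves from `BTreeMap` to `OrderedVec`, a sorted flat `Vec<(K, V)>` introduced in `zenith_utils/src/ordered_vec.rs` at the end of this diff. Delta-layer metadata is built in key order and then only queried, so a sorted vector can answer the same ordered range lookups by binary search while staying contiguous in memory. A minimal self-contained sketch of that lookup pattern, using plain integers and a hypothetical helper rather than the patch's own types:

```rust
// Sketch of the idea behind OrderedVec (not the actual implementation,
// which appears later in this diff): keep pairs sorted by key in a flat
// Vec and use binary search to answer "latest entry at or before key".
fn latest_at_or_before(sizes: &[(u64, u32)], lsn: u64) -> Option<u32> {
    // partition_point returns the first index whose key is > lsn,
    // so everything before that index has key <= lsn.
    let idx = sizes.partition_point(|&(k, _)| k <= lsn);
    sizes[..idx].last().map(|&(_, v)| v)
}

fn main() {
    let relsizes = vec![(10, 1), (20, 3), (35, 4)]; // (lsn, size), sorted by lsn
    assert_eq!(latest_at_or_before(&relsizes, 25), Some(3));
    assert_eq!(latest_at_or_before(&relsizes, 5), None);
}
```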
@@ -220,7 +220,8 @@ impl Layer for DeltaLayer {
         let maxkey = (blknum, lsn);
         let mut iter = inner
             .page_version_metas
-            .range((Included(&minkey), Included(&maxkey)));
+            .range((Included(&minkey), Included(&maxkey)))
+            .iter();
         while let Some(((_blknum, _entry_lsn), blob_range)) = iter.next_back() {
             let pv = PageVersion::des(&read_blob(&page_version_reader, blob_range)?)?;
 
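`OrderedVec::range` returns a plain `&[(K, V)]` slice instead of a `BTreeMap` range iterator, hence the extra `.iter()` above; slice iterators are double-ended, so the existing `next_back` reverse scan keeps working unchanged. A tiny illustration of that property:

```rust
fn main() {
    let slice = [(1, 'a'), (2, 'b'), (3, 'c')];
    let mut iter = slice.iter();
    // Slice iterators implement DoubleEndedIterator, so the layer code can
    // still walk from the newest (largest-key) entry backwards.
    assert_eq!(iter.next_back(), Some(&(3, 'c')));
    assert_eq!(iter.next_back(), Some(&(2, 'b')));
}
```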
@@ -268,10 +269,10 @@ impl Layer for DeltaLayer {
 
         // Scan the BTreeMap backwards, starting from the given entry.
         let inner = self.load()?;
-        let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn)));
+        let slice = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn)));
 
         let result;
-        if let Some((_entry_lsn, entry)) = iter.next_back() {
+        if let Some((_entry_lsn, entry)) = slice.last() {
             result = *entry;
         // Use the base image if needed
         } else if let Some(predecessor) = &self.predecessor {
@@ -299,8 +300,8 @@ impl Layer for DeltaLayer {
     ///
     fn unload(&self) -> Result<()> {
         let mut inner = self.inner.lock().unwrap();
-        inner.page_version_metas = BTreeMap::new();
-        inner.relsizes = BTreeMap::new();
+        inner.page_version_metas = OrderedVec::default();
+        inner.relsizes = OrderedVec::default();
         inner.loaded = false;
         Ok(())
     }
@@ -390,8 +391,8 @@ impl DeltaLayer {
         end_lsn: Lsn,
         dropped: bool,
         predecessor: Option<Arc<dyn Layer>>,
-        page_versions: impl Iterator<Item = (&'a (u32, Lsn), &'a PageVersion)>,
-        relsizes: BTreeMap<Lsn, u32>,
+        page_versions: impl Iterator<Item = (u32, Lsn, &'a PageVersion)>,
+        relsizes: OrderedVec<Lsn, u32>,
     ) -> Result<DeltaLayer> {
         let delta_layer = DeltaLayer {
             path_or_conf: PathOrConf::Conf(conf),
@@ -403,7 +404,7 @@ impl DeltaLayer {
             dropped,
             inner: Mutex::new(DeltaLayerInner {
                 loaded: true,
-                page_version_metas: BTreeMap::new(),
+                page_version_metas: OrderedVec::default(), // TODO create with a size estimate
                 relsizes,
             }),
             predecessor,
@@ -423,26 +424,24 @@ impl DeltaLayer {
 
         let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
 
-        for (key, page_version) in page_versions {
+        for (blknum, lsn, page_version) in page_versions {
             let buf = PageVersion::ser(page_version)?;
             let blob_range = page_version_writer.write_blob(&buf)?;
 
-            let old = inner.page_version_metas.insert(*key, blob_range);
-
-            assert!(old.is_none());
+            inner.page_version_metas.append((blknum, lsn), blob_range);
         }
 
         let book = page_version_writer.close()?;
 
         // Write out page versions
         let mut chapter = book.new_chapter(PAGE_VERSION_METAS_CHAPTER);
-        let buf = BTreeMap::ser(&inner.page_version_metas)?;
+        let buf = OrderedVec::ser(&inner.page_version_metas)?;
         chapter.write_all(&buf)?;
         let book = chapter.close()?;
 
         // and relsizes to separate chapter
         let mut chapter = book.new_chapter(REL_SIZES_CHAPTER);
-        let buf = BTreeMap::ser(&inner.relsizes)?;
+        let buf = OrderedVec::ser(&inner.relsizes)?;
         chapter.write_all(&buf)?;
         let book = chapter.close()?;
 
@@ -529,10 +528,10 @@ impl DeltaLayer {
         }
 
         let chapter = book.read_chapter(PAGE_VERSION_METAS_CHAPTER)?;
-        let page_version_metas = BTreeMap::des(&chapter)?;
+        let page_version_metas = OrderedVec::des(&chapter)?;
 
         let chapter = book.read_chapter(REL_SIZES_CHAPTER)?;
-        let relsizes = BTreeMap::des(&chapter)?;
+        let relsizes = OrderedVec::des(&chapter)?;
 
         debug!("loaded from {}", &path.display());
 
@@ -563,8 +562,8 @@ impl DeltaLayer {
             dropped: filename.dropped,
             inner: Mutex::new(DeltaLayerInner {
                 loaded: false,
-                page_version_metas: BTreeMap::new(),
-                relsizes: BTreeMap::new(),
+                page_version_metas: OrderedVec::default(),
+                relsizes: OrderedVec::default(),
             }),
             predecessor,
         }
@@ -587,8 +586,8 @@ impl DeltaLayer {
             dropped: summary.dropped,
             inner: Mutex::new(DeltaLayerInner {
                 loaded: false,
-                page_version_metas: BTreeMap::new(),
-                relsizes: BTreeMap::new(),
+                page_version_metas: OrderedVec::default(),
+                relsizes: OrderedVec::default(),
             }),
             predecessor: None,
         })
[second file, name not shown]
@@ -16,10 +16,12 @@ use anyhow::{bail, Result};
 use bytes::Bytes;
 use log::*;
 use std::cmp::min;
-use std::collections::BTreeMap;
+use std::collections::HashMap;
 use std::ops::Bound::Included;
+use std::ops::RangeBounds;
 use std::path::PathBuf;
 use std::sync::{Arc, RwLock};
+use zenith_utils::ordered_vec::OrderedVec;
 
 use zenith_utils::accum::Accum;
 use zenith_utils::lsn::Lsn;
@@ -47,6 +49,83 @@ pub struct InMemoryLayer {
     inner: RwLock<InMemoryLayerInner>,
 }
 
+#[derive(Default)]
+struct PageVersions(HashMap<u32, OrderedVec<Lsn, PageVersion>>);
+
+impl PageVersions {
+    fn range<R: RangeBounds<Lsn>>(&self, blknum: u32, lsn_range: R) -> &[(Lsn, PageVersion)] {
+        match self.0.get(&blknum) {
+            Some(ov) => ov.range(lsn_range),
+            None => &[],
+        }
+    }
+
+    fn update(&mut self, blknum: u32, lsn: Lsn, page_version: PageVersion) {
+        let ordered_vec = self.0.entry(blknum).or_insert_with(OrderedVec::default);
+        ordered_vec.append_update(lsn, page_version);
+    }
+
+    fn ordered_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedBlockIter {
+        let mut block_numbers: Vec<u32> = self.0.keys().cloned().collect();
+        // TODO consider counting sort given the small size of the key space
+        block_numbers.sort_unstable();
+
+        let cur_idx = 0;
+        let ordered_vec_iter = block_numbers
+            .get(cur_idx)
+            .map(|blk_num| self.0.get(blk_num).unwrap().iter())
+            .unwrap_or_else(|| [].iter());
+
+        OrderedBlockIter {
+            page_versions: self,
+            cutoff_lsn,
+
+            block_numbers,
+            cur_idx,
+
+            ordered_vec_iter,
+        }
+    }
+
+    fn is_empty(&self) -> bool {
+        self.0.is_empty()
+    }
+}
+
+struct OrderedBlockIter<'a> {
+    page_versions: &'a PageVersions,
+    cutoff_lsn: Option<Lsn>,
+
+    block_numbers: Vec<u32>,
+    cur_idx: usize,
+
+    ordered_vec_iter: std::slice::Iter<'a, (Lsn, PageVersion)>,
+}
+
+impl<'a> Iterator for OrderedBlockIter<'a> {
+    type Item = (u32, Lsn, &'a PageVersion);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        loop {
+            if let Some((lsn, page_version)) = self.ordered_vec_iter.next() {
+                if self
+                    .cutoff_lsn
+                    .as_ref()
+                    .map(|cutoff| lsn < cutoff)
+                    .unwrap_or(true)
+                {
+                    let blk_num = self.block_numbers[self.cur_idx];
+                    return Some((blk_num, *lsn, page_version));
+                }
+            }
+
+            let blk_num = self.block_numbers.get(self.cur_idx + 1)?;
+            self.cur_idx += 1;
+            self.ordered_vec_iter = self.page_versions.0.get(blk_num).unwrap().iter();
+        }
+    }
+}
+
 pub struct InMemoryLayerInner {
     /// If this relation was dropped, remember when that happened.
     drop_lsn: Option<Lsn>,
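The added `PageVersions` wrapper replaces the flat `BTreeMap<(u32, Lsn), PageVersion>` with one `OrderedVec` per block, and `OrderedBlockIter` reconstructs a full traversal in (block number, LSN) order, optionally dropping entries at or above a cutoff LSN. A rough standalone analogue of that two-level iteration, with plain integers and a hypothetical `ordered_iter` function standing in for the pageserver's types:

```rust
use std::collections::HashMap;

// Illustrative analogue of PageVersions::ordered_iter: per-block sorted
// vectors flattened into (block, lsn, value) tuples in (block, lsn) order,
// with an optional exclusive cutoff on lsn. Types are simplified stand-ins.
fn ordered_iter(
    map: &HashMap<u32, Vec<(u64, char)>>,
    cutoff: Option<u64>,
) -> Vec<(u32, u64, char)> {
    let mut blocks: Vec<u32> = map.keys().copied().collect();
    blocks.sort_unstable();

    let mut out = Vec::new();
    for blk in blocks {
        for &(lsn, v) in &map[&blk] {
            if cutoff.map(|c| lsn < c).unwrap_or(true) {
                out.push((blk, lsn, v));
            }
        }
    }
    out
}

fn main() {
    let mut map = HashMap::new();
    map.insert(2, vec![(10, 'a'), (30, 'b')]);
    map.insert(1, vec![(20, 'c')]);
    // Entry (2, 30, 'b') is filtered out by the cutoff of 30.
    assert_eq!(
        ordered_iter(&map, Some(30)),
        vec![(1, 20, 'c'), (2, 10, 'a')]
    );
}
```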
@@ -55,12 +134,12 @@ pub struct InMemoryLayerInner {
     /// All versions of all pages in the layer are are kept here.
     /// Indexed by block number and LSN.
     ///
-    page_versions: BTreeMap<(u32, Lsn), PageVersion>,
+    page_versions: PageVersions,
 
     ///
     /// `segsizes` tracks the size of the segment at different points in time.
     ///
-    segsizes: BTreeMap<Lsn, u32>,
+    segsizes: OrderedVec<Lsn, u32>,
 
     /// Writes are only allowed when true.
     /// Set to false when this layer is in the process of being replaced.
@@ -81,9 +160,9 @@ impl InMemoryLayerInner {
 
     fn get_seg_size(&self, lsn: Lsn) -> u32 {
         // Scan the BTreeMap backwards, starting from the given entry.
-        let mut iter = self.segsizes.range((Included(&Lsn(0)), Included(&lsn)));
+        let slice = self.segsizes.range((Included(&Lsn(0)), Included(&lsn)));
 
-        if let Some((_entry_lsn, entry)) = iter.next_back() {
+        if let Some((_entry_lsn, entry)) = slice.last() {
             *entry
         } else {
             0
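Because `range` now yields a key-ordered slice, "the most recent entry at or below `lsn`" is simply the slice's last element, so `slice.last()` replaces the reverse-iterator step. A minimal check of that equivalence, with integers standing in for `Lsn` (names here are illustrative):

```rust
fn main() {
    // Key-ordered (lsn, size) pairs, as OrderedVec stores them.
    let segsizes = [(5u64, 1u32), (9, 2), (12, 3)];

    // Entries with key <= 10: the slice [(5, 1), (9, 2)].
    let slice = &segsizes[..segsizes.partition_point(|&(k, _)| k <= 10)];

    // The latest size at or below lsn 10 is the last element of that slice.
    assert_eq!(slice.last(), Some(&(9, 2)));
}
```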
@@ -170,12 +249,8 @@ impl Layer for InMemoryLayer {
         let inner = self.inner.read().unwrap();
 
         // Scan the BTreeMap backwards, starting from reconstruct_data.lsn.
-        let minkey = (blknum, Lsn(0));
-        let maxkey = (blknum, lsn);
-        let mut iter = inner
-            .page_versions
-            .range((Included(&minkey), Included(&maxkey)));
-        while let Some(((_blknum, _entry_lsn), entry)) = iter.next_back() {
+        let mut iter = inner.page_versions.range(blknum, ..=lsn).iter();
+        while let Some((_entry_lsn, entry)) = iter.next_back() {
             if let Some(img) = &entry.page_image {
                 reconstruct_data.page_img = Some(img.clone());
                 need_image = false;
@@ -275,11 +350,11 @@ impl Layer for InMemoryLayer {
             println!("segsizes {}: {}", k, v);
         }
 
-        for (k, v) in inner.page_versions.iter() {
+        for (blknum, lsn, v) in inner.page_versions.ordered_iter(None) {
             println!(
                 "blk {} at {}: {}/{}\n",
-                k.0,
-                k.1,
+                blknum,
+                lsn,
                 v.page_image.is_some(),
                 v.record.is_some()
             );
@@ -342,8 +417,8 @@ impl InMemoryLayer {
             oldest_pending_lsn,
             inner: RwLock::new(InMemoryLayerInner {
                 drop_lsn: None,
-                page_versions: BTreeMap::new(),
-                segsizes: BTreeMap::new(),
+                page_versions: PageVersions::default(),
+                segsizes: OrderedVec::default(),
                 writeable: true,
                 predecessor: None,
             }),
@@ -393,8 +468,9 @@ impl InMemoryLayer {
 
         inner.check_writeable()?;
 
-        let old = inner.page_versions.insert((blknum, lsn), pv);
+        inner.page_versions.update(blknum, lsn, pv);
 
+        /*
         if old.is_some() {
             // We already had an entry for this LSN. That's odd..
             warn!(
@@ -402,6 +478,7 @@ impl InMemoryLayer {
                 self.seg.rel, blknum, lsn
             );
         }
+        */
 
         // Also update the relation size, if this extended the relation.
         if self.seg.rel.is_blocky() {
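`update` goes through `OrderedVec::append_update` (defined at the end of this diff), which overwrites the stored value when the new key equals the current last key, so the old "already had an entry for this LSN" warning path becomes unreachable and is commented out here and below. A sketch of the overwrite-or-append behaviour, with a plain `Vec` and a hypothetical helper rather than the real type:

```rust
// Sketch of OrderedVec::append_update semantics: an equal-to-last key
// overwrites in place, a larger key appends, and a smaller key would
// violate the ordering invariant (the real implementation panics).
fn append_update(vec: &mut Vec<(u64, &'static str)>, key: u64, value: &'static str) {
    if let Some((last_key, last_value)) = vec.last_mut() {
        if *last_key == key {
            *last_value = value; // same key as the most recent entry
            return;
        }
        assert!(*last_key < key, "keys must arrive in order");
    }
    vec.push((key, value));
}

fn main() {
    let mut v = Vec::new();
    append_update(&mut v, 1, "a");
    append_update(&mut v, 2, "b");
    append_update(&mut v, 2, "b2"); // same key: overwrite, no duplicate entry
    assert_eq!(v, vec![(1, "a"), (2, "b2")]);
}
```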
@@ -438,18 +515,20 @@ impl InMemoryLayer {
                     gapblknum,
                     blknum
                 );
-                let old = inner.page_versions.insert((gapblknum, lsn), zeropv);
+                inner.page_versions.update(gapblknum, lsn, zeropv);
                 // We already had an entry for this LSN. That's odd..
 
+                /*
                 if old.is_some() {
                     warn!(
                         "Page version of rel {} blk {} at {} already exists",
                         self.seg.rel, blknum, lsn
                     );
                 }
+                */
             }
 
-            inner.segsizes.insert(lsn, newsize);
+            inner.segsizes.append_update(lsn, newsize);
             return Ok(newsize - oldsize);
         }
     }
@@ -467,12 +546,7 @@ impl InMemoryLayer {
         let oldsize = inner.get_seg_size(lsn);
         assert!(segsize < oldsize);
 
-        let old = inner.segsizes.insert(lsn, segsize);
-
-        if old.is_some() {
-            // We already had an entry for this LSN. That's odd..
-            warn!("Inserting truncation, but had an entry for the LSN already");
-        }
+        inner.segsizes.append_update(lsn, segsize);
 
         Ok(())
     }
@@ -519,10 +593,10 @@ impl InMemoryLayer {
         );
 
         // For convenience, copy the segment size from the predecessor layer
-        let mut segsizes = BTreeMap::new();
+        let mut segsizes = OrderedVec::default();
         if seg.rel.is_blocky() {
             let size = src.get_seg_size(start_lsn)?;
-            segsizes.insert(start_lsn, size);
+            segsizes.append(start_lsn, size);
         }
 
         Ok(InMemoryLayer {
@@ -535,7 +609,7 @@ impl InMemoryLayer {
             oldest_pending_lsn,
             inner: RwLock::new(InMemoryLayerInner {
                 drop_lsn: None,
-                page_versions: BTreeMap::new(),
+                page_versions: PageVersions::default(),
                 segsizes,
                 writeable: true,
                 predecessor: Some(src),
@@ -589,26 +663,27 @@ impl InMemoryLayer {
 
         // Divide all the page versions into old and new
         // at the 'cutoff_lsn' point.
-        let mut before_segsizes = BTreeMap::new();
-        let mut after_segsizes = BTreeMap::new();
-        let mut after_oldest_lsn: Accum<Lsn> = Accum(None);
-        for (lsn, size) in inner.segsizes.iter() {
-            if *lsn > cutoff_lsn {
-                after_segsizes.insert(*lsn, *size);
-                after_oldest_lsn.accum(min, *lsn);
-            } else {
-                before_segsizes.insert(*lsn, *size);
-            }
-        }
+        let (before_segsizes, after_segsizes) = inner.segsizes.copy_split(&Lsn(cutoff_lsn.0 + 1));
 
-        let mut before_page_versions = BTreeMap::new();
-        let mut after_page_versions = BTreeMap::new();
-        for ((blknum, lsn), pv) in inner.page_versions.iter() {
-            if *lsn > cutoff_lsn {
-                after_page_versions.insert((*blknum, *lsn), pv.clone());
+        // The iterator is in Lsn order, so the first element will have the smallest Lsn
+        let mut after_oldest_lsn: Accum<Lsn> =
+            Accum(after_segsizes.iter().next().map(|(lsn, _size)| *lsn));
+
+        let mut before_page_versions = PageVersions::default();
+        let mut after_page_versions = PageVersions::default();
+        for (blknum, ordered_vec) in inner.page_versions.0.iter() {
+            let (before_ov, after_ov) = ordered_vec.copy_split(&Lsn(cutoff_lsn.0 + 1));
+
+            if let Some((lsn, _pv)) = after_ov.iter().next() {
                 after_oldest_lsn.accum(min, *lsn);
-            } else {
-                before_page_versions.insert((*blknum, *lsn), pv.clone());
+            }
+
+            if !before_ov.is_empty() {
+                before_page_versions.0.insert(*blknum, before_ov);
+            }
+
+            if !after_ov.is_empty() {
+                after_page_versions.0.insert(*blknum, after_ov);
             }
         }
 
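`copy_split` cuts at the first key greater than or equal to its argument, so splitting at `Lsn(cutoff_lsn.0 + 1)` leaves entries with LSN <= `cutoff_lsn` on the "before" side and strictly newer ones on the "after" side; and since each per-block `OrderedVec` is LSN-ordered, the first element of `after_ov` is that block's oldest post-cutoff entry. A small sketch of that split boundary, with a hypothetical `copy_split` over integer keys:

```rust
// Sketch of the copy_split boundary: splitting at (cutoff + 1) puts keys
// <= cutoff on the left and keys > cutoff on the right.
fn copy_split(v: &[(u64, char)], key: u64) -> (Vec<(u64, char)>, Vec<(u64, char)>) {
    let idx = v.partition_point(|&(k, _)| k < key); // first index with k >= key
    (v[..idx].to_vec(), v[idx..].to_vec())
}

fn main() {
    let versions = [(10, 'a'), (20, 'b'), (30, 'c')];
    let cutoff = 20;
    let (before, after) = copy_split(&versions, cutoff + 1);
    assert_eq!(before, vec![(10, 'a'), (20, 'b')]); // lsn <= cutoff
    assert_eq!(after, vec![(30, 'c')]); // lsn > cutoff
}
```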
@@ -640,8 +715,8 @@ impl InMemoryLayer {
             )?;
 
             let new_inner = new_open.inner.get_mut().unwrap();
-            new_inner.page_versions.append(&mut after_page_versions);
-            new_inner.segsizes.append(&mut after_segsizes);
+            new_inner.page_versions = after_page_versions;
+            new_inner.segsizes.extend(after_segsizes);
 
             Some(Arc::new(new_open))
         } else {
@@ -691,7 +766,7 @@ impl InMemoryLayer {
             drop_lsn,
             true,
             predecessor,
-            inner.page_versions.iter(),
+            inner.page_versions.ordered_iter(None),
             inner.segsizes.clone(),
         )?;
         trace!(
@@ -705,17 +780,8 @@ impl InMemoryLayer {
 
         let end_lsn = self.end_lsn.unwrap();
 
-        let mut before_segsizes = BTreeMap::new();
-        for (lsn, size) in inner.segsizes.iter() {
-            if *lsn <= end_lsn {
-                before_segsizes.insert(*lsn, *size);
-            }
-        }
-        let mut before_page_versions = inner.page_versions.iter().filter(|tup| {
-            let ((_blknum, lsn), _pv) = tup;
-
-            *lsn < end_lsn
-        });
+        let before_segsizes = inner.segsizes.copy_prefix(&Lsn(end_lsn.0 + 1));
+        let mut before_page_versions = inner.page_versions.ordered_iter(Some(end_lsn));
 
         let mut frozen_layers: Vec<Arc<dyn Layer>> = Vec::new();
 
[third file, name not shown]
@@ -8,6 +8,8 @@ pub mod lsn;
 /// SeqWait allows waiting for a future sequence number to arrive
 pub mod seqwait;
 
+pub mod ordered_vec;
+
 // Async version of SeqWait. Currently unused.
 // pub mod seqwait_async;
 
zenith_utils/src/ordered_vec.rs (new file, 273 lines)
@@ -0,0 +1,273 @@
+use std::{
+    collections::BTreeMap,
+    ops::{Bound, RangeBounds},
+};
+
+use serde::{Deserialize, Serialize};
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct OrderedVec<K, V>(Vec<(K, V)>);
+
+impl<K, V> Default for OrderedVec<K, V> {
+    fn default() -> Self {
+        Self(Default::default())
+    }
+}
+
+impl<K: Ord + Copy, V> OrderedVec<K, V> {
+    pub fn iter(&self) -> std::slice::Iter<'_, (K, V)> {
+        self.0.iter()
+    }
+
+    pub fn range<R: RangeBounds<K>>(&self, range: R) -> &[(K, V)] {
+        match (range.start_bound(), range.end_bound()) {
+            (Bound::Excluded(l), Bound::Excluded(u)) if l == u => panic!("Invalid excluded"),
+            // TODO check for l <= x with or patterns
+            _ => {}
+        }
+
+        let start_idx = match range.start_bound() {
+            Bound::Included(key) => match self.0.binary_search_by_key(key, extract_key) {
+                Ok(idx) => idx,
+                Err(idx) => idx,
+            },
+            Bound::Excluded(key) => match self.0.binary_search_by_key(key, extract_key) {
+                Ok(idx) => idx + 1,
+                Err(idx) => idx,
+            },
+            Bound::Unbounded => 0,
+        };
+
+        let end_idx = match range.end_bound() {
+            Bound::Included(key) => match self.0.binary_search_by_key(key, extract_key) {
+                Ok(idx) => idx + 1,
+                Err(idx) => idx,
+            },
+            Bound::Excluded(key) => match self.0.binary_search_by_key(key, extract_key) {
+                Ok(idx) => idx,
+                Err(idx) => idx,
+            },
+            Bound::Unbounded => self.0.len(),
+        };
+
+        &self.0[start_idx..end_idx]
+    }
+
+    pub fn append(&mut self, key: K, value: V) {
+        if let Some((last_key, _last_value)) = self.0.last() {
+            debug_assert!(last_key < &key);
+        }
+
+        self.0.push((key, value));
+    }
+
+    pub fn append_update(&mut self, key: K, value: V) {
+        if let Some((last_key, this_value)) = self.0.last_mut() {
+            use std::cmp::Ordering;
+            match (*last_key).cmp(&key) {
+                Ordering::Less => {}
+                Ordering::Equal => {
+                    *this_value = value;
+                    return;
+                }
+                Ordering::Greater => {
+                    panic!();
+                }
+            }
+        }
+
+        self.0.push((key, value));
+    }
+
+    pub fn extend(&mut self, other: OrderedVec<K, V>) {
+        if let (Some((last, _)), Some((first, _))) = (self.0.last(), other.0.first()) {
+            assert!(last < first);
+        }
+
+        self.0.extend(other.0);
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.0.is_empty()
+    }
+
+    pub fn copy_prefix(&self, key: &K) -> Self
+    where
+        K: Clone,
+        V: Clone,
+    {
+        let idx = match self.0.binary_search_by_key(key, extract_key) {
+            Ok(idx) => idx,
+            Err(idx) => idx,
+        };
+
+        OrderedVec(Vec::from(&self.0[..idx]))
+    }
+
+    pub fn copy_split(&self, key: &K) -> (Self, Self)
+    where
+        K: Clone,
+        V: Clone,
+    {
+        let idx = match self.0.binary_search_by_key(key, extract_key) {
+            Ok(idx) => idx,
+            Err(idx) => idx,
+        };
+
+        (
+            OrderedVec(Vec::from(&self.0[..idx])),
+            OrderedVec(Vec::from(&self.0[idx..])),
+        )
+    }
+}
+
+impl<K: Ord, V> From<BTreeMap<K, V>> for OrderedVec<K, V> {
+    fn from(map: BTreeMap<K, V>) -> Self {
+        let vec: Vec<(K, V)> = map.into_iter().collect();
+
+        // TODO probably change this
+        for windows in vec.windows(2) {
+            let (k1, _data1) = &windows[0];
+            let (k2, _data2) = &windows[1];
+            debug_assert!(k1 < k2);
+        }
+
+        OrderedVec(vec)
+    }
+}
+
+fn extract_key<K: Copy, V>(pair: &(K, V)) -> K {
+    pair.0
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{
+        collections::BTreeMap,
+        ops::{Bound, RangeBounds},
+    };
+
+    use super::OrderedVec;
+
+    #[test]
+    #[should_panic]
+    fn invalid_range() {
+        let mut map = BTreeMap::new();
+        map.insert(0, ());
+
+        let vec: OrderedVec<i32, ()> = OrderedVec::from(map);
+        struct InvalidRange;
+        impl RangeBounds<i32> for InvalidRange {
+            fn start_bound(&self) -> Bound<&i32> {
+                Bound::Excluded(&0)
+            }
+
+            fn end_bound(&self) -> Bound<&i32> {
+                Bound::Excluded(&0)
+            }
+        }
+
+        vec.range(InvalidRange);
+    }
+
+    #[test]
+    fn range_tests() {
+        let mut map = BTreeMap::new();
+        map.insert(0, ());
+        map.insert(2, ());
+        map.insert(4, ());
+        let vec = OrderedVec::from(map);
+
+        assert_eq!(vec.range(0..0), &[]);
+        assert_eq!(vec.range(0..1), &[(0, ())]);
+        assert_eq!(vec.range(0..2), &[(0, ())]);
+        assert_eq!(vec.range(0..3), &[(0, ()), (2, ())]);
+
+        assert_eq!(vec.range(..0), &[]);
+        assert_eq!(vec.range(..1), &[(0, ())]);
+
+        assert_eq!(vec.range(..3), &[(0, ()), (2, ())]);
+        assert_eq!(vec.range(..3), &[(0, ()), (2, ())]);
+
+        assert_eq!(vec.range(0..=0), &[(0, ())]);
+        assert_eq!(vec.range(0..=1), &[(0, ())]);
+        assert_eq!(vec.range(0..=2), &[(0, ()), (2, ())]);
+        assert_eq!(vec.range(0..=3), &[(0, ()), (2, ())]);
+
+        assert_eq!(vec.range(..=0), &[(0, ())]);
+        assert_eq!(vec.range(..=1), &[(0, ())]);
+        assert_eq!(vec.range(..=2), &[(0, ()), (2, ())]);
+        assert_eq!(vec.range(..=3), &[(0, ()), (2, ())]);
+    }
+
+    struct BoundIter {
+        min: i32,
+        max: i32,
+
+        next: Option<Bound<i32>>,
+    }
+
+    impl BoundIter {
+        fn new(min: i32, max: i32) -> Self {
+            Self {
+                min,
+                max,
+
+                next: Some(Bound::Unbounded),
+            }
+        }
+    }
+
+    impl Iterator for BoundIter {
+        type Item = Bound<i32>;
+
+        fn next(&mut self) -> Option<Self::Item> {
+            let cur = self.next?;
+
+            self.next = match &cur {
+                Bound::Unbounded => Some(Bound::Included(self.min)),
+                Bound::Included(x) => {
+                    if *x >= self.max {
+                        Some(Bound::Excluded(self.min))
+                    } else {
+                        Some(Bound::Included(x + 1))
+                    }
+                }
+                Bound::Excluded(x) => {
+                    if *x >= self.max {
+                        None
+                    } else {
+                        Some(Bound::Excluded(x + 1))
+                    }
+                }
+            };
+
+            Some(cur)
+        }
+    }
+
+    #[test]
+    fn range_exhaustive() {
+        let map: BTreeMap<i32, ()> = (1..=7).step_by(2).map(|x| (x, ())).collect();
+        let vec = OrderedVec::from(map.clone());
+
+        const RANGE_MIN: i32 = 0;
+        const RANGE_MAX: i32 = 8;
+        for lower_bound in BoundIter::new(RANGE_MIN, RANGE_MAX) {
+            let ub_min = match lower_bound {
+                Bound::Unbounded => RANGE_MIN,
+                Bound::Included(x) => x,
+                Bound::Excluded(x) => x + 1,
+            };
+            for upper_bound in BoundIter::new(ub_min, RANGE_MAX) {
+                let map_range: Vec<(i32, ())> = map
+                    .range((lower_bound, upper_bound))
+                    .map(|(&x, _)| (x, ()))
+                    .collect();
+                let vec_slice = vec.range((lower_bound, upper_bound));
+
+                assert_eq!(map_range, vec_slice);
+            }
+        }
+    }
+}
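Taken together, `OrderedVec` is a thin wrapper over a sorted `Vec<(K, V)>`: `append`/`append_update` enforce the ordering invariant at insertion time, `range` answers `BTreeMap::range`-style queries with two binary searches and returns a slice, and `copy_prefix`/`copy_split` are contiguous copies. A hedged end-to-end usage sketch against the API as it appears in this diff (assuming a crate that depends on `zenith_utils`):

```rust
use std::collections::BTreeMap;
use zenith_utils::ordered_vec::OrderedVec; // module path as added by this branch

fn main() {
    let mut ov: OrderedVec<u64, &str> = OrderedVec::default();

    // Keys must arrive in order; append_update overwrites an equal last key.
    ov.append(10, "first");
    ov.append_update(20, "second");
    ov.append_update(20, "second, revised");

    // range() returns a slice, so the usual slice tools apply.
    let upto_20 = ov.range(..=20);
    assert_eq!(upto_20.last(), Some(&(20, "second, revised")));

    // copy_split at 20 puts keys < 20 on the left and keys >= 20 on the right.
    let (before, after) = ov.copy_split(&20);
    assert_eq!(before.iter().count(), 1);
    assert_eq!(after.iter().count(), 1);

    // A BTreeMap built elsewhere can be converted wholesale.
    let map: BTreeMap<u64, &str> = vec![(1, "a"), (2, "b")].into_iter().collect();
    let from_map = OrderedVec::from(map);
    assert!(!from_map.is_empty());
}
```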