page versions

This commit is contained in:
Patrick Insinger
2021-09-28 23:45:20 -07:00
parent 98f58fa2ab
commit b7c3f02ed1
2 changed files with 87 additions and 28 deletions

View File

@@ -391,7 +391,7 @@ impl DeltaLayer {
end_lsn: Lsn,
dropped: bool,
predecessor: Option<Arc<dyn Layer>>,
page_versions: impl Iterator<Item = (&'a (u32, Lsn), &'a PageVersion)>,
page_versions: impl Iterator<Item = (&'a u32, &'a Lsn, &'a PageVersion)>,
relsizes: OrderedVec<Lsn, u32>,
) -> Result<DeltaLayer> {
let delta_layer = DeltaLayer {
@@ -424,11 +424,11 @@ impl DeltaLayer {
let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
for (key, page_version) in page_versions {
for (blknum, lsn, page_version) in page_versions {
let buf = PageVersion::ser(page_version)?;
let blob_range = page_version_writer.write_blob(&buf)?;
inner.page_version_metas.append(*key, blob_range);
inner.page_version_metas.append((*blknum, *lsn), blob_range);
}
let book = page_version_writer.close()?;

View File

@@ -18,6 +18,7 @@ use log::*;
use std::cmp::min;
use std::collections::BTreeMap;
use std::ops::Bound::Included;
use std::ops::RangeBounds;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use zenith_utils::ordered_vec::OrderedVec;
@@ -48,6 +49,62 @@ pub struct InMemoryLayer {
inner: RwLock<InMemoryLayerInner>,
}
#[derive(Default)]
struct PageVersions(BTreeMap<u32, OrderedVec<Lsn, PageVersion>>);
impl PageVersions {
fn range<R: RangeBounds<Lsn>>(&self, blknum: u32, lsn_range: R) -> &[(Lsn, PageVersion)] {
match self.0.get(&blknum) {
Some(ov) => ov.range(lsn_range),
None => &[],
}
}
fn update(&mut self, blknum: u32, lsn: Lsn, page_version: PageVersion) {
let ordered_vec = self.0.entry(blknum).or_insert_with(OrderedVec::default);
ordered_vec.append_update(lsn, page_version);
}
fn iter(&self) -> PageVersionIter {
let mut map_iter = self.0.iter();
let map_entry = map_iter
.next()
.map(|(blknum, ordered_vec)| (blknum, ordered_vec.iter()));
PageVersionIter {
map_iter,
map_entry,
}
}
fn is_empty(&self) -> bool {
self.0.is_empty()
}
}
struct PageVersionIter<'a> {
map_iter: std::collections::btree_map::Iter<'a, u32, OrderedVec<Lsn, PageVersion>>,
map_entry: Option<(&'a u32, std::slice::Iter<'a, (Lsn, PageVersion)>)>,
}
impl<'a> Iterator for PageVersionIter<'a> {
type Item = (&'a u32, &'a Lsn, &'a PageVersion);
fn next(&mut self) -> Option<Self::Item> {
loop {
let (blknum, iter) = self.map_entry.as_mut()?;
if let Some((lsn, pv)) = iter.next() {
return Some((blknum, lsn, pv));
}
self.map_entry = self
.map_iter
.next()
.map(|(blknum, ordered_vec)| (blknum, ordered_vec.iter()));
}
}
}
pub struct InMemoryLayerInner {
/// If this relation was dropped, remember when that happened.
drop_lsn: Option<Lsn>,
@@ -56,7 +113,7 @@ pub struct InMemoryLayerInner {
/// All versions of all pages in the layer are are kept here.
/// Indexed by block number and LSN.
///
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
page_versions: PageVersions,
///
/// `segsizes` tracks the size of the segment at different points in time.
@@ -171,12 +228,8 @@ impl Layer for InMemoryLayer {
let inner = self.inner.read().unwrap();
// Scan the BTreeMap backwards, starting from reconstruct_data.lsn.
let minkey = (blknum, Lsn(0));
let maxkey = (blknum, lsn);
let mut iter = inner
.page_versions
.range((Included(&minkey), Included(&maxkey)));
while let Some(((_blknum, _entry_lsn), entry)) = iter.next_back() {
let mut iter = inner.page_versions.range(blknum, ..=lsn).iter();
while let Some((_entry_lsn, entry)) = iter.next_back() {
if let Some(img) = &entry.page_image {
reconstruct_data.page_img = Some(img.clone());
need_image = false;
@@ -276,11 +329,11 @@ impl Layer for InMemoryLayer {
println!("segsizes {}: {}", k, v);
}
for (k, v) in inner.page_versions.iter() {
for (blknum, lsn, v) in inner.page_versions.iter() {
println!(
"blk {} at {}: {}/{}\n",
k.0,
k.1,
blknum,
lsn,
v.page_image.is_some(),
v.record.is_some()
);
@@ -343,7 +396,7 @@ impl InMemoryLayer {
oldest_pending_lsn,
inner: RwLock::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: BTreeMap::new(),
page_versions: PageVersions::default(),
segsizes: OrderedVec::default(),
writeable: true,
predecessor: None,
@@ -394,8 +447,9 @@ impl InMemoryLayer {
inner.check_writeable()?;
let old = inner.page_versions.insert((blknum, lsn), pv);
inner.page_versions.update(blknum, lsn, pv);
/*
if old.is_some() {
// We already had an entry for this LSN. That's odd..
warn!(
@@ -403,6 +457,7 @@ impl InMemoryLayer {
self.seg.rel, blknum, lsn
);
}
*/
// Also update the relation size, if this extended the relation.
if self.seg.rel.is_blocky() {
@@ -439,15 +494,17 @@ impl InMemoryLayer {
gapblknum,
blknum
);
let old = inner.page_versions.insert((gapblknum, lsn), zeropv);
inner.page_versions.update(gapblknum, lsn, zeropv);
// We already had an entry for this LSN. That's odd..
/*
if old.is_some() {
warn!(
"Page version of rel {} blk {} at {} already exists",
self.seg.rel, blknum, lsn
);
}
*/
}
inner.segsizes.append_update(lsn, newsize);
@@ -531,7 +588,7 @@ impl InMemoryLayer {
oldest_pending_lsn,
inner: RwLock::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: BTreeMap::new(),
page_versions: PageVersions::default(),
segsizes,
writeable: true,
predecessor: Some(src),
@@ -598,14 +655,16 @@ impl InMemoryLayer {
}
}
let mut before_page_versions = BTreeMap::new();
let mut after_page_versions = BTreeMap::new();
for ((blknum, lsn), pv) in inner.page_versions.iter() {
if *lsn > cutoff_lsn {
after_page_versions.insert((*blknum, *lsn), pv.clone());
after_oldest_lsn.accum(min, *lsn);
} else {
before_page_versions.insert((*blknum, *lsn), pv.clone());
let mut before_page_versions = PageVersions::default();
let mut after_page_versions = PageVersions::default();
for (blknum, ordered_vec) in inner.page_versions.0.iter() {
for (lsn, pv) in ordered_vec.iter() {
if *lsn > cutoff_lsn {
after_page_versions.update(*blknum, *lsn, pv.clone());
after_oldest_lsn.accum(min, *lsn);
} else {
before_page_versions.update(*blknum, *lsn, pv.clone());
}
}
}
@@ -637,7 +696,7 @@ impl InMemoryLayer {
)?;
let new_inner = new_open.inner.get_mut().unwrap();
new_inner.page_versions.append(&mut after_page_versions);
new_inner.page_versions = after_page_versions;
new_inner.segsizes.extend(after_segsizes);
Some(Arc::new(new_open))
@@ -708,8 +767,8 @@ impl InMemoryLayer {
before_segsizes.append(*lsn, *size);
}
}
let mut before_page_versions = inner.page_versions.iter().filter(|tup| {
let ((_blknum, lsn), _pv) = tup;
let mut before_page_versions = inner.page_versions.iter().filter(|&tup| {
let (_blknum, lsn, _pv) = tup;
*lsn < end_lsn
});