mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-01 17:50:38 +00:00
Compare commits
1 Commits
persistent
...
ordered-bl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b3de51296a |
@@ -65,6 +65,7 @@ use zenith_utils::bin_ser::BeSer;
|
|||||||
use zenith_utils::lsn::Lsn;
|
use zenith_utils::lsn::Lsn;
|
||||||
|
|
||||||
use super::blob::{read_blob, BlobRange};
|
use super::blob::{read_blob, BlobRange};
|
||||||
|
use super::page_versions::OrderedBlockIter;
|
||||||
|
|
||||||
// Magic constant to identify a Zenith delta file
|
// Magic constant to identify a Zenith delta file
|
||||||
pub const DELTA_FILE_MAGIC: u32 = 0x5A616E01;
|
pub const DELTA_FILE_MAGIC: u32 = 0x5A616E01;
|
||||||
@@ -374,7 +375,7 @@ impl DeltaLayer {
|
|||||||
/// data structure with two btreemaps as we do, so passing the btreemaps is currently
|
/// data structure with two btreemaps as we do, so passing the btreemaps is currently
|
||||||
/// expedient.
|
/// expedient.
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub fn create<'a>(
|
pub fn create(
|
||||||
conf: &'static PageServerConf,
|
conf: &'static PageServerConf,
|
||||||
timelineid: ZTimelineId,
|
timelineid: ZTimelineId,
|
||||||
tenantid: ZTenantId,
|
tenantid: ZTenantId,
|
||||||
@@ -382,7 +383,7 @@ impl DeltaLayer {
|
|||||||
start_lsn: Lsn,
|
start_lsn: Lsn,
|
||||||
end_lsn: Lsn,
|
end_lsn: Lsn,
|
||||||
dropped: bool,
|
dropped: bool,
|
||||||
page_versions: impl Iterator<Item = (u32, Lsn, &'a PageVersion)>,
|
page_versions: OrderedBlockIter,
|
||||||
relsizes: VecMap<Lsn, u32>,
|
relsizes: VecMap<Lsn, u32>,
|
||||||
) -> Result<DeltaLayer> {
|
) -> Result<DeltaLayer> {
|
||||||
if seg.rel.is_blocky() {
|
if seg.rel.is_blocky() {
|
||||||
@@ -418,14 +419,20 @@ impl DeltaLayer {
|
|||||||
|
|
||||||
let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
|
let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
|
||||||
|
|
||||||
for (blknum, lsn, page_version) in page_versions {
|
for (blknum, history) in page_versions {
|
||||||
let buf = PageVersion::ser(page_version)?;
|
for (lsn, page_version) in history.as_slice() {
|
||||||
let blob_range = page_version_writer.write_blob(&buf)?;
|
if lsn >= &end_lsn {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
inner
|
let buf = PageVersion::ser(page_version)?;
|
||||||
.page_version_metas
|
let blob_range = page_version_writer.write_blob(&buf)?;
|
||||||
.append((blknum, lsn), blob_range)
|
|
||||||
.unwrap();
|
inner
|
||||||
|
.page_version_metas
|
||||||
|
.append((blknum, *lsn), blob_range)
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let book = page_version_writer.close()?;
|
let book = page_version_writer.close()?;
|
||||||
|
|||||||
@@ -280,14 +280,16 @@ impl Layer for InMemoryLayer {
|
|||||||
println!("segsizes {}: {}", k, v);
|
println!("segsizes {}: {}", k, v);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (blknum, lsn, pv) in inner.page_versions.ordered_page_version_iter(None) {
|
for (blknum, history) in inner.page_versions.ordered_block_iter() {
|
||||||
println!(
|
for (lsn, pv) in history.as_slice() {
|
||||||
"blk {} at {}: {}/{}\n",
|
println!(
|
||||||
blknum,
|
"blk {} at {}: {}/{}\n",
|
||||||
lsn,
|
blknum,
|
||||||
pv.page_image.is_some(),
|
lsn,
|
||||||
pv.record.is_some()
|
pv.page_image.is_some(),
|
||||||
);
|
pv.record.is_some()
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -698,7 +700,7 @@ impl InMemoryLayer {
|
|||||||
self.start_lsn,
|
self.start_lsn,
|
||||||
drop_lsn,
|
drop_lsn,
|
||||||
true,
|
true,
|
||||||
inner.page_versions.ordered_page_version_iter(None),
|
inner.page_versions.ordered_block_iter(),
|
||||||
inner.segsizes.clone(),
|
inner.segsizes.clone(),
|
||||||
)?;
|
)?;
|
||||||
trace!(
|
trace!(
|
||||||
@@ -712,8 +714,6 @@ impl InMemoryLayer {
|
|||||||
|
|
||||||
let end_lsn = self.end_lsn.unwrap();
|
let end_lsn = self.end_lsn.unwrap();
|
||||||
|
|
||||||
let mut before_page_versions = inner.page_versions.ordered_page_version_iter(Some(end_lsn));
|
|
||||||
|
|
||||||
let mut frozen_layers: Vec<Arc<dyn Layer>> = Vec::new();
|
let mut frozen_layers: Vec<Arc<dyn Layer>> = Vec::new();
|
||||||
|
|
||||||
if self.start_lsn != end_lsn {
|
if self.start_lsn != end_lsn {
|
||||||
@@ -728,7 +728,7 @@ impl InMemoryLayer {
|
|||||||
self.start_lsn,
|
self.start_lsn,
|
||||||
end_lsn,
|
end_lsn,
|
||||||
false,
|
false,
|
||||||
before_page_versions,
|
inner.page_versions.ordered_block_iter(),
|
||||||
before_segsizes,
|
before_segsizes,
|
||||||
)?;
|
)?;
|
||||||
frozen_layers.push(Arc::new(delta_layer));
|
frozen_layers.push(Arc::new(delta_layer));
|
||||||
@@ -739,7 +739,10 @@ impl InMemoryLayer {
|
|||||||
end_lsn
|
end_lsn
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
assert!(before_page_versions.next().is_none());
|
for (_blknum, history) in inner.page_versions.ordered_block_iter() {
|
||||||
|
let (lsn, _pv) = history.as_slice().first().unwrap();
|
||||||
|
assert!(lsn >= &end_lsn);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
drop(inner);
|
drop(inner);
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use std::{collections::HashMap, ops::RangeBounds, slice};
|
use std::{collections::HashMap, ops::RangeBounds};
|
||||||
|
|
||||||
use zenith_utils::{accum::Accum, lsn::Lsn, vec_map::VecMap};
|
use zenith_utils::{accum::Accum, lsn::Lsn, vec_map::VecMap};
|
||||||
|
|
||||||
@@ -24,14 +24,6 @@ impl PageVersions {
|
|||||||
map.append_or_update_last(lsn, page_version).unwrap()
|
map.append_or_update_last(lsn, page_version).unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get all [`PageVersion`]s in a block
|
|
||||||
pub fn get_block_slice(&self, blknum: u32) -> &[(Lsn, PageVersion)] {
|
|
||||||
self.0
|
|
||||||
.get(&blknum)
|
|
||||||
.map(VecMap::as_slice)
|
|
||||||
.unwrap_or(EMPTY_SLICE)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get a range of [`PageVersions`] in a block
|
/// Get a range of [`PageVersions`] in a block
|
||||||
pub fn get_block_lsn_range<R: RangeBounds<Lsn>>(
|
pub fn get_block_lsn_range<R: RangeBounds<Lsn>>(
|
||||||
&self,
|
&self,
|
||||||
@@ -72,65 +64,33 @@ impl PageVersions {
|
|||||||
(Self(before_blocks), Self(after_blocks))
|
(Self(before_blocks), Self(after_blocks))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Iterate through [`PageVersion`]s in (block, lsn) order.
|
/// Iterate through block-history pairs in block order.
|
||||||
/// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn`
|
pub fn ordered_block_iter(&self) -> OrderedBlockIter<'_> {
|
||||||
pub fn ordered_page_version_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedPageVersionIter<'_> {
|
|
||||||
let mut ordered_blocks: Vec<u32> = self.0.keys().cloned().collect();
|
let mut ordered_blocks: Vec<u32> = self.0.keys().cloned().collect();
|
||||||
ordered_blocks.sort_unstable();
|
ordered_blocks.sort_unstable();
|
||||||
|
|
||||||
let slice = ordered_blocks
|
OrderedBlockIter {
|
||||||
.first()
|
|
||||||
.map(|&blknum| self.get_block_slice(blknum))
|
|
||||||
.unwrap_or(EMPTY_SLICE);
|
|
||||||
|
|
||||||
OrderedPageVersionIter {
|
|
||||||
page_versions: self,
|
page_versions: self,
|
||||||
ordered_blocks,
|
ordered_blocks,
|
||||||
cur_block_idx: 0,
|
cur_block_idx: 0,
|
||||||
cutoff_lsn,
|
|
||||||
cur_slice_iter: slice.iter(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct OrderedPageVersionIter<'a> {
|
pub struct OrderedBlockIter<'a> {
|
||||||
page_versions: &'a PageVersions,
|
page_versions: &'a PageVersions,
|
||||||
|
|
||||||
ordered_blocks: Vec<u32>,
|
ordered_blocks: Vec<u32>,
|
||||||
cur_block_idx: usize,
|
cur_block_idx: usize,
|
||||||
|
|
||||||
cutoff_lsn: Option<Lsn>,
|
|
||||||
|
|
||||||
cur_slice_iter: slice::Iter<'a, (Lsn, PageVersion)>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl OrderedPageVersionIter<'_> {
|
impl<'a> Iterator for OrderedBlockIter<'a> {
|
||||||
fn is_lsn_before_cutoff(&self, lsn: &Lsn) -> bool {
|
type Item = (u32, &'a VecMap<Lsn, PageVersion>);
|
||||||
if let Some(cutoff_lsn) = self.cutoff_lsn.as_ref() {
|
|
||||||
lsn < cutoff_lsn
|
|
||||||
} else {
|
|
||||||
true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> Iterator for OrderedPageVersionIter<'a> {
|
|
||||||
type Item = (u32, Lsn, &'a PageVersion);
|
|
||||||
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
loop {
|
let blknum: u32 = *self.ordered_blocks.get(self.cur_block_idx)?;
|
||||||
if let Some((lsn, page_version)) = self.cur_slice_iter.next() {
|
self.cur_block_idx += 1;
|
||||||
if self.is_lsn_before_cutoff(lsn) {
|
Some((blknum, self.page_versions.0.get(&blknum).unwrap()))
|
||||||
let blknum = self.ordered_blocks[self.cur_block_idx];
|
|
||||||
return Some((blknum, *lsn, page_version));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let next_block_idx = self.cur_block_idx + 1;
|
|
||||||
let blknum: u32 = *self.ordered_blocks.get(next_block_idx)?;
|
|
||||||
self.cur_block_idx = next_block_idx;
|
|
||||||
self.cur_slice_iter = self.page_versions.get_block_slice(blknum).iter();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -156,24 +116,14 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut iter = page_versions.ordered_page_version_iter(None);
|
let mut iter = page_versions.ordered_block_iter();
|
||||||
for blknum in 0..BLOCKS {
|
for blknum in 0..BLOCKS {
|
||||||
|
let (actual_blknum, vec_map) = iter.next().unwrap();
|
||||||
|
let slice = vec_map.as_slice();
|
||||||
|
assert_eq!(actual_blknum, blknum);
|
||||||
|
assert_eq!(slice.len(), LSNS as usize);
|
||||||
for lsn in 0..LSNS {
|
for lsn in 0..LSNS {
|
||||||
let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap();
|
assert_eq!(Lsn(lsn), slice[lsn as usize].0);
|
||||||
assert_eq!(actual_blknum, blknum);
|
|
||||||
assert_eq!(Lsn(lsn), actual_lsn);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assert!(iter.next().is_none());
|
|
||||||
assert!(iter.next().is_none()); // should be robust against excessive next() calls
|
|
||||||
|
|
||||||
const CUTOFF_LSN: Lsn = Lsn(30);
|
|
||||||
let mut iter = page_versions.ordered_page_version_iter(Some(CUTOFF_LSN));
|
|
||||||
for blknum in 0..BLOCKS {
|
|
||||||
for lsn in 0..CUTOFF_LSN.0 {
|
|
||||||
let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap();
|
|
||||||
assert_eq!(actual_blknum, blknum);
|
|
||||||
assert_eq!(Lsn(lsn), actual_lsn);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert!(iter.next().is_none());
|
assert!(iter.next().is_none());
|
||||||
|
|||||||
Reference in New Issue
Block a user