mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-04 08:50:38 +00:00
Compare commits
2 Commits
hack/compu
...
vec-heap
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2975c26de7 | ||
|
|
5f4f7d9762 |
@@ -350,7 +350,7 @@ impl DeltaLayer {
|
|||||||
/// data structure with two btreemaps as we do, so passing the btreemaps is currently
|
/// data structure with two btreemaps as we do, so passing the btreemaps is currently
|
||||||
/// expedient.
|
/// expedient.
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub fn create<'a>(
|
pub fn create(
|
||||||
conf: &'static PageServerConf,
|
conf: &'static PageServerConf,
|
||||||
timelineid: ZTimelineId,
|
timelineid: ZTimelineId,
|
||||||
tenantid: ZTenantId,
|
tenantid: ZTenantId,
|
||||||
@@ -358,7 +358,7 @@ impl DeltaLayer {
|
|||||||
start_lsn: Lsn,
|
start_lsn: Lsn,
|
||||||
end_lsn: Lsn,
|
end_lsn: Lsn,
|
||||||
dropped: bool,
|
dropped: bool,
|
||||||
page_versions: impl Iterator<Item = (u32, Lsn, &'a PageVersion)>,
|
page_versions: impl Iterator<Item = (u32, Lsn, PageVersion)>,
|
||||||
relsizes: VecMap<Lsn, u32>,
|
relsizes: VecMap<Lsn, u32>,
|
||||||
) -> Result<DeltaLayer> {
|
) -> Result<DeltaLayer> {
|
||||||
if seg.rel.is_blocky() {
|
if seg.rel.is_blocky() {
|
||||||
@@ -393,7 +393,8 @@ impl DeltaLayer {
|
|||||||
let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
|
let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
|
||||||
|
|
||||||
for (blknum, lsn, page_version) in page_versions {
|
for (blknum, lsn, page_version) in page_versions {
|
||||||
let buf = PageVersion::ser(page_version)?;
|
// TODO avoid deserializing and then reserializing
|
||||||
|
let buf = PageVersion::ser(&page_version)?;
|
||||||
let blob_range = page_version_writer.write_blob(&buf)?;
|
let blob_range = page_version_writer.write_blob(&buf)?;
|
||||||
|
|
||||||
inner
|
inner
|
||||||
|
|||||||
@@ -157,8 +157,7 @@ impl Layer for InMemoryLayer {
|
|||||||
// Scan the page versions backwards, starting from `lsn`.
|
// Scan the page versions backwards, starting from `lsn`.
|
||||||
let iter = inner
|
let iter = inner
|
||||||
.page_versions
|
.page_versions
|
||||||
.get_block_lsn_range(blknum, ..=lsn)
|
.iter_block_lsn_range(blknum, ..=lsn)
|
||||||
.iter()
|
|
||||||
.rev();
|
.rev();
|
||||||
for (_entry_lsn, entry) in iter {
|
for (_entry_lsn, entry) in iter {
|
||||||
if let Some(img) = &entry.page_image {
|
if let Some(img) = &entry.page_image {
|
||||||
|
|||||||
@@ -1,13 +1,20 @@
|
|||||||
use std::{collections::HashMap, ops::RangeBounds, slice};
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
ops::{Range, RangeBounds},
|
||||||
|
slice,
|
||||||
|
};
|
||||||
|
|
||||||
use zenith_utils::{lsn::Lsn, vec_map::VecMap};
|
use zenith_utils::{bin_ser::LeSer, lsn::Lsn, vec_map::VecMap};
|
||||||
|
|
||||||
use super::storage_layer::PageVersion;
|
use super::storage_layer::PageVersion;
|
||||||
|
|
||||||
const EMPTY_SLICE: &[(Lsn, PageVersion)] = &[];
|
const EMPTY_SLICE: &[(Lsn, Range<usize>)] = &[];
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
pub struct PageVersions(HashMap<u32, VecMap<Lsn, PageVersion>>);
|
pub struct PageVersions {
|
||||||
|
heap: Vec<u8>,
|
||||||
|
ranges: HashMap<u32, VecMap<Lsn, Range<usize>>>,
|
||||||
|
}
|
||||||
|
|
||||||
impl PageVersions {
|
impl PageVersions {
|
||||||
pub fn append_or_update_last(
|
pub fn append_or_update_last(
|
||||||
@@ -16,51 +23,124 @@ impl PageVersions {
|
|||||||
lsn: Lsn,
|
lsn: Lsn,
|
||||||
page_version: PageVersion,
|
page_version: PageVersion,
|
||||||
) -> Option<PageVersion> {
|
) -> Option<PageVersion> {
|
||||||
let map = self.0.entry(blknum).or_insert_with(VecMap::default);
|
let mut new_bytes = PageVersion::ser(&page_version).unwrap();
|
||||||
map.append_or_update_last(lsn, page_version).unwrap()
|
|
||||||
|
let map = self.ranges.entry(blknum).or_insert_with(VecMap::default);
|
||||||
|
|
||||||
|
if let Some((last_lsn, last_range)) = map.as_slice().last() {
|
||||||
|
if lsn == *last_lsn {
|
||||||
|
let old_bytes = &self.heap[last_range.clone()];
|
||||||
|
if old_bytes == new_bytes {
|
||||||
|
return Some(page_version);
|
||||||
|
}
|
||||||
|
// TODO optimize for case when old_bytes.len() >= new_bytes.len()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let new_range = self.heap.len()..self.heap.len() + new_bytes.len();
|
||||||
|
self.heap.append(&mut new_bytes);
|
||||||
|
map.append_or_update_last(lsn, new_range)
|
||||||
|
.unwrap()
|
||||||
|
.map(|old_range| {
|
||||||
|
let old_bytes = &self.heap[old_range];
|
||||||
|
PageVersion::des(old_bytes).unwrap()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get all [`PageVersion`]s in a block
|
/// Get all [`PageVersion`]s in a block
|
||||||
pub fn get_block_slice(&self, blknum: u32) -> &[(Lsn, PageVersion)] {
|
pub fn iter_block(&self, blknum: u32) -> BlockVersionIter<'_> {
|
||||||
self.0
|
let range_iter = self
|
||||||
|
.ranges
|
||||||
.get(&blknum)
|
.get(&blknum)
|
||||||
.map(VecMap::as_slice)
|
.map(VecMap::as_slice)
|
||||||
.unwrap_or(EMPTY_SLICE)
|
.unwrap_or(EMPTY_SLICE)
|
||||||
|
.iter();
|
||||||
|
|
||||||
|
BlockVersionIter {
|
||||||
|
heap: &self.heap,
|
||||||
|
range_iter,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get a range of [`PageVersions`] in a block
|
/// Get a range of [`PageVersions`] in a block
|
||||||
pub fn get_block_lsn_range<R: RangeBounds<Lsn>>(
|
pub fn iter_block_lsn_range<R: RangeBounds<Lsn>>(
|
||||||
&self,
|
&self,
|
||||||
blknum: u32,
|
blknum: u32,
|
||||||
range: R,
|
range: R,
|
||||||
) -> &[(Lsn, PageVersion)] {
|
) -> BlockVersionIter<'_> {
|
||||||
self.0
|
let range_iter = self
|
||||||
|
.ranges
|
||||||
.get(&blknum)
|
.get(&blknum)
|
||||||
.map(|vec_map| vec_map.slice_range(range))
|
.map(|vec_map| vec_map.slice_range(range))
|
||||||
.unwrap_or(EMPTY_SLICE)
|
.unwrap_or(EMPTY_SLICE)
|
||||||
|
.iter();
|
||||||
|
|
||||||
|
BlockVersionIter {
|
||||||
|
heap: &self.heap,
|
||||||
|
range_iter,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Iterate through [`PageVersion`]s in (block, lsn) order.
|
/// Iterate through [`PageVersion`]s in (block, lsn) order.
|
||||||
/// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn`
|
/// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn`
|
||||||
pub fn ordered_page_version_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedPageVersionIter<'_> {
|
pub fn ordered_page_version_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedPageVersionIter<'_> {
|
||||||
let mut ordered_blocks: Vec<u32> = self.0.keys().cloned().collect();
|
let mut ordered_blocks: Vec<u32> = self.ranges.keys().cloned().collect();
|
||||||
ordered_blocks.sort_unstable();
|
ordered_blocks.sort_unstable();
|
||||||
|
|
||||||
let slice = ordered_blocks
|
let cur_block_iter = ordered_blocks
|
||||||
.first()
|
.first()
|
||||||
.map(|&blknum| self.get_block_slice(blknum))
|
.map(|&blknum| self.iter_block(blknum))
|
||||||
.unwrap_or(EMPTY_SLICE);
|
.unwrap_or_else(|| {
|
||||||
|
let empty_iter = EMPTY_SLICE.iter();
|
||||||
|
BlockVersionIter {
|
||||||
|
heap: &self.heap,
|
||||||
|
range_iter: empty_iter,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
OrderedPageVersionIter {
|
OrderedPageVersionIter {
|
||||||
page_versions: self,
|
page_versions: self,
|
||||||
ordered_blocks,
|
ordered_blocks,
|
||||||
cur_block_idx: 0,
|
cur_block_idx: 0,
|
||||||
cutoff_lsn,
|
cutoff_lsn,
|
||||||
cur_slice_iter: slice.iter(),
|
cur_block_iter,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct BlockVersionIter<'a> {
|
||||||
|
heap: &'a Vec<u8>,
|
||||||
|
range_iter: slice::Iter<'a, (Lsn, Range<usize>)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BlockVersionIter<'_> {
|
||||||
|
fn get_iter_result(&self, tuple: Option<&(Lsn, Range<usize>)>) -> Option<(Lsn, PageVersion)> {
|
||||||
|
let (lsn, range) = tuple?;
|
||||||
|
let range = range.clone();
|
||||||
|
|
||||||
|
let pv_bytes = &self.heap[range];
|
||||||
|
let page_version = PageVersion::des(pv_bytes).unwrap();
|
||||||
|
|
||||||
|
Some((*lsn, page_version))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Iterator for BlockVersionIter<'_> {
|
||||||
|
type Item = (Lsn, PageVersion);
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
let tuple = self.range_iter.next();
|
||||||
|
self.get_iter_result(tuple)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DoubleEndedIterator for BlockVersionIter<'_> {
|
||||||
|
fn next_back(&mut self) -> Option<Self::Item> {
|
||||||
|
let tuple = self.range_iter.next_back();
|
||||||
|
self.get_iter_result(tuple)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct OrderedPageVersionIter<'a> {
|
pub struct OrderedPageVersionIter<'a> {
|
||||||
page_versions: &'a PageVersions,
|
page_versions: &'a PageVersions,
|
||||||
|
|
||||||
@@ -69,35 +149,35 @@ pub struct OrderedPageVersionIter<'a> {
|
|||||||
|
|
||||||
cutoff_lsn: Option<Lsn>,
|
cutoff_lsn: Option<Lsn>,
|
||||||
|
|
||||||
cur_slice_iter: slice::Iter<'a, (Lsn, PageVersion)>,
|
cur_block_iter: BlockVersionIter<'a>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl OrderedPageVersionIter<'_> {
|
impl OrderedPageVersionIter<'_> {
|
||||||
fn is_lsn_before_cutoff(&self, lsn: &Lsn) -> bool {
|
fn is_lsn_before_cutoff(&self, lsn: Lsn) -> bool {
|
||||||
if let Some(cutoff_lsn) = self.cutoff_lsn.as_ref() {
|
if let Some(cutoff_lsn) = self.cutoff_lsn.as_ref() {
|
||||||
lsn < cutoff_lsn
|
lsn < *cutoff_lsn
|
||||||
} else {
|
} else {
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Iterator for OrderedPageVersionIter<'a> {
|
impl Iterator for OrderedPageVersionIter<'_> {
|
||||||
type Item = (u32, Lsn, &'a PageVersion);
|
type Item = (u32, Lsn, PageVersion);
|
||||||
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
loop {
|
loop {
|
||||||
if let Some((lsn, page_version)) = self.cur_slice_iter.next() {
|
if let Some((lsn, page_version)) = self.cur_block_iter.next() {
|
||||||
if self.is_lsn_before_cutoff(lsn) {
|
if self.is_lsn_before_cutoff(lsn) {
|
||||||
let blknum = self.ordered_blocks[self.cur_block_idx];
|
let blknum = self.ordered_blocks[self.cur_block_idx];
|
||||||
return Some((blknum, *lsn, page_version));
|
return Some((blknum, lsn, page_version));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let next_block_idx = self.cur_block_idx + 1;
|
let next_block_idx = self.cur_block_idx + 1;
|
||||||
let blknum: u32 = *self.ordered_blocks.get(next_block_idx)?;
|
let blknum: u32 = *self.ordered_blocks.get(next_block_idx)?;
|
||||||
self.cur_block_idx = next_block_idx;
|
self.cur_block_idx = next_block_idx;
|
||||||
self.cur_slice_iter = self.page_versions.get_block_slice(blknum).iter();
|
self.cur_block_iter = self.page_versions.iter_block(blknum);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user