mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-03 16:30:38 +00:00
Compare commits
1 Commits
conrad/pro
...
yuchen/ext
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5a87afd7b8 |
@@ -260,11 +260,11 @@ where
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub fn iter<'a>(self, start_key: &'a [u8; L], ctx: &'a RequestContext) -> DiskBtreeIterator<'a>
|
||||
pub fn iter<'a>(self, start_key: &'a [u8; L], ctx: &'a RequestContext) -> DiskBtreeIter<'a>
|
||||
where
|
||||
R: 'a + Send,
|
||||
{
|
||||
DiskBtreeIterator {
|
||||
DiskBtreeIter {
|
||||
stream: Box::pin(self.into_stream(start_key, ctx)),
|
||||
}
|
||||
}
|
||||
@@ -525,19 +525,128 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DiskBtreeIterator<'a> {
|
||||
pub trait DiskBtreeIterator {
|
||||
type Item;
|
||||
async fn next(&mut self) -> Option<std::result::Result<Self::Item, DiskBtreeError>>;
|
||||
fn map<B, F>(self, f: F) -> MapDiskBtreeIter<Self, F>
|
||||
where
|
||||
Self: Sized,
|
||||
F: FnMut(Self::Item) -> B,
|
||||
{
|
||||
MapDiskBtreeIter::new(self, f)
|
||||
}
|
||||
|
||||
fn filter<P>(self, predicate: P) -> FilterDiskBtreeIter<Self, P>
|
||||
where
|
||||
Self: Sized,
|
||||
P: FnMut(&Self::Item) -> bool,
|
||||
{
|
||||
FilterDiskBtreeIter::new(self, predicate)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DiskBtreeIter<'a> {
|
||||
#[allow(clippy::type_complexity)]
|
||||
stream: std::pin::Pin<
|
||||
Box<dyn Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a + Send>,
|
||||
>,
|
||||
}
|
||||
|
||||
impl<'a> DiskBtreeIterator<'a> {
|
||||
pub async fn next(&mut self) -> Option<std::result::Result<(Vec<u8>, u64), DiskBtreeError>> {
|
||||
impl<'a> DiskBtreeIterator for DiskBtreeIter<'a> {
|
||||
type Item = (Vec<u8>, u64);
|
||||
|
||||
async fn next(&mut self) -> Option<std::result::Result<Self::Item, DiskBtreeError>> {
|
||||
self.stream.next().await
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MapDiskBtreeIter<I, F> {
|
||||
iter: I,
|
||||
f: F,
|
||||
}
|
||||
|
||||
impl<I, F> MapDiskBtreeIter<I, F> {
|
||||
pub fn new(iter: I, f: F) -> Self {
|
||||
MapDiskBtreeIter { iter, f }
|
||||
}
|
||||
}
|
||||
|
||||
impl<B, I: DiskBtreeIterator, F> DiskBtreeIterator for MapDiskBtreeIter<I, F>
|
||||
where
|
||||
F: FnMut(I::Item) -> B,
|
||||
{
|
||||
type Item = B;
|
||||
|
||||
async fn next(&mut self) -> Option<std::result::Result<Self::Item, DiskBtreeError>> {
|
||||
self.iter.next().await.map(|res| res.map(|x| (self.f)(x)))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FilterDiskBtreeIter<I, P> {
|
||||
iter: I,
|
||||
predicate: P,
|
||||
}
|
||||
|
||||
impl<I, P> FilterDiskBtreeIter<I, P> {
|
||||
pub fn new(iter: I, predicate: P) -> Self {
|
||||
FilterDiskBtreeIter { iter, predicate }
|
||||
}
|
||||
}
|
||||
|
||||
impl<I: DiskBtreeIterator, P> DiskBtreeIterator for FilterDiskBtreeIter<I, P>
|
||||
where
|
||||
P: FnMut(&I::Item) -> bool,
|
||||
{
|
||||
type Item = I::Item;
|
||||
|
||||
async fn next(&mut self) -> Option<std::result::Result<Self::Item, DiskBtreeError>> {
|
||||
self.iter.next().await.and_then(|res| match res {
|
||||
Ok(x) => {
|
||||
if (self.predicate)(&x) {
|
||||
Some(Ok(x))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
Err(e) => Some(Err(e)),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// pub struct DeltaLayerDiskBtreeIterator<'a, F> {
|
||||
// inner: DiskBtreeIterator<'a, F, >,
|
||||
// extract: F,
|
||||
// }
|
||||
|
||||
// impl<'a, F> DeltaLayerDiskBtreeIterator<'a, F>
|
||||
// where
|
||||
// F: FnMut(Vec<u8>, u64) -> (Key, Lsn, u64),
|
||||
// {
|
||||
// pub async fn next(&mut self) -> Option<std::result::Result<(Key, Lsn, u64), DiskBtreeError>> {
|
||||
// self.inner
|
||||
// .next()
|
||||
// .await
|
||||
// .map(|res| res.map(|(raw_key, value)| (self.extract)(raw_key, value)))
|
||||
// }
|
||||
// }
|
||||
|
||||
// pub struct ImageLayerDiskBtreeIterator<'a, F> {
|
||||
// inner: DiskBtreeIterator<'a>,
|
||||
// extract: F,
|
||||
// }
|
||||
|
||||
// impl<'a, F> ImageLayerDiskBtreeIterator<'a, F>
|
||||
// where
|
||||
// F: FnMut(Vec<u8>, u64) -> (Key, Lsn, u64),
|
||||
// {
|
||||
// pub async fn next(&mut self) -> Option<std::result::Result<(Key, Lsn, u64), DiskBtreeError>> {
|
||||
// self.inner
|
||||
// .next()
|
||||
// .await
|
||||
// .map(|res| res.map(|(raw_key, value)| (self.extract)(raw_key, value)))
|
||||
// }
|
||||
// }
|
||||
|
||||
///
|
||||
/// Public builder object, for creating a new tree.
|
||||
///
|
||||
|
||||
@@ -34,7 +34,7 @@ use crate::repository::{Key, Value, KEY_SIZE};
|
||||
use crate::tenant::blob_io::BlobWriter;
|
||||
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::{
|
||||
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
|
||||
DiskBtreeBuilder, DiskBtreeIter, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
|
||||
};
|
||||
use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
@@ -1426,14 +1426,17 @@ impl DeltaLayerInner {
|
||||
offset
|
||||
}
|
||||
|
||||
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
|
||||
pub(crate) fn iter<'a>(
|
||||
&'a self,
|
||||
ctx: &'a RequestContext,
|
||||
) -> DeltaLayerIterator<'a, DeltaLayerDiskBtreeIter<'a>> {
|
||||
let block_reader = FileBlockReader::new(&self.file, self.file_id);
|
||||
let tree_reader =
|
||||
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
|
||||
DeltaLayerIterator {
|
||||
delta_layer: self,
|
||||
ctx,
|
||||
index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
|
||||
index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx).map_delta(),
|
||||
key_values_batch: std::collections::VecDeque::new(),
|
||||
is_end: false,
|
||||
planner: StreamingVectoredReadPlanner::new(
|
||||
@@ -1508,16 +1511,19 @@ impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for Del
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DeltaLayerIterator<'a> {
|
||||
pub struct DeltaLayerIterator<'a, I = DeltaLayerDiskBtreeIter<'a>> {
|
||||
delta_layer: &'a DeltaLayerInner,
|
||||
ctx: &'a RequestContext,
|
||||
planner: StreamingVectoredReadPlanner,
|
||||
index_iter: DiskBtreeIterator<'a>,
|
||||
index_iter: I,
|
||||
key_values_batch: VecDeque<(Key, Lsn, Value)>,
|
||||
is_end: bool,
|
||||
}
|
||||
|
||||
impl<'a> DeltaLayerIterator<'a> {
|
||||
impl<'a, I> DeltaLayerIterator<'a, I>
|
||||
where
|
||||
I: DiskBtreeIterator<Item = (Key, Lsn, u64)>,
|
||||
{
|
||||
pub(crate) fn layer_dbg_info(&self) -> String {
|
||||
self.delta_layer.layer_dbg_info()
|
||||
}
|
||||
@@ -1529,11 +1535,7 @@ impl<'a> DeltaLayerIterator<'a> {
|
||||
|
||||
let plan = loop {
|
||||
if let Some(res) = self.index_iter.next().await {
|
||||
let (raw_key, value) = res?;
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
|
||||
let blob_ref = BlobRef(value);
|
||||
let offset = blob_ref.pos();
|
||||
let (key, lsn, offset) = res?;
|
||||
if let Some(batch_plan) = self.planner.handle(key, lsn, offset) {
|
||||
break batch_plan;
|
||||
}
|
||||
@@ -1579,6 +1581,51 @@ impl<'a> DeltaLayerIterator<'a> {
|
||||
.expect("should not be empty"),
|
||||
))
|
||||
}
|
||||
|
||||
pub async fn filter_key<P>(
|
||||
self,
|
||||
predicate: P,
|
||||
) -> DeltaLayerIterator<'a, impl DiskBtreeIterator<Item = (Key, Lsn, u64)>>
|
||||
where
|
||||
P: FnMut(&(Key, Lsn, u64)) -> bool,
|
||||
{
|
||||
DeltaLayerIterator {
|
||||
delta_layer: self.delta_layer,
|
||||
ctx: self.ctx,
|
||||
planner: self.planner,
|
||||
index_iter: self.index_iter.filter(predicate),
|
||||
key_values_batch: self.key_values_batch,
|
||||
is_end: self.is_end,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DeltaLayerDiskBtreeIter<'a> {
|
||||
iter: DiskBtreeIter<'a>,
|
||||
}
|
||||
|
||||
impl<'a> DiskBtreeIter<'a> {
|
||||
pub(crate) fn map_delta(self) -> DeltaLayerDiskBtreeIter<'a> {
|
||||
DeltaLayerDiskBtreeIter { iter: self }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> DiskBtreeIterator for DeltaLayerDiskBtreeIter<'a> {
|
||||
type Item = (Key, Lsn, u64);
|
||||
|
||||
async fn next(
|
||||
&mut self,
|
||||
) -> Option<std::result::Result<Self::Item, crate::tenant::disk_btree::DiskBtreeError>> {
|
||||
self.iter.next().await.map(|res| {
|
||||
res.map(|(raw_key, value)| {
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
|
||||
let blob_ref = BlobRef(value);
|
||||
let offset = blob_ref.pos();
|
||||
(key, lsn, offset)
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -32,7 +32,7 @@ use crate::repository::{Key, Value, KEY_SIZE};
|
||||
use crate::tenant::blob_io::BlobWriter;
|
||||
use crate::tenant::block_io::{BlockBuf, FileBlockReader};
|
||||
use crate::tenant::disk_btree::{
|
||||
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
|
||||
DiskBtreeBuilder, DiskBtreeIter, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
|
||||
};
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::vectored_blob_io::{
|
||||
@@ -652,7 +652,7 @@ impl ImageLayerInner {
|
||||
ImageLayerIterator {
|
||||
image_layer: self,
|
||||
ctx,
|
||||
index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
|
||||
index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx).map_image(self.lsn),
|
||||
key_values_batch: VecDeque::new(),
|
||||
is_end: false,
|
||||
planner: StreamingVectoredReadPlanner::new(
|
||||
@@ -999,11 +999,37 @@ pub struct ImageLayerIterator<'a> {
|
||||
image_layer: &'a ImageLayerInner,
|
||||
ctx: &'a RequestContext,
|
||||
planner: StreamingVectoredReadPlanner,
|
||||
index_iter: DiskBtreeIterator<'a>,
|
||||
index_iter: ImageLayerDiskBtreeIter<'a>,
|
||||
key_values_batch: VecDeque<(Key, Lsn, Value)>,
|
||||
is_end: bool,
|
||||
}
|
||||
|
||||
pub struct ImageLayerDiskBtreeIter<'a> {
|
||||
iter: DiskBtreeIter<'a>,
|
||||
lsn: Lsn,
|
||||
}
|
||||
|
||||
impl<'a> DiskBtreeIter<'a> {
|
||||
pub(crate) fn map_image(self, lsn: Lsn) -> ImageLayerDiskBtreeIter<'a> {
|
||||
ImageLayerDiskBtreeIter { iter: self, lsn }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> DiskBtreeIterator for ImageLayerDiskBtreeIter<'a> {
|
||||
type Item = (Key, Lsn, u64);
|
||||
|
||||
async fn next(
|
||||
&mut self,
|
||||
) -> Option<std::result::Result<Self::Item, crate::tenant::disk_btree::DiskBtreeError>> {
|
||||
self.iter.next().await.map(|res| {
|
||||
res.map(|(raw_key, offset)| {
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
(key, self.lsn, offset)
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> ImageLayerIterator<'a> {
|
||||
pub(crate) fn layer_dbg_info(&self) -> String {
|
||||
self.image_layer.layer_dbg_info()
|
||||
@@ -1016,12 +1042,8 @@ impl<'a> ImageLayerIterator<'a> {
|
||||
|
||||
let plan = loop {
|
||||
if let Some(res) = self.index_iter.next().await {
|
||||
let (raw_key, offset) = res?;
|
||||
if let Some(batch_plan) = self.planner.handle(
|
||||
Key::from_slice(&raw_key[..KEY_SIZE]),
|
||||
self.image_layer.lsn,
|
||||
offset,
|
||||
) {
|
||||
let (key, lsn, offset) = res?;
|
||||
if let Some(batch_plan) = self.planner.handle(key, lsn, offset) {
|
||||
break batch_plan;
|
||||
}
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user