From 68476bb4ba0565f68a01504a66a8ddb8fd2ac19b Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Tue, 18 Jun 2024 16:02:57 -0400 Subject: [PATCH] feat(pageserver): add iterator API for btree reader (#8083) The new image iterator and delta iterator uses an iterator-based API. https://github.com/neondatabase/neon/pull/8006 / part of https://github.com/neondatabase/neon/issues/8002 This requires the underlying thing (the btree) to have an iterator API, and the iterator should have a type name so that it can be stored somewhere. ```rust pub struct DeltaLayerIterator { index_iterator: BTreeIterator } ``` versus ```rust pub struct DeltaLayerIterator { index_iterator: impl Stream<....> } ``` (this requires nightly flag and still buggy in the Rust compiler) There are multiple ways to achieve this: 1. Either write a BTreeIterator from scratch that provides `async next`. This is the most efficient way to do that. 2. Or wrap the current `get_stream` API, which is the current approach in the pull request. In the future, we should do (1), and the `get_stream` API should be refactored to use the iterator API. With (2), we have to wrap the `get_stream` API with `Pin>`, where we have the overhead of dynamic dispatch. However, (2) needs a rewrite of the `visit` function, which would take some time to write and review. I'd like to define this iterator API first and work on a real iterator API later. ## Summary of changes Add `DiskBtreeIterator` and related tests. Signed-off-by: Alex Chi Z --- pageserver/src/tenant/disk_btree.rs | 36 ++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/pageserver/src/tenant/disk_btree.rs b/pageserver/src/tenant/disk_btree.rs index 6d85d1e60e..119df3e6c4 100644 --- a/pageserver/src/tenant/disk_btree.rs +++ b/pageserver/src/tenant/disk_btree.rs @@ -22,7 +22,7 @@ use async_stream::try_stream; use byteorder::{ReadBytesExt, BE}; use bytes::{BufMut, Bytes, BytesMut}; use either::Either; -use futures::Stream; +use futures::{Stream, StreamExt}; use hex; use std::{ cmp::Ordering, @@ -259,6 +259,16 @@ where Ok(result) } + pub fn iter<'a>( + &'a self, + start_key: &'a [u8; L], + ctx: &'a RequestContext, + ) -> DiskBtreeIterator<'a> { + DiskBtreeIterator { + stream: Box::pin(self.get_stream_from(start_key, ctx)), + } + } + /// Return a stream which yields all key, value pairs from the index /// starting from the first key greater or equal to `start_key`. /// @@ -496,6 +506,19 @@ where } } +pub struct DiskBtreeIterator<'a> { + #[allow(clippy::type_complexity)] + stream: std::pin::Pin< + Box, u64), DiskBtreeError>> + 'a>, + >, +} + +impl<'a> DiskBtreeIterator<'a> { + pub async fn next(&mut self) -> Option, u64), DiskBtreeError>> { + self.stream.next().await + } +} + /// /// Public builder object, for creating a new tree. /// @@ -1088,6 +1111,17 @@ pub(crate) mod tests { == all_data.get(&u128::MAX).cloned() ); + // Test iterator and get_stream API + let mut iter = reader.iter(&[0; 16], &ctx); + let mut cnt = 0; + while let Some(res) = iter.next().await { + let (key, val) = res?; + let key = u128::from_be_bytes(key.as_slice().try_into().unwrap()); + assert_eq!(val, *all_data.get(&key).unwrap()); + cnt += 1; + } + assert_eq!(cnt, all_data.len()); + Ok(()) }