diff --git a/pageserver/src/tenant/disk_btree.rs b/pageserver/src/tenant/disk_btree.rs index ca30b0ac4f..6d85d1e60e 100644 --- a/pageserver/src/tenant/disk_btree.rs +++ b/pageserver/src/tenant/disk_btree.rs @@ -18,10 +18,19 @@ //! - An Iterator interface would be more convenient for the callers than the //! 'visit' function //! +use async_stream::try_stream; use byteorder::{ReadBytesExt, BE}; use bytes::{BufMut, Bytes, BytesMut}; use either::Either; -use std::{cmp::Ordering, io, result}; +use futures::Stream; +use hex; +use std::{ + cmp::Ordering, + io, + iter::Rev, + ops::{Range, RangeInclusive}, + result, +}; use thiserror::Error; use tracing::error; @@ -250,6 +259,90 @@ where Ok(result) } + /// Return a stream which yields all key, value pairs from the index + /// starting from the first key greater or equal to `start_key`. + /// + /// Note that this is a copy of [`Self::visit`]. + /// TODO: Once the sequential read path is removed this will become + /// the only index traversal method. + pub fn get_stream_from<'a>( + &'a self, + start_key: &'a [u8; L], + ctx: &'a RequestContext, + ) -> impl Stream, u64), DiskBtreeError>> + 'a { + try_stream! { + let mut stack = Vec::new(); + stack.push((self.root_blk, None)); + let block_cursor = self.reader.block_cursor(); + while let Some((node_blknum, opt_iter)) = stack.pop() { + // Locate the node. + let node_buf = block_cursor + .read_blk(self.start_blk + node_blknum, ctx) + .await?; + + let node = OnDiskNode::deparse(node_buf.as_ref())?; + let prefix_len = node.prefix_len as usize; + let suffix_len = node.suffix_len as usize; + + assert!(node.num_children > 0); + + let mut keybuf = Vec::new(); + keybuf.extend(node.prefix); + keybuf.resize(prefix_len + suffix_len, 0); + + let mut iter: Either, Rev>> = if let Some(iter) = opt_iter { + iter + } else { + // Locate the first match + let idx = match node.binary_search(start_key, keybuf.as_mut_slice()) { + Ok(idx) => idx, + Err(idx) => { + if node.level == 0 { + // Imagine that the node contains the following keys: + // + // 1 + // 3 <-- idx + // 5 + // + // If the search key is '2' and there is exact match, + // the binary search would return the index of key + // '3'. That's cool, '3' is the first key to return. + idx + } else { + // This is an internal page, so each key represents a lower + // bound for what's in the child page. If there is no exact + // match, we have to return the *previous* entry. + // + // 1 <-- return this + // 3 <-- idx + // 5 + idx.saturating_sub(1) + } + } + }; + Either::Left(idx..node.num_children.into()) + }; + + // idx points to the first match now. Keep going from there + while let Some(idx) = iter.next() { + let key_off = idx * suffix_len; + let suffix = &node.keys[key_off..key_off + suffix_len]; + keybuf[prefix_len..].copy_from_slice(suffix); + let value = node.value(idx); + #[allow(clippy::collapsible_if)] + if node.level == 0 { + // leaf + yield (keybuf.clone(), value.to_u64()); + } else { + stack.push((node_blknum, Some(iter))); + stack.push((value.to_blknum(), None)); + break; + } + } + } + } + } + /// /// Scan the tree, starting from 'search_key', in the given direction. 'visitor' /// will be called for every key >= 'search_key' (or <= 'search_key', if scanning