From 76864e6a2a67f1ae0480bffafbad7114d77c1826 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Tue, 25 Jun 2024 16:49:29 -0400 Subject: [PATCH] feat(pageserver): add image layer iterator (#8006) part of https://github.com/neondatabase/neon/issues/8002 ## Summary of changes This pull request adds the image layer iterator. It buffers a fixed amount of key-value pairs in memory, and give developer an iterator abstraction over the image layer. Once the buffer is exhausted, it will issue 1 I/O to fetch the next batch. Due to the Rust lifetime mysteries, the `get_stream_from` API has been refactored to `into_stream` and consumes `self`. Delta layer iterator implementation will be similar, therefore I'll add it after this pull request gets merged. --------- Signed-off-by: Alex Chi Z --- libs/pageserver_api/src/key.rs | 7 +- pageserver/src/tenant/block_io.rs | 1 + pageserver/src/tenant/disk_btree.rs | 32 ++- .../src/tenant/storage_layer/delta_layer.rs | 28 +-- .../src/tenant/storage_layer/image_layer.rs | 209 +++++++++++++++++- pageserver/src/tenant/storage_layer/layer.rs | 14 +- pageserver/src/tenant/vectored_blob_io.rs | 110 ++++++++- 7 files changed, 363 insertions(+), 38 deletions(-) diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index 997c1cc43a..cd430bfab7 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -160,8 +160,9 @@ impl Key { key } - /// Convert a 18B slice to a key. This function should not be used for metadata keys because field2 is handled differently. - /// Use [`Key::from_i128`] instead if you want to handle 16B keys (i.e., metadata keys). + /// Convert a 18B slice to a key. This function should not be used for 16B metadata keys because `field2` is handled differently. + /// Use [`Key::from_i128`] instead if you want to handle 16B keys (i.e., metadata keys). There are some restrictions on `field2`, + /// and therefore not all 18B slices are valid page server keys. pub fn from_slice(b: &[u8]) -> Self { Key { field1: b[0], @@ -173,7 +174,7 @@ impl Key { } } - /// Convert a key to a 18B slice. This function should not be used for metadata keys because field2 is handled differently. + /// Convert a key to a 18B slice. This function should not be used for getting a 16B metadata key because `field2` is handled differently. /// Use [`Key::to_i128`] instead if you want to get a 16B key (i.e., metadata keys). pub fn write_to_byte_slice(&self, buf: &mut [u8]) { buf[0] = self.field1; diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 92928116c1..b406d50332 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -160,6 +160,7 @@ impl<'a> BlockCursor<'a> { /// /// The file is assumed to be immutable. This doesn't provide any functions /// for modifying the file, nor for invalidating the cache if it is modified. +#[derive(Clone)] pub struct FileBlockReader<'a> { pub file: &'a VirtualFile, diff --git a/pageserver/src/tenant/disk_btree.rs b/pageserver/src/tenant/disk_btree.rs index 119df3e6c4..b76498b608 100644 --- a/pageserver/src/tenant/disk_btree.rs +++ b/pageserver/src/tenant/disk_btree.rs @@ -212,6 +212,7 @@ impl<'a, const L: usize> OnDiskNode<'a, L> { /// /// Public reader object, to search the tree. /// +#[derive(Clone)] pub struct DiskBtreeReader where R: BlockReader, @@ -259,27 +260,38 @@ where Ok(result) } - pub fn iter<'a>( - &'a self, - start_key: &'a [u8; L], - ctx: &'a RequestContext, - ) -> DiskBtreeIterator<'a> { + pub fn iter<'a>(self, start_key: &'a [u8; L], ctx: &'a RequestContext) -> DiskBtreeIterator<'a> + where + R: 'a, + { DiskBtreeIterator { - stream: Box::pin(self.get_stream_from(start_key, ctx)), + stream: Box::pin(self.into_stream(start_key, ctx)), } } /// Return a stream which yields all key, value pairs from the index /// starting from the first key greater or equal to `start_key`. /// - /// Note that this is a copy of [`Self::visit`]. + /// Note 1: that this is a copy of [`Self::visit`]. /// TODO: Once the sequential read path is removed this will become /// the only index traversal method. - pub fn get_stream_from<'a>( - &'a self, + /// + /// Note 2: this function used to take `&self` but it now consumes `self`. This is due to + /// the lifetime constraints of the reader and the stream / iterator it creates. Using `&self` + /// requires the reader to be present when the stream is used, and this creates a lifetime + /// dependency between the reader and the stream. Now if we want to create an iterator that + /// holds the stream, someone will need to keep a reference to the reader, which is inconvenient + /// to use from the image/delta layer APIs. + /// + /// Feel free to add the `&self` variant back if it's necessary. + pub fn into_stream<'a>( + self, start_key: &'a [u8; L], ctx: &'a RequestContext, - ) -> impl Stream, u64), DiskBtreeError>> + 'a { + ) -> impl Stream, u64), DiskBtreeError>> + 'a + where + R: 'a, + { try_stream! { let mut stack = Vec::new(); stack.push((self.root_blk, None)); diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 5e01ecd71d..ab3ef4980f 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -941,7 +941,7 @@ impl DeltaLayerInner { ); let mut result = Vec::new(); let mut stream = - Box::pin(self.stream_index_forwards(&index_reader, &[0; DELTA_KEY_SIZE], ctx)); + Box::pin(self.stream_index_forwards(index_reader, &[0; DELTA_KEY_SIZE], ctx)); let block_reader = FileBlockReader::new(&self.file, self.file_id); let cursor = block_reader.block_cursor(); let mut buf = Vec::new(); @@ -976,7 +976,7 @@ impl DeltaLayerInner { ctx: &RequestContext, ) -> anyhow::Result> where - Reader: BlockReader, + Reader: BlockReader + Clone, { let ctx = RequestContextBuilder::extend(ctx) .page_content_kind(PageContentKind::DeltaLayerBtreeNode) @@ -986,7 +986,7 @@ impl DeltaLayerInner { let mut range_end_handled = false; let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start); - let index_stream = index_reader.get_stream_from(&start_key.0, &ctx); + let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx); let mut index_stream = std::pin::pin!(index_stream); while let Some(index_entry) = index_stream.next().await { @@ -1241,7 +1241,7 @@ impl DeltaLayerInner { block_reader, ); - let stream = self.stream_index_forwards(&tree_reader, &[0u8; DELTA_KEY_SIZE], ctx); + let stream = self.stream_index_forwards(tree_reader, &[0u8; DELTA_KEY_SIZE], ctx); let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos)); // put in a sentinel value for getting the end offset for last item, and not having to // repeat the whole read part @@ -1300,7 +1300,7 @@ impl DeltaLayerInner { offsets.start.pos(), offsets.end.pos(), meta, - max_read_size, + Some(max_read_size), )) } } else { @@ -1459,17 +1459,17 @@ impl DeltaLayerInner { fn stream_index_forwards<'a, R>( &'a self, - reader: &'a DiskBtreeReader, + reader: DiskBtreeReader, start: &'a [u8; DELTA_KEY_SIZE], ctx: &'a RequestContext, ) -> impl futures::stream::Stream< Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>, > + 'a where - R: BlockReader, + R: BlockReader + 'a, { use futures::stream::TryStreamExt; - let stream = reader.get_stream_from(start, ctx); + let stream = reader.into_stream(start, ctx); stream.map_ok(|(key, value)| { let key = DeltaKey::from_slice(&key); let (key, lsn) = (key.key(), key.lsn()); @@ -1857,7 +1857,7 @@ mod test { .finish(entries_meta.key_range.end, &timeline, &ctx) .await?; - let inner = resident.as_delta(&ctx).await?; + let inner = resident.get_as_delta(&ctx).await?; let file_size = inner.file.metadata().await?.len(); tracing::info!( @@ -2044,11 +2044,11 @@ mod test { let copied_layer = writer.finish(Key::MAX, &branch, ctx).await.unwrap(); - copied_layer.as_delta(ctx).await.unwrap(); + copied_layer.get_as_delta(ctx).await.unwrap(); assert_keys_and_values_eq( - new_layer.as_delta(ctx).await.unwrap(), - copied_layer.as_delta(ctx).await.unwrap(), + new_layer.get_as_delta(ctx).await.unwrap(), + copied_layer.get_as_delta(ctx).await.unwrap(), truncate_at, ctx, ) @@ -2073,7 +2073,7 @@ mod test { source.index_root_blk, &source_reader, ); - let source_stream = source.stream_index_forwards(&source_tree, &start_key, ctx); + let source_stream = source.stream_index_forwards(source_tree, &start_key, ctx); let source_stream = source_stream.filter(|res| match res { Ok((_, lsn, _)) => ready(lsn < &truncated_at), _ => ready(true), @@ -2086,7 +2086,7 @@ mod test { truncated.index_root_blk, &truncated_reader, ); - let truncated_stream = truncated.stream_index_forwards(&truncated_tree, &start_key, ctx); + let truncated_stream = truncated.stream_index_forwards(truncated_tree, &start_key, ctx); let mut truncated_stream = std::pin::pin!(truncated_stream); let mut scratch_left = Vec::new(); diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 06e2f09384..99bce1890d 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -495,7 +495,7 @@ impl ImageLayerInner { let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader); let mut result = Vec::new(); - let mut stream = Box::pin(tree_reader.get_stream_from(&[0; KEY_SIZE], ctx)); + let mut stream = Box::pin(tree_reader.into_stream(&[0; KEY_SIZE], ctx)); let block_reader = FileBlockReader::new(&self.file, self.file_id); let cursor = block_reader.block_cursor(); while let Some(item) = stream.next().await { @@ -544,7 +544,7 @@ impl ImageLayerInner { let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; range.start.write_to_byte_slice(&mut search_key); - let index_stream = tree_reader.get_stream_from(&search_key, &ctx); + let index_stream = tree_reader.clone().into_stream(&search_key, &ctx); let mut index_stream = std::pin::pin!(index_stream); while let Some(index_entry) = index_stream.next().await { @@ -689,6 +689,24 @@ impl ImageLayerInner { }; } } + + #[cfg(test)] + pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> { + let block_reader = FileBlockReader::new(&self.file, self.file_id); + let tree_reader = + DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader); + ImageLayerIterator { + image_layer: self, + ctx, + index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx), + key_values_batch: std::collections::VecDeque::new(), + is_end: false, + planner: crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner::new( + 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer. + 1024, // The default value. Unit tests might use a different value + ), + } + } } /// A builder object for constructing a new image layer. @@ -943,11 +961,77 @@ impl Drop for ImageLayerWriter { } } +#[cfg(test)] +pub struct ImageLayerIterator<'a> { + image_layer: &'a ImageLayerInner, + ctx: &'a RequestContext, + planner: crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner, + index_iter: crate::tenant::disk_btree::DiskBtreeIterator<'a>, + key_values_batch: std::collections::VecDeque<(Key, Lsn, Value)>, + is_end: bool, +} + +#[cfg(test)] +impl<'a> ImageLayerIterator<'a> { + /// Retrieve a batch of key-value pairs into the iterator buffer. + async fn next_batch(&mut self) -> anyhow::Result<()> { + assert!(self.key_values_batch.is_empty()); + assert!(!self.is_end); + + let plan = loop { + if let Some(res) = self.index_iter.next().await { + let (raw_key, offset) = res?; + if let Some(batch_plan) = self.planner.handle( + Key::from_slice(&raw_key[..KEY_SIZE]), + self.image_layer.lsn, + offset, + BlobFlag::None, + ) { + break batch_plan; + } + } else { + self.is_end = true; + let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64; + break self.planner.handle_range_end(payload_end); + } + }; + let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file); + let mut next_batch = std::collections::VecDeque::new(); + let buf_size = plan.size(); + let buf = BytesMut::with_capacity(buf_size); + let blobs_buf = vectored_blob_reader + .read_blobs(&plan, buf, self.ctx) + .await?; + let frozen_buf: Bytes = blobs_buf.buf.freeze(); + for meta in blobs_buf.blobs.iter() { + let img_buf = frozen_buf.slice(meta.start..meta.end); + next_batch.push_back((meta.meta.key, self.image_layer.lsn, Value::Image(img_buf))); + } + self.key_values_batch = next_batch; + Ok(()) + } + + pub async fn next(&mut self) -> anyhow::Result> { + if self.key_values_batch.is_empty() { + if self.is_end { + return Ok(None); + } + self.next_batch().await?; + } + Ok(Some( + self.key_values_batch + .pop_front() + .expect("should not be empty"), + )) + } +} + #[cfg(test)] mod test { - use std::time::Duration; + use std::{sync::Arc, time::Duration}; use bytes::Bytes; + use itertools::Itertools; use pageserver_api::{ key::Key, shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize}, @@ -959,11 +1043,19 @@ mod test { }; use crate::{ - tenant::{config::TenantConf, harness::TenantHarness}, + context::RequestContext, + repository::Value, + tenant::{ + config::TenantConf, + harness::{TenantHarness, TIMELINE_ID}, + storage_layer::ResidentLayer, + vectored_blob_io::StreamingVectoredReadPlanner, + Tenant, Timeline, + }, DEFAULT_PG_VERSION, }; - use super::ImageLayerWriter; + use super::{ImageLayerIterator, ImageLayerWriter}; #[tokio::test] async fn image_layer_rewrite() { @@ -1134,4 +1226,111 @@ mod test { } } } + + async fn produce_image_layer( + tenant: &Tenant, + tline: &Arc, + mut images: Vec<(Key, Bytes)>, + lsn: Lsn, + ctx: &RequestContext, + ) -> anyhow::Result { + images.sort(); + let (key_start, _) = images.first().unwrap(); + let (key_last, _) = images.last().unwrap(); + let key_end = key_last.next(); + let key_range = *key_start..key_end; + let mut writer = ImageLayerWriter::new( + tenant.conf, + tline.timeline_id, + tenant.tenant_shard_id, + &key_range, + lsn, + ctx, + ) + .await?; + + for (key, img) in images { + writer.put_image(key, img, ctx).await?; + } + let img_layer = writer.finish(tline, ctx).await?; + + Ok::<_, anyhow::Error>(img_layer) + } + + async fn assert_img_iter_equal( + img_iter: &mut ImageLayerIterator<'_>, + expect: &[(Key, Bytes)], + expect_lsn: Lsn, + ) { + let mut expect_iter = expect.iter(); + loop { + let o1 = img_iter.next().await.unwrap(); + let o2 = expect_iter.next(); + match (o1, o2) { + (None, None) => break, + (Some((k1, l1, v1)), Some((k2, i2))) => { + let Value::Image(i1) = v1 else { + panic!("expect Value::Image") + }; + assert_eq!(&k1, k2); + assert_eq!(l1, expect_lsn); + assert_eq!(&i1, i2); + } + (o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2), + } + } + } + + #[tokio::test] + async fn image_layer_iterator() { + let harness = TenantHarness::create("image_layer_iterator").unwrap(); + let (tenant, ctx) = harness.load().await; + + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) + .await + .unwrap(); + + fn get_key(id: u32) -> Key { + let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap(); + key.field6 = id; + key + } + const N: usize = 1000; + let test_imgs = (0..N) + .map(|idx| (get_key(idx as u32), Bytes::from(format!("img{idx:05}")))) + .collect_vec(); + let resident_layer = + produce_image_layer(&tenant, &tline, test_imgs.clone(), Lsn(0x10), &ctx) + .await + .unwrap(); + let img_layer = resident_layer.get_as_image(&ctx).await.unwrap(); + for max_read_size in [1, 1024] { + for batch_size in [1, 2, 4, 8, 3, 7, 13] { + println!("running with batch_size={batch_size} max_read_size={max_read_size}"); + // Test if the batch size is correctly determined + let mut iter = img_layer.iter(&ctx); + iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size); + let mut num_items = 0; + for _ in 0..3 { + iter.next_batch().await.unwrap(); + num_items += iter.key_values_batch.len(); + if max_read_size == 1 { + // every key should be a batch b/c the value is larger than max_read_size + assert_eq!(iter.key_values_batch.len(), 1); + } else { + assert_eq!(iter.key_values_batch.len(), batch_size); + } + if num_items >= N { + break; + } + iter.key_values_batch.clear(); + } + // Test if the result is correct + let mut iter = img_layer.iter(&ctx); + iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size); + assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await; + } + } + } } diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 32acb3f0cd..d856909f2e 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -1905,7 +1905,7 @@ impl ResidentLayer { } #[cfg(test)] - pub(crate) async fn as_delta( + pub(crate) async fn get_as_delta( &self, ctx: &RequestContext, ) -> anyhow::Result<&delta_layer::DeltaLayerInner> { @@ -1915,6 +1915,18 @@ impl ResidentLayer { Image(_) => Err(anyhow::anyhow!("image layer")), } } + + #[cfg(test)] + pub(crate) async fn get_as_image( + &self, + ctx: &RequestContext, + ) -> anyhow::Result<&image_layer::ImageLayerInner> { + use LayerKind::*; + match self.downloaded.get(&self.owner.0, ctx).await? { + Image(ref d) => Ok(d), + Delta(_) => Err(anyhow::anyhow!("delta layer")), + } + } } impl AsLayerDesc for ResidentLayer { diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index 6e825760e3..1241a13902 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -77,7 +77,7 @@ pub(crate) struct VectoredReadBuilder { start: u64, end: u64, blobs_at: VecMap, - max_read_size: usize, + max_read_size: Option, } impl VectoredReadBuilder { @@ -90,7 +90,7 @@ impl VectoredReadBuilder { start_offset: u64, end_offset: u64, meta: BlobMeta, - max_read_size: usize, + max_read_size: Option, ) -> Self { let mut blobs_at = VecMap::default(); blobs_at @@ -111,7 +111,13 @@ impl VectoredReadBuilder { pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended { tracing::trace!(start, end, "trying to extend"); let size = (end - start) as usize; - if self.end == start && self.size() + size <= self.max_read_size { + if self.end == start && { + if let Some(max_read_size) = self.max_read_size { + self.size() + size <= max_read_size + } else { + true + } + } { self.end = end; self.blobs_at .append(start, meta) @@ -157,7 +163,7 @@ pub struct VectoredReadPlanner { // Arguments for previous blob passed into [`VectoredReadPlanner::handle`] prev: Option<(Key, Lsn, u64, BlobFlag)>, - max_read_size: usize, + max_read_size: Option, } impl VectoredReadPlanner { @@ -165,7 +171,20 @@ impl VectoredReadPlanner { Self { blobs: BTreeMap::new(), prev: None, - max_read_size, + max_read_size: Some(max_read_size), + } + } + + /// This function should *only* be used if the caller has a way to control the limit. e.g., in [`StreamingVectoredReadPlanner`], + /// it uses the vectored read planner to avoid duplicated logic on handling blob start/end, while expecting the vectored + /// read planner to give a single read to a continuous range of bytes in the image layer. Therefore, it does not need the + /// code path to split reads into chunks of `max_read_size`, and controls the read size itself. + #[cfg(test)] + pub(crate) fn new_caller_controlled_max_limit() -> Self { + Self { + blobs: BTreeMap::new(), + prev: None, + max_read_size: None, } } @@ -354,6 +373,87 @@ impl<'a> VectoredBlobReader<'a> { } } +/// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`]. It provides a streaming API for +/// getting read blobs. It returns a batch when `handle` gets called and when the current key would exceed the read_size and +/// max_cnt constraints. Underlying it uses [`VectoredReadPlanner`]. +#[cfg(test)] +pub struct StreamingVectoredReadPlanner { + planner: VectoredReadPlanner, + /// Max read size per batch + max_read_size: u64, + /// Max item count per batch + max_cnt: usize, + /// The first offset of this batch + this_batch_first_offset: Option, + /// Size of the current batch + cnt: usize, +} + +#[cfg(test)] +impl StreamingVectoredReadPlanner { + pub fn new(max_read_size: u64, max_cnt: usize) -> Self { + assert!(max_cnt > 0); + assert!(max_read_size > 0); + Self { + // We want to have exactly one read syscall (plus several others for index lookup) for each `next_batch` call. + // Therefore, we enforce `self.max_read_size` by ourselves instead of using the VectoredReadPlanner's capability, + // to avoid splitting into two I/Os. + planner: VectoredReadPlanner::new_caller_controlled_max_limit(), + max_cnt, + max_read_size, + this_batch_first_offset: None, + cnt: 0, + } + } + + fn emit(&mut self, this_batch_first_offset: u64) -> VectoredRead { + let planner = std::mem::replace( + &mut self.planner, + VectoredReadPlanner::new_caller_controlled_max_limit(), + ); + self.this_batch_first_offset = Some(this_batch_first_offset); + self.cnt = 1; + let mut batch = planner.finish(); + assert_eq!(batch.len(), 1, "should have exactly one read batch"); + batch.pop().unwrap() + } + + pub fn handle( + &mut self, + key: Key, + lsn: Lsn, + offset: u64, + flag: BlobFlag, + ) -> Option { + if let Some(begin_offset) = self.this_batch_first_offset { + // Each batch will have at least one item b/c `self.this_batch_first_offset` is set + // after one item gets processed + if offset - begin_offset > self.max_read_size { + self.planner.handle_range_end(offset); // End the current batch with the offset + let batch = self.emit(offset); // Produce a batch + self.planner.handle(key, lsn, offset, flag); // Add this key to the next batch + return Some(batch); + } + } else { + self.this_batch_first_offset = Some(offset) + } + if self.cnt >= self.max_cnt { + self.planner.handle_range_end(offset); // End the current batch with the offset + let batch = self.emit(offset); // Produce a batch + self.planner.handle(key, lsn, offset, flag); // Add this key to the next batch + return Some(batch); + } + self.planner.handle(key, lsn, offset, flag); // Add this key to the current batch + self.cnt += 1; + None + } + + pub fn handle_range_end(&mut self, offset: u64) -> VectoredRead { + self.planner.handle_range_end(offset); + self.emit(offset) + } +} + #[cfg(test)] mod tests { use super::*;