diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index dfd0196c87..2d36ac7442 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -1321,7 +1321,7 @@ impl DeltaLayerInner {
                     offsets.start.pos(),
                     offsets.end.pos(),
                     meta,
-                    Some(max_read_size),
+                    max_read_size,
                 ))
             }
         } else {
@@ -1615,13 +1615,17 @@ impl<'a> DeltaLayerIterator<'a> {
                 let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
                 let blob_ref = BlobRef(value);
                 let offset = blob_ref.pos();
-                if let Some(batch_plan) = self.planner.handle(key, lsn, offset, BlobFlag::None) {
+                if let Some(batch_plan) = self.planner.handle(key, lsn, offset) {
                     break batch_plan;
                 }
             } else {
                 self.is_end = true;
                 let data_end_offset = self.delta_layer.index_start_offset();
-                break self.planner.handle_range_end(data_end_offset);
+                if let Some(item) = self.planner.handle_range_end(data_end_offset) {
+                    break item;
+                } else {
+                    return Ok(()); // TODO: test empty iterator
+                }
             }
         };
         let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index 1e03e1a58c..1440c0db84 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -994,14 +994,17 @@ impl<'a> ImageLayerIterator<'a> {
                     Key::from_slice(&raw_key[..KEY_SIZE]),
                     self.image_layer.lsn,
                     offset,
-                    BlobFlag::None,
                 ) {
                     break batch_plan;
                 }
             } else {
                 self.is_end = true;
                 let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
-                break self.planner.handle_range_end(payload_end);
+                if let Some(item) = self.planner.handle_range_end(payload_end) {
+                    break item;
+                } else {
+                    return Ok(()); // TODO: a test case on empty iterator
+                }
             }
         };
         let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs
index 7ad8446e04..1b470034db 100644
--- a/pageserver/src/tenant/vectored_blob_io.rs
+++ b/pageserver/src/tenant/vectored_blob_io.rs
@@ -68,7 +68,7 @@ impl VectoredRead {
     }
 }
 
-#[derive(Eq, PartialEq)]
+#[derive(Eq, PartialEq, Debug)]
 pub(crate) enum VectoredReadExtended {
     Yes,
     No,
@@ -91,7 +91,7 @@ impl VectoredReadBuilder {
         start_offset: u64,
         end_offset: u64,
         meta: BlobMeta,
-        max_read_size: Option<usize>,
+        max_read_size: usize,
     ) -> Self {
         let mut blobs_at = VecMap::default();
         blobs_at
@@ -102,10 +102,9 @@
             start: start_offset,
             end: end_offset,
             blobs_at,
-            max_read_size,
+            max_read_size: Some(max_read_size),
         }
     }
-
     /// Attempt to extend the current read with a new blob if the start
     /// offset matches with the current end of the vectored read
     /// and the resuting size is below the max read size
@@ -164,7 +163,7 @@ pub struct VectoredReadPlanner {
     // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
     prev: Option<(Key, Lsn, u64, BlobFlag)>,
 
-    max_read_size: Option<usize>,
+    max_read_size: usize,
 }
 
 impl VectoredReadPlanner {
@@ -172,20 +171,7 @@
         Self {
            blobs: BTreeMap::new(),
            prev: None,
-            max_read_size: Some(max_read_size),
-        }
-    }
-
-    /// This function should *only* be used if the caller has a way to control the limit. e.g., in [`StreamingVectoredReadPlanner`],
-    /// it uses the vectored read planner to avoid duplicated logic on handling blob start/end, while expecting the vectored
-    /// read planner to give a single read to a continuous range of bytes in the image layer. Therefore, it does not need the
-    /// code path to split reads into chunks of `max_read_size`, and controls the read size itself.
-    #[cfg(test)]
-    pub(crate) fn new_caller_controlled_max_limit() -> Self {
-        Self {
-            blobs: BTreeMap::new(),
-            prev: None,
-            max_read_size: None,
+            max_read_size,
         }
     }
 
@@ -376,17 +362,18 @@ impl<'a> VectoredBlobReader<'a> {
 }
 
 /// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`]. It provides a streaming API for
-/// getting read blobs. It returns a batch when `handle` gets called and when the current key would exceed the read_size and
-/// max_cnt constraints. Underlying it uses [`VectoredReadPlanner`].
+/// getting read blobs. It returns a batch when `handle` gets called and when the current key would just exceed the read_size and
+/// max_cnt constraints.
 #[cfg(test)]
 pub struct StreamingVectoredReadPlanner {
-    planner: VectoredReadPlanner,
-    /// Max read size per batch
+    read_builder: Option<VectoredReadBuilder>,
+    // Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
+    prev: Option<(Key, Lsn, u64)>,
+    /// Max read size per batch. This is not a strict limit. If there are [0, 100) and [100, 200), while the `max_read_size` is 150,
+    /// we will produce a single batch instead of splitting them.
     max_read_size: u64,
     /// Max item count per batch
     max_cnt: usize,
-    /// The first offset of this batch
-    this_batch_first_offset: Option<u64>,
     /// Size of the current batch
     cnt: usize,
 }
@@ -397,62 +384,88 @@ impl StreamingVectoredReadPlanner {
         assert!(max_cnt > 0);
         assert!(max_read_size > 0);
         Self {
-            // We want to have exactly one read syscall (plus several others for index lookup) for each `next_batch` call.
-            // Therefore, we enforce `self.max_read_size` by ourselves instead of using the VectoredReadPlanner's capability,
-            // to avoid splitting into two I/Os.
-            planner: VectoredReadPlanner::new_caller_controlled_max_limit(),
+            read_builder: None,
+            prev: None,
             max_cnt,
             max_read_size,
-            this_batch_first_offset: None,
             cnt: 0,
         }
     }
 
-    fn emit(&mut self, this_batch_first_offset: u64) -> VectoredRead {
-        let planner = std::mem::replace(
-            &mut self.planner,
-            VectoredReadPlanner::new_caller_controlled_max_limit(),
-        );
-        self.this_batch_first_offset = Some(this_batch_first_offset);
-        self.cnt = 1;
-        let mut batch = planner.finish();
-        assert_eq!(batch.len(), 1, "should have exactly one read batch");
-        batch.pop().unwrap()
+    pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64) -> Option<VectoredRead> {
+        // Implementation note: internally lag behind by one blob such that
+        // we have a start and end offset when initialising [`VectoredRead`]
+        let (prev_key, prev_lsn, prev_offset) = match self.prev {
+            None => {
+                self.prev = Some((key, lsn, offset));
+                return None;
+            }
+            Some(prev) => prev,
+        };
+
+        let res = self.add_blob(prev_key, prev_lsn, prev_offset, offset, false);
+
+        self.prev = Some((key, lsn, offset));
+
+        res
     }
 
-    pub fn handle(
+    pub fn handle_range_end(&mut self, offset: u64) -> Option<VectoredRead> {
+        let res = if let Some((prev_key, prev_lsn, prev_offset)) = self.prev {
+            self.add_blob(prev_key, prev_lsn, prev_offset, offset, true)
+        } else {
+            None
+        };
+
+        self.prev = None;
+
+        res
+    }
+
+    fn add_blob(
         &mut self,
         key: Key,
         lsn: Lsn,
-        offset: u64,
-        flag: BlobFlag,
+        start_offset: u64,
+        end_offset: u64,
+        is_last_blob_in_read: bool,
     ) -> Option<VectoredRead> {
-        if let Some(begin_offset) = self.this_batch_first_offset {
-            // Each batch will have at least one item b/c `self.this_batch_first_offset` is set
-            // after one item gets processed
-            if offset - begin_offset > self.max_read_size {
-                self.planner.handle_range_end(offset); // End the current batch with the offset
-                let batch = self.emit(offset); // Produce a batch
-                self.planner.handle(key, lsn, offset, flag); // Add this key to the next batch
-                return Some(batch);
+        match &mut self.read_builder {
+            Some(read_builder) => {
+                let extended = read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn });
+                assert_eq!(extended, VectoredReadExtended::Yes);
             }
-        } else {
-            self.this_batch_first_offset = Some(offset)
-        }
-        if self.cnt >= self.max_cnt {
-            self.planner.handle_range_end(offset); // End the current batch with the offset
-            let batch = self.emit(offset); // Produce a batch
-            self.planner.handle(key, lsn, offset, flag); // Add this key to the next batch
-            return Some(batch);
-        }
-        self.planner.handle(key, lsn, offset, flag); // Add this key to the current batch
-        self.cnt += 1;
-        None
-    }
+            None => {
+                self.read_builder = {
+                    let mut blobs_at = VecMap::default();
+                    blobs_at
+                        .append(start_offset, BlobMeta { key, lsn })
+                        .expect("First insertion always succeeds");
 
-    pub fn handle_range_end(&mut self, offset: u64) -> VectoredRead {
-        self.planner.handle_range_end(offset);
-        self.emit(offset)
+                    Some(VectoredReadBuilder {
+                        start: start_offset,
+                        end: end_offset,
+                        blobs_at,
+                        max_read_size: None,
+                    })
+                };
+            }
+        }
+        let read_builder = self.read_builder.as_mut().unwrap();
+        self.cnt += 1;
+        if is_last_blob_in_read
+            || read_builder.size() >= self.max_read_size as usize
+            || self.cnt >= self.max_cnt
+        {
+            let prev_read_builder = self.read_builder.take();
+            self.cnt = 0;
+
+            // `current_read_builder` is None in the first iteration
+            if let Some(read_builder) = prev_read_builder {
+                return Some(read_builder.build());
+            }
+        }
+        None
     }
 }
@@ -509,8 +522,11 @@ mod tests {
         planner.handle_range_end(652 * 1024);
 
         let reads = planner.finish();
+        assert_eq!(reads.len(), 6);
+        // TODO: could remove zero reads to produce 5 reads here
+
         for (idx, read) in reads.iter().enumerate() {
             validate_read(read, ranges[idx]);
         }
     }
@@ -548,4 +564,127 @@ mod tests {
             validate_read(read, ranges[idx]);
         }
     }
+
+    #[test]
+    fn streaming_planner_max_read_size_test() {
+        let max_read_size = 128 * 1024;
+        let key = Key::MIN;
+        let lsn = Lsn(0);
+
+        let blob_descriptions = vec![
+            (key, lsn, 0, BlobFlag::None),
+            (key, lsn, 32 * 1024, BlobFlag::None),
+            (key, lsn, 96 * 1024, BlobFlag::None),
+            (key, lsn, 128 * 1024, BlobFlag::None),
+            (key, lsn, 198 * 1024, BlobFlag::None),
+            (key, lsn, 268 * 1024, BlobFlag::None),
+            (key, lsn, 396 * 1024, BlobFlag::None),
+            (key, lsn, 652 * 1024, BlobFlag::None),
+        ];
+
+        let ranges = [
+            &blob_descriptions[0..3],
+            &blob_descriptions[3..5],
+            &blob_descriptions[5..6],
+            &blob_descriptions[6..7],
+            &blob_descriptions[7..],
+        ];
+
+        let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1000);
+        let mut reads = Vec::new();
+        for (key, lsn, offset, _) in blob_descriptions.clone() {
+            reads.extend(planner.handle(key, lsn, offset));
+        }
+        reads.extend(planner.handle_range_end(652 * 1024));
+
+        assert_eq!(reads.len(), ranges.len());
+
+        for (idx, read) in reads.iter().enumerate() {
+            validate_read(read, ranges[idx]);
+        }
+    }
+
+    #[test]
+    fn streaming_planner_max_cnt_test() {
+        let max_read_size = 1024 * 1024;
+        let key = Key::MIN;
+        let lsn = Lsn(0);
+
+        let blob_descriptions = vec![
+            (key, lsn, 0, BlobFlag::None),
+            (key, lsn, 32 * 1024, BlobFlag::None),
+            (key, lsn, 96 * 1024, BlobFlag::None),
+            (key, lsn, 128 * 1024, BlobFlag::None),
+            (key, lsn, 198 * 1024, BlobFlag::None),
+            (key, lsn, 268 * 1024, BlobFlag::None),
+            (key, lsn, 396 * 1024, BlobFlag::None),
+            (key, lsn, 652 * 1024, BlobFlag::None),
+        ];
+
+        let ranges = [
+            &blob_descriptions[0..2],
+            &blob_descriptions[2..4],
+            &blob_descriptions[4..6],
+            &blob_descriptions[6..],
+        ];
+
+        let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
+        let mut reads = Vec::new();
+        for (key, lsn, offset, _) in blob_descriptions.clone() {
+            reads.extend(planner.handle(key, lsn, offset));
+        }
+        reads.extend(planner.handle_range_end(652 * 1024));
+
+        assert_eq!(reads.len(), ranges.len());
+
+        for (idx, read) in reads.iter().enumerate() {
+            validate_read(read, ranges[idx]);
+        }
+    }
+
+    #[test]
+    fn streaming_planner_edge_test() {
+        let max_read_size = 1024 * 1024;
+        let key = Key::MIN;
+        let lsn = Lsn(0);
+        {
+            let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
+            let mut reads = Vec::new();
+            reads.extend(planner.handle_range_end(652 * 1024));
+            assert!(reads.is_empty());
+        }
+        {
+            let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
+            let mut reads = Vec::new();
+            reads.extend(planner.handle(key, lsn, 0));
+            reads.extend(planner.handle_range_end(652 * 1024));
+            assert_eq!(reads.len(), 1);
+            validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
+        }
+        {
+            let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
+            let mut reads = Vec::new();
+            reads.extend(planner.handle(key, lsn, 0));
+            reads.extend(planner.handle(key, lsn, 128 * 1024));
+            reads.extend(planner.handle_range_end(652 * 1024));
+            assert_eq!(reads.len(), 2);
+            validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
+            validate_read(&reads[1], &[(key, lsn, 128 * 1024, BlobFlag::None)]);
+        }
+        {
+            let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
+            let mut reads = Vec::new();
+            reads.extend(planner.handle(key, lsn, 0));
+            reads.extend(planner.handle(key, lsn, 128 * 1024));
+            reads.extend(planner.handle_range_end(652 * 1024));
+            assert_eq!(reads.len(), 1);
+            validate_read(
+                &reads[0],
+                &[
+                    (key, lsn, 0, BlobFlag::None),
+                    (key, lsn, 128 * 1024, BlobFlag::None),
+                ],
+            );
+        }
+    }
 }
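
Reviewer note (illustrative only, not part of the patch): the batching rule introduced in `add_blob` above - flush the current `VectoredReadBuilder` when the blob is the last one in the range, when the accumulated size reaches `max_read_size`, or when the batch already holds `max_cnt` blobs - can be sketched with plain offsets. `StreamingBatcher` below is a hypothetical stand-in, not the pageserver's `StreamingVectoredReadPlanner`: it drops keys, LSNs and `BlobMeta` and tracks only `(start, end)` offsets, but it shows the same lag-behind-by-one-blob handling of `handle`/`handle_range_end`.

struct StreamingBatcher {
    max_read_size: u64,
    max_cnt: usize,
    // Start offset of the blob whose end offset is not yet known.
    prev_offset: Option<u64>,
    // `(start, end)` of every blob in the batch currently being built.
    batch: Vec<(u64, u64)>,
}

impl StreamingBatcher {
    fn new(max_read_size: u64, max_cnt: usize) -> Self {
        Self { max_read_size, max_cnt, prev_offset: None, batch: Vec::new() }
    }

    // Called once per blob start offset; lags one blob behind so that the
    // previous blob's end offset (= the current blob's start) is known.
    fn handle(&mut self, offset: u64) -> Option<Vec<(u64, u64)>> {
        let prev = self.prev_offset.replace(offset)?;
        self.add_blob(prev, offset, false)
    }

    // Called with the end offset of the data section; flushes the last batch.
    fn handle_range_end(&mut self, end: u64) -> Option<Vec<(u64, u64)>> {
        let prev = self.prev_offset.take()?;
        self.add_blob(prev, end, true)
    }

    fn add_blob(&mut self, start: u64, end: u64, is_last: bool) -> Option<Vec<(u64, u64)>> {
        self.batch.push((start, end));
        // Size of the read so far: end of the newest blob minus start of the first one.
        let size = end - self.batch[0].0;
        if is_last || size >= self.max_read_size || self.batch.len() >= self.max_cnt {
            return Some(std::mem::take(&mut self.batch));
        }
        None
    }
}

fn main() {
    // Offsets borrowed from `streaming_planner_max_read_size_test` above.
    let mut batcher = StreamingBatcher::new(128 * 1024, 1000);
    let offsets = [0, 32 * 1024, 96 * 1024, 128 * 1024, 198 * 1024, 268 * 1024];
    let mut batches = Vec::new();
    for offset in offsets {
        batches.extend(batcher.handle(offset));
    }
    batches.extend(batcher.handle_range_end(396 * 1024));
    // The first batch covers [0, 128K), i.e. the first three blobs, mirroring the test.
    println!("{batches:?}");
}

The real planner additionally records `BlobMeta { key, lsn }` for each blob and reuses `VectoredReadBuilder`, so what it emits is directly a `VectoredRead` rather than a list of offsets.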