From 6827f2f58ccb8dec0922d0a2c7a413998cf2f539 Mon Sep 17 00:00:00 2001
From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com>
Date: Tue, 6 May 2025 20:27:16 +0800
Subject: [PATCH] fix(pageserver): only keep `iter_with_options` API, improve
 docs in gc-compact (#11804)

## Problem

Address comments in https://github.com/neondatabase/neon/pull/11709

## Summary of changes

- remove the `iter` API; users now always need to specify the buffer size
  depending on the expected memory usage.
- several doc improvements

---------

Signed-off-by: Alex Chi Z
Co-authored-by: Christian Schwarz
---
 .../src/tenant/storage_layer/delta_layer.rs  | 15 +-----
 .../tenant/storage_layer/filter_iterator.rs  |  4 +-
 .../src/tenant/storage_layer/image_layer.rs  | 15 +-----
 .../tenant/storage_layer/merge_iterator.rs   | 46 +++++++++++--------
 pageserver/src/tenant/timeline/compaction.rs | 13 ++++--
 5 files changed, 42 insertions(+), 51 deletions(-)

diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index 607b0d513c..11875ac653 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -1441,14 +1441,6 @@ impl DeltaLayerInner {
         offset
     }
 
-    pub fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
-        self.iter_with_options(
-            ctx,
-            1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
-            1024, // The default value. Unit tests might use a different value
-        )
-    }
-
     pub fn iter_with_options<'a>(
         &'a self,
         ctx: &'a RequestContext,
@@ -1634,7 +1626,6 @@ pub(crate) mod test {
     use crate::tenant::disk_btree::tests::TestDisk;
     use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
     use crate::tenant::storage_layer::{Layer, ResidentLayer};
-    use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
     use crate::tenant::{TenantShard, Timeline};
 
     /// Construct an index for a fictional delta layer and and then
@@ -2311,8 +2302,7 @@ pub(crate) mod test {
         for batch_size in [1, 2, 4, 8, 3, 7, 13] {
             println!("running with batch_size={batch_size} max_read_size={max_read_size}");
             // Test if the batch size is correctly determined
-            let mut iter = delta_layer.iter(&ctx);
-            iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
+            let mut iter = delta_layer.iter_with_options(&ctx, max_read_size, batch_size);
             let mut num_items = 0;
             for _ in 0..3 {
                 iter.next_batch().await.unwrap();
@@ -2329,8 +2319,7 @@ pub(crate) mod test {
                 iter.key_values_batch.clear();
             }
             // Test if the result is correct
-            let mut iter = delta_layer.iter(&ctx);
-            iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
+            let mut iter = delta_layer.iter_with_options(&ctx, max_read_size, batch_size);
             assert_delta_iter_equal(&mut iter, &test_deltas).await;
         }
     }
diff --git a/pageserver/src/tenant/storage_layer/filter_iterator.rs b/pageserver/src/tenant/storage_layer/filter_iterator.rs
index 8d172a1c19..1a330ecfc2 100644
--- a/pageserver/src/tenant/storage_layer/filter_iterator.rs
+++ b/pageserver/src/tenant/storage_layer/filter_iterator.rs
@@ -157,7 +157,7 @@ mod tests {
             .await
             .unwrap();
 
-        let merge_iter = MergeIterator::create(
+        let merge_iter = MergeIterator::create_for_testing(
             &[resident_layer_1.get_as_delta(&ctx).await.unwrap()],
             &[],
             &ctx,
@@ -182,7 +182,7 @@ mod tests {
         result.extend(test_deltas1[90..100].iter().cloned());
         assert_filter_iter_equal(&mut filter_iter, &result).await;
 
-        let merge_iter = MergeIterator::create(
+        let merge_iter = MergeIterator::create_for_testing(
             &[resident_layer_1.get_as_delta(&ctx).await.unwrap()],
             &[],
             &ctx,
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index 2f7c5715bb..d684230572 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -684,14 +684,6 @@ impl ImageLayerInner {
         }
     }
 
-    pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
-        self.iter_with_options(
-            ctx,
-            1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
-            1024, // The default value. Unit tests might use a different value
-        )
-    }
-
     pub(crate) fn iter_with_options<'a>(
         &'a self,
         ctx: &'a RequestContext,
@@ -1240,7 +1232,6 @@ mod test {
     use crate::context::RequestContext;
     use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
     use crate::tenant::storage_layer::{Layer, ResidentLayer};
-    use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
     use crate::tenant::{TenantShard, Timeline};
 
     #[tokio::test]
@@ -1507,8 +1498,7 @@ mod test {
         for batch_size in [1, 2, 4, 8, 3, 7, 13] {
             println!("running with batch_size={batch_size} max_read_size={max_read_size}");
             // Test if the batch size is correctly determined
-            let mut iter = img_layer.iter(&ctx);
-            iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
+            let mut iter = img_layer.iter_with_options(&ctx, max_read_size, batch_size);
             let mut num_items = 0;
             for _ in 0..3 {
                 iter.next_batch().await.unwrap();
@@ -1525,8 +1515,7 @@ mod test {
                 iter.key_values_batch.clear();
             }
             // Test if the result is correct
-            let mut iter = img_layer.iter(&ctx);
-            iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
+            let mut iter = img_layer.iter_with_options(&ctx, max_read_size, batch_size);
             assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
         }
     }
diff --git a/pageserver/src/tenant/storage_layer/merge_iterator.rs b/pageserver/src/tenant/storage_layer/merge_iterator.rs
index e084e3d567..ea3dea50c3 100644
--- a/pageserver/src/tenant/storage_layer/merge_iterator.rs
+++ b/pageserver/src/tenant/storage_layer/merge_iterator.rs
@@ -19,14 +19,6 @@ pub(crate) enum LayerRef<'a> {
 }
 
 impl<'a> LayerRef<'a> {
-    #[allow(dead_code)]
-    fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
-        match self {
-            Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
-            Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
-        }
-    }
-
     fn iter_with_options(
         self,
         ctx: &'a RequestContext,
@@ -322,6 +314,28 @@ impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
 }
 
 impl<'a> MergeIterator<'a> {
+    #[cfg(test)]
+    pub(crate) fn create_for_testing(
+        deltas: &[&'a DeltaLayerInner],
+        images: &[&'a ImageLayerInner],
+        ctx: &'a RequestContext,
+    ) -> Self {
+        Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
+    }
+
+    /// Create a new merge iterator with custom options.
+    ///
+    /// Adjust `max_read_size` and `max_batch_size` to trade memory usage for performance. The total
+    /// memory usage scales with the number of layers to compact: if there are a lot of layers, consider
+    /// reducing these values so that the buffers do not take too much memory.
+    ///
+    /// The default options for L0 compactions are:
+    /// - max_read_size: 1024 * 8192 (8MB)
+    /// - max_batch_size: 1024
+    ///
+    /// The default options for gc-compaction are:
+    /// - max_read_size: 128 * 8192 (1MB)
+    /// - max_batch_size: 128
     pub fn create_with_options(
         deltas: &[&'a DeltaLayerInner],
         images: &[&'a ImageLayerInner],
         ctx: &'a RequestContext,
@@ -351,14 +365,6 @@ impl<'a> MergeIterator<'a> {
         }
     }
 
-    pub fn create(
-        deltas: &[&'a DeltaLayerInner],
-        images: &[&'a ImageLayerInner],
-        ctx: &'a RequestContext,
-    ) -> Self {
-        Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
-    }
-
     pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
         while let Some(mut iter) = self.heap.peek_mut() {
             if !iter.is_loaded() {
@@ -477,7 +483,7 @@ mod tests {
         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
             .await
             .unwrap();
-        let mut merge_iter = MergeIterator::create(
+        let mut merge_iter = MergeIterator::create_for_testing(
             &[
                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
@@ -549,7 +555,7 @@ mod tests {
         let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
             .await
             .unwrap();
-        let mut merge_iter = MergeIterator::create(
+        let mut merge_iter = MergeIterator::create_for_testing(
             &[
                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
@@ -670,7 +676,7 @@ mod tests {
 
         // Test with different layer order for MergeIterator::create to ensure the order
        // is stable.
-        let mut merge_iter = MergeIterator::create(
+        let mut merge_iter = MergeIterator::create_for_testing(
             &[
                 resident_layer_4.get_as_delta(&ctx).await.unwrap(),
                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
@@ -682,7 +688,7 @@ mod tests {
         );
         assert_merge_iter_equal(&mut merge_iter, &expect).await;
 
-        let mut merge_iter = MergeIterator::create(
+        let mut merge_iter = MergeIterator::create_for_testing(
             &[
                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
                 resident_layer_4.get_as_delta(&ctx).await.unwrap(),
diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index 9086d29d50..d0c13d86ce 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -1994,7 +1994,13 @@ impl Timeline {
                 let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
                 deltas.push(l);
             }
-            MergeIterator::create(&deltas, &[], ctx)
+            MergeIterator::create_with_options(
+                &deltas,
+                &[],
+                ctx,
+                1024 * 8192, /* 8 MiB buffer per layer iterator */
+                1024,
+            )
         };
 
         // This iterator walks through all keys and is needed to calculate size used by each key
@@ -2828,7 +2834,7 @@ impl Timeline {
         Ok(())
     }
 
-    /// Check if the memory usage is within the limit.
+    /// Check whether gc-compaction should bail out early because it would use too much memory.
     async fn check_memory_usage(
         self: &Arc<Self>,
         layer_selection: &[Layer],
@@ -2841,7 +2847,8 @@ impl Timeline {
             let layer_desc = layer.layer_desc();
             if layer_desc.is_delta() {
                 // Delta layers at most have 1MB buffer; 3x to make it safe (there're deltas as large as 16KB).
-                // Multiply the layer size so that tests can pass.
+                // Scale it by target_layer_size_bytes so that tests can pass (some tests, e.g.,
+                // `test_pageserver_gc_compaction_preempt`, use a 3MB layer size and we need to account for that).
                 estimated_memory_usage_mb +=
                     3.0 * (layer_desc.file_size / target_layer_size_bytes) as f64;
                 num_delta_layers += 1;
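
A note on usage (not part of the patch itself): the sketch below shows roughly what a call site looks like once the convenience constructors are removed, using the constructor kept by this change and the gc-compaction buffer values quoted in the new doc comment. The variables `deltas` and `ctx` are assumed to already be in scope, as in the compaction code path in the hunks above; this is an illustration, not code copied from the repository.

```rust
// Illustrative only: after this patch, every caller states its own per-layer
// buffer budget instead of inheriting the old implicit 8 MiB / 1024 defaults.
let merge_iter = MergeIterator::create_with_options(
    &deltas,     // delta layers to merge, as in the compaction path above
    &[],         // no image layers in this example
    ctx,
    128 * 8192,  // max_read_size: 1 MiB read buffer per layer iterator (gc-compaction default)
    128,         // max_batch_size (gc-compaction default)
);
```

Unit tests that do not care about memory usage keep the old defaults through the `#[cfg(test)]`-only `MergeIterator::create_for_testing`, and the per-layer `iter_with_options` on `DeltaLayerInner`/`ImageLayerInner` follows the same explicit-buffer pattern.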
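
Also illustrative: the memory check touched in the last hunk estimates roughly 3 MB of iterator buffer per delta layer, scaled by the configured target layer size. A self-contained sketch of that arithmetic follows; the function name and the example sizes are made up for illustration, only the formula comes from the diff.

```rust
/// Sketch of the per-delta-layer estimate used by the gc-compaction memory check.
/// Integer division happens before the cast, as in the patched code, so layers much
/// smaller than the target size (e.g. the 3MB layers used by some tests) contribute
/// little or nothing to the estimate.
fn estimated_delta_layer_memory_mb(file_size: u64, target_layer_size_bytes: u64) -> f64 {
    3.0 * (file_size / target_layer_size_bytes) as f64
}

fn main() {
    // A 256 MiB delta layer against a 128 MiB target size counts as ~6 MB of buffer.
    assert_eq!(estimated_delta_layer_memory_mb(256 << 20, 128 << 20), 6.0);
    // A 3 MB test layer against the same target counts as 0 MB.
    assert_eq!(estimated_delta_layer_memory_mb(3_000_000, 128 << 20), 0.0);
}
```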