Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-03 11:32:56 +00:00
fix(pageserver): only keep iter_with_options API, improve docs in gc-compact (#11804)
## Problem

Address comments in https://github.com/neondatabase/neon/pull/11709.

## Summary of changes

- Remove the `iter` API; callers now always specify the buffer size depending on the expected memory usage.
- Several doc improvements.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
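As an illustration of the API change, here is a minimal sketch of the call-site migration (not a compilable excerpt of the pageserver crate; `delta_layer`, `ctx`, `max_read_size`, and `batch_size` are the identifiers used in the test hunks below):

```rust
// Before (removed in this PR): `iter` silently defaulted to an 8MB read buffer
// and a batch size of 1024.
// let mut iter = delta_layer.iter(&ctx);

// After: the caller states the read-buffer budget explicitly, so the expected
// memory usage is visible at every call site.
let mut iter = delta_layer.iter_with_options(&ctx, max_read_size, batch_size);
iter.next_batch().await.unwrap();
```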
@@ -1441,14 +1441,6 @@ impl DeltaLayerInner {
         offset
     }
 
-    pub fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
-        self.iter_with_options(
-            ctx,
-            1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
-            1024, // The default value. Unit tests might use a different value
-        )
-    }
-
     pub fn iter_with_options<'a>(
         &'a self,
         ctx: &'a RequestContext,
@@ -1634,7 +1626,6 @@ pub(crate) mod test {
     use crate::tenant::disk_btree::tests::TestDisk;
     use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
     use crate::tenant::storage_layer::{Layer, ResidentLayer};
-    use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
     use crate::tenant::{TenantShard, Timeline};
 
     /// Construct an index for a fictional delta layer and and then
@@ -2311,8 +2302,7 @@ pub(crate) mod test {
         for batch_size in [1, 2, 4, 8, 3, 7, 13] {
             println!("running with batch_size={batch_size} max_read_size={max_read_size}");
             // Test if the batch size is correctly determined
-            let mut iter = delta_layer.iter(&ctx);
-            iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
+            let mut iter = delta_layer.iter_with_options(&ctx, max_read_size, batch_size);
             let mut num_items = 0;
             for _ in 0..3 {
                 iter.next_batch().await.unwrap();
@@ -2329,8 +2319,7 @@ pub(crate) mod test {
                 iter.key_values_batch.clear();
             }
             // Test if the result is correct
-            let mut iter = delta_layer.iter(&ctx);
-            iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
+            let mut iter = delta_layer.iter_with_options(&ctx, max_read_size, batch_size);
             assert_delta_iter_equal(&mut iter, &test_deltas).await;
         }
     }
@@ -157,7 +157,7 @@ mod tests {
             .await
             .unwrap();
 
-        let merge_iter = MergeIterator::create(
+        let merge_iter = MergeIterator::create_for_testing(
             &[resident_layer_1.get_as_delta(&ctx).await.unwrap()],
             &[],
             &ctx,
@@ -182,7 +182,7 @@ mod tests {
         result.extend(test_deltas1[90..100].iter().cloned());
         assert_filter_iter_equal(&mut filter_iter, &result).await;
 
-        let merge_iter = MergeIterator::create(
+        let merge_iter = MergeIterator::create_for_testing(
             &[resident_layer_1.get_as_delta(&ctx).await.unwrap()],
             &[],
             &ctx,
@@ -684,14 +684,6 @@ impl ImageLayerInner {
         }
     }
 
-    pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
-        self.iter_with_options(
-            ctx,
-            1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
-            1024, // The default value. Unit tests might use a different value
-        )
-    }
-
     pub(crate) fn iter_with_options<'a>(
         &'a self,
         ctx: &'a RequestContext,
@@ -1240,7 +1232,6 @@ mod test {
     use crate::context::RequestContext;
     use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
     use crate::tenant::storage_layer::{Layer, ResidentLayer};
-    use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
     use crate::tenant::{TenantShard, Timeline};
 
     #[tokio::test]
@@ -1507,8 +1498,7 @@ mod test {
         for batch_size in [1, 2, 4, 8, 3, 7, 13] {
             println!("running with batch_size={batch_size} max_read_size={max_read_size}");
             // Test if the batch size is correctly determined
-            let mut iter = img_layer.iter(&ctx);
-            iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
+            let mut iter = img_layer.iter_with_options(&ctx, max_read_size, batch_size);
             let mut num_items = 0;
             for _ in 0..3 {
                 iter.next_batch().await.unwrap();
@@ -1525,8 +1515,7 @@ mod test {
                 iter.key_values_batch.clear();
             }
             // Test if the result is correct
-            let mut iter = img_layer.iter(&ctx);
-            iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
+            let mut iter = img_layer.iter_with_options(&ctx, max_read_size, batch_size);
             assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
         }
     }
@@ -19,14 +19,6 @@ pub(crate) enum LayerRef<'a> {
 }
 
 impl<'a> LayerRef<'a> {
-    #[allow(dead_code)]
-    fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
-        match self {
-            Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
-            Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
-        }
-    }
-
     fn iter_with_options(
         self,
         ctx: &'a RequestContext,
@@ -322,6 +314,28 @@ impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
 }
 
 impl<'a> MergeIterator<'a> {
+    #[cfg(test)]
+    pub(crate) fn create_for_testing(
+        deltas: &[&'a DeltaLayerInner],
+        images: &[&'a ImageLayerInner],
+        ctx: &'a RequestContext,
+    ) -> Self {
+        Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
+    }
+
+    /// Create a new merge iterator with custom options.
+    ///
+    /// Adjust `max_read_size` and `max_batch_size` to trade memory usage for performance. The size should scale
+    /// with the number of layers to compact. If there are a lot of layers, consider reducing the values, so that
+    /// the buffer does not take too much memory.
+    ///
+    /// The default options for L0 compactions are:
+    /// - max_read_size: 1024 * 8192 (8MB)
+    /// - max_batch_size: 1024
+    ///
+    /// The default options for gc-compaction are:
+    /// - max_read_size: 128 * 8192 (1MB)
+    /// - max_batch_size: 128
+    pub fn create_with_options(
         deltas: &[&'a DeltaLayerInner],
         images: &[&'a ImageLayerInner],
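The presets named in the new doc comment can be sanity-checked with a small, self-contained sketch; `UNIT` and the layer count below are illustrative assumptions, only the `1024 * 8192` and `128 * 8192` figures come from the doc comment:

```rust
// 8192-byte units, matching the multipliers quoted in the doc comment above.
const UNIT: u64 = 8192;

fn main() {
    let l0_max_read_size = 1024 * UNIT; // L0 compaction preset: 8MB per layer iterator
    let gc_max_read_size = 128 * UNIT; // gc-compaction preset: 1MB per layer iterator
    assert_eq!(l0_max_read_size, 8 * 1024 * 1024);
    assert_eq!(gc_max_read_size, 1024 * 1024);

    // The read buffer is held per layer iterator, so the total footprint scales
    // with the number of layers being merged, e.g. 200 layers under gc-compaction:
    let layers = 200u64;
    println!(
        "~{} MiB of read buffers across {layers} layer iterators",
        layers * gc_max_read_size / (1024 * 1024)
    );
}
```

This is why the doc comment suggests reducing `max_read_size` and `max_batch_size` when a large number of layers is compacted at once.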
@@ -351,14 +365,6 @@ impl<'a> MergeIterator<'a> {
         }
     }
 
-    pub fn create(
-        deltas: &[&'a DeltaLayerInner],
-        images: &[&'a ImageLayerInner],
-        ctx: &'a RequestContext,
-    ) -> Self {
-        Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
-    }
-
     pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
         while let Some(mut iter) = self.heap.peek_mut() {
             if !iter.is_loaded() {
@@ -477,7 +483,7 @@ mod tests {
         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
             .await
             .unwrap();
-        let mut merge_iter = MergeIterator::create(
+        let mut merge_iter = MergeIterator::create_for_testing(
             &[
                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
@@ -549,7 +555,7 @@ mod tests {
         let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
             .await
             .unwrap();
-        let mut merge_iter = MergeIterator::create(
+        let mut merge_iter = MergeIterator::create_for_testing(
             &[
                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
@@ -670,7 +676,7 @@ mod tests {
         // Test with different layer order for MergeIterator::create to ensure the order
         // is stable.
 
-        let mut merge_iter = MergeIterator::create(
+        let mut merge_iter = MergeIterator::create_for_testing(
             &[
                 resident_layer_4.get_as_delta(&ctx).await.unwrap(),
                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
@@ -682,7 +688,7 @@ mod tests {
         );
         assert_merge_iter_equal(&mut merge_iter, &expect).await;
 
-        let mut merge_iter = MergeIterator::create(
+        let mut merge_iter = MergeIterator::create_for_testing(
             &[
                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
                 resident_layer_4.get_as_delta(&ctx).await.unwrap(),
@@ -1994,7 +1994,13 @@ impl Timeline {
                 let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
                 deltas.push(l);
             }
-            MergeIterator::create(&deltas, &[], ctx)
+            MergeIterator::create_with_options(
+                &deltas,
+                &[],
+                ctx,
+                1024 * 8192, /* 8 MiB buffer per layer iterator */
+                1024,
+            )
         };
 
         // This iterator walks through all keys and is needed to calculate size used by each key
@@ -2828,7 +2834,7 @@ impl Timeline {
         Ok(())
     }
 
-    /// Check if the memory usage is within the limit.
+    /// Check to bail out of gc compaction early if it would use too much memory.
     async fn check_memory_usage(
         self: &Arc<Self>,
         layer_selection: &[Layer],
@@ -2841,7 +2847,8 @@ impl Timeline {
             let layer_desc = layer.layer_desc();
             if layer_desc.is_delta() {
                 // Delta layers at most have 1MB buffer; 3x to make it safe (there're deltas as large as 16KB).
-                // Multiply the layer size so that tests can pass.
+                // Scale it by target_layer_size_bytes so that tests can pass (some tests, e.g., `test_pageserver_gc_compaction_preempt
+                // use 3MB layer size and we need to account for that).
                 estimated_memory_usage_mb +=
                     3.0 * (layer_desc.file_size / target_layer_size_bytes) as f64;
                 num_delta_layers += 1;
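To make the updated estimate concrete, here is a standalone sketch of the arithmetic for one delta layer; the variable names follow the diff, while the sample sizes (a 3MB target layer size, as mentioned for `test_pageserver_gc_compaction_preempt`, and a 9MB layer file) are assumed for illustration:

```rust
fn main() {
    // Assumed example inputs, not values taken from the pageserver itself.
    let target_layer_size_bytes: u64 = 3 * 1024 * 1024; // 3MB target layer size
    let file_size: u64 = 9 * 1024 * 1024; // one 9MB delta layer

    // Per the comment in the diff: a delta layer iterator buffers at most ~1MB,
    // the 3x factor adds headroom, and dividing by target_layer_size_bytes scales
    // the estimate for configurations that use smaller-than-default layers.
    let mut estimated_memory_usage_mb = 0.0_f64;
    estimated_memory_usage_mb += 3.0 * (file_size / target_layer_size_bytes) as f64;

    println!("estimated memory usage: {estimated_memory_usage_mb} MB"); // prints 9
}
```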