fix(pageserver): reduce gc-compaction memory usage (#11709)

## Problem

close https://github.com/neondatabase/neon/issues/11694

We had the delta layer iterator and image layer iterator set to buffer
at most 8MB data. Note that 8MB is the compressed size, so it is
possible for those iterators contain more than 8MB data in memory.

For the recent OOM case, gc-compaction was running over 556 layers,
which means that we will have 556 active iterators. So in theory, it
could take up to 556*8=4448MB memory when the compaction is going on. If
images get compressed and the compression ratio is high (for that
tenant, we see 3x compression ratio across image layers), then that's
13344MB memory.

Also we have layer rewrites, which explains the memory taken by
gc-compaction itself (versus the iterators). We rewrite 424 out of 556
layers, and each of such rewrites need a pair of delta layer writer. So
we are buffering a lot of deltas in the memory.

The flamegraph shows that gc-compaction itself takes 6GB memory, delta
iterator 7GB, and image iterator 2GB, which can be explained by the
above theory.

## Summary of changes

- Reduce the buffer sizes.
- Estimate memory consumption and if it is too high.
- Also give up if the number of layers-to-rewrite is too high.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z.
2025-04-25 04:45:31 -04:00
committed by GitHub
parent 2465e9141f
commit 5d91d4e843
4 changed files with 141 additions and 20 deletions

View File

@@ -1442,6 +1442,19 @@ impl DeltaLayerInner {
}
pub fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
self.iter_with_options(
ctx,
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
)
}
pub fn iter_with_options<'a>(
&'a self,
ctx: &'a RequestContext,
max_read_size: u64,
max_batch_size: usize,
) -> DeltaLayerIterator<'a> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
@@ -1451,10 +1464,7 @@ impl DeltaLayerInner {
index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
key_values_batch: std::collections::VecDeque::new(),
is_end: false,
planner: StreamingVectoredReadPlanner::new(
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
),
planner: StreamingVectoredReadPlanner::new(max_read_size, max_batch_size),
}
}

View File

@@ -685,6 +685,19 @@ impl ImageLayerInner {
}
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
self.iter_with_options(
ctx,
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
)
}
pub(crate) fn iter_with_options<'a>(
&'a self,
ctx: &'a RequestContext,
max_read_size: u64,
max_batch_size: usize,
) -> ImageLayerIterator<'a> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
@@ -694,10 +707,7 @@ impl ImageLayerInner {
index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
key_values_batch: VecDeque::new(),
is_end: false,
planner: StreamingVectoredReadPlanner::new(
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
),
planner: StreamingVectoredReadPlanner::new(max_read_size, max_batch_size),
}
}

View File

@@ -19,6 +19,7 @@ pub(crate) enum LayerRef<'a> {
}
impl<'a> LayerRef<'a> {
#[allow(dead_code)]
fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
match self {
Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
@@ -26,6 +27,22 @@ impl<'a> LayerRef<'a> {
}
}
fn iter_with_options(
self,
ctx: &'a RequestContext,
max_read_size: u64,
max_batch_size: usize,
) -> LayerIterRef<'a> {
match self {
Self::Image(x) => {
LayerIterRef::Image(x.iter_with_options(ctx, max_read_size, max_batch_size))
}
Self::Delta(x) => {
LayerIterRef::Delta(x.iter_with_options(ctx, max_read_size, max_batch_size))
}
}
}
fn layer_dbg_info(&self) -> String {
match self {
Self::Image(x) => x.layer_dbg_info(),
@@ -66,6 +83,8 @@ pub(crate) enum IteratorWrapper<'a> {
first_key_lower_bound: (Key, Lsn),
layer: LayerRef<'a>,
source_desc: Arc<PersistentLayerKey>,
max_read_size: u64,
max_batch_size: usize,
},
Loaded {
iter: PeekableLayerIterRef<'a>,
@@ -146,6 +165,8 @@ impl<'a> IteratorWrapper<'a> {
pub fn create_from_image_layer(
image_layer: &'a ImageLayerInner,
ctx: &'a RequestContext,
max_read_size: u64,
max_batch_size: usize,
) -> Self {
Self::NotLoaded {
layer: LayerRef::Image(image_layer),
@@ -157,12 +178,16 @@ impl<'a> IteratorWrapper<'a> {
is_delta: false,
}
.into(),
max_read_size,
max_batch_size,
}
}
pub fn create_from_delta_layer(
delta_layer: &'a DeltaLayerInner,
ctx: &'a RequestContext,
max_read_size: u64,
max_batch_size: usize,
) -> Self {
Self::NotLoaded {
layer: LayerRef::Delta(delta_layer),
@@ -174,6 +199,8 @@ impl<'a> IteratorWrapper<'a> {
is_delta: true,
}
.into(),
max_read_size,
max_batch_size,
}
}
@@ -204,11 +231,13 @@ impl<'a> IteratorWrapper<'a> {
first_key_lower_bound,
layer,
source_desc,
max_read_size,
max_batch_size,
} = self
else {
unreachable!()
};
let iter = layer.iter(ctx);
let iter = layer.iter_with_options(ctx, *max_read_size, *max_batch_size);
let iter = PeekableLayerIterRef::create(iter).await?;
if let Some((k1, l1, _)) = iter.peek() {
let (k2, l2) = first_key_lower_bound;
@@ -293,21 +322,41 @@ impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
}
impl<'a> MergeIterator<'a> {
pub fn create_with_options(
deltas: &[&'a DeltaLayerInner],
images: &[&'a ImageLayerInner],
ctx: &'a RequestContext,
max_read_size: u64,
max_batch_size: usize,
) -> Self {
let mut heap = Vec::with_capacity(images.len() + deltas.len());
for image in images {
heap.push(IteratorWrapper::create_from_image_layer(
image,
ctx,
max_read_size,
max_batch_size,
));
}
for delta in deltas {
heap.push(IteratorWrapper::create_from_delta_layer(
delta,
ctx,
max_read_size,
max_batch_size,
));
}
Self {
heap: BinaryHeap::from(heap),
}
}
pub fn create(
deltas: &[&'a DeltaLayerInner],
images: &[&'a ImageLayerInner],
ctx: &'a RequestContext,
) -> Self {
let mut heap = Vec::with_capacity(images.len() + deltas.len());
for image in images {
heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
}
for delta in deltas {
heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
}
Self {
heap: BinaryHeap::from(heap),
}
Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
}
pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {

View File

@@ -2828,6 +2828,41 @@ impl Timeline {
Ok(())
}
/// Check if the memory usage is within the limit.
async fn check_memory_usage(
self: &Arc<Self>,
layer_selection: &[Layer],
) -> Result<(), CompactionError> {
let mut estimated_memory_usage_mb = 0.0;
let mut num_image_layers = 0;
let mut num_delta_layers = 0;
let target_layer_size_bytes = 256 * 1024 * 1024;
for layer in layer_selection {
let layer_desc = layer.layer_desc();
if layer_desc.is_delta() {
// Delta layers at most have 1MB buffer; 3x to make it safe (there're deltas as large as 16KB).
// Multiply the layer size so that tests can pass.
estimated_memory_usage_mb +=
3.0 * (layer_desc.file_size / target_layer_size_bytes) as f64;
num_delta_layers += 1;
} else {
// Image layers at most have 1MB buffer but it might be compressed; assume 5x compression ratio.
estimated_memory_usage_mb +=
5.0 * (layer_desc.file_size / target_layer_size_bytes) as f64;
num_image_layers += 1;
}
}
if estimated_memory_usage_mb > 1024.0 {
return Err(CompactionError::Other(anyhow!(
"estimated memory usage is too high: {}MB, giving up compaction; num_image_layers={}, num_delta_layers={}",
estimated_memory_usage_mb,
num_image_layers,
num_delta_layers
)));
}
Ok(())
}
/// Get a watermark for gc-compaction, that is the lowest LSN that we can use as the `gc_horizon` for
/// the compaction algorithm. It is min(space_cutoff, time_cutoff, latest_gc_cutoff, standby_horizon).
/// Leases and retain_lsns are considered in the gc-compaction job itself so we don't need to account for them
@@ -3264,6 +3299,17 @@ impl Timeline {
self.check_compaction_space(&job_desc.selected_layers)
.await?;
self.check_memory_usage(&job_desc.selected_layers).await?;
if job_desc.selected_layers.len() > 100
&& job_desc.rewrite_layers.len() as f64 >= job_desc.selected_layers.len() as f64 * 0.7
{
return Err(CompactionError::Other(anyhow!(
"too many layers to rewrite: {} / {}, giving up compaction",
job_desc.rewrite_layers.len(),
job_desc.selected_layers.len()
)));
}
// Generate statistics for the compaction
for layer in &job_desc.selected_layers {
let desc = layer.layer_desc();
@@ -3359,7 +3405,13 @@ impl Timeline {
.context("failed to collect gc compaction keyspace")
.map_err(CompactionError::Other)?;
let mut merge_iter = FilterIterator::create(
MergeIterator::create(&delta_layers, &image_layers, ctx),
MergeIterator::create_with_options(
&delta_layers,
&image_layers,
ctx,
128 * 8192, /* 1MB buffer for each of the inner iterators */
128,
),
dense_ks,
sparse_ks,
)