From 412e0aa9858f4b406c08488ffa0c14eebfd941ec Mon Sep 17 00:00:00 2001
From: Konstantin Knizhnik
Date: Wed, 22 Feb 2023 18:28:01 +0200
Subject: [PATCH] Skip largest N holes during compaction (#3597)

## Describe your changes

This is yet another attempt to address the problem of storage size ballooning (#2948).

The previous PR #3348 tried to address this problem by maintaining a list of holes for each
layer. The problem with that approach is that all layers have to be loaded on pageserver
start, so lazy loading of layers is no longer possible.

This PR instead collects information about the N largest holes at compaction time and
excludes those holes from the produced layers. This can generate a larger number of layers
(up to 2x) and produce smaller layers, but it requires minimal code changes and does not
affect the storage format. A simplified, standalone sketch of the heap-based hole selection
is included after the diff.

For a graphical explanation, please see the thread:
https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451

## Issue ticket number and link

#2948 #3348

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? If so, did you add the relevant metrics to the dashboard?
- [ ] If this PR requires a public announcement, mark it with the /release-notes label and add several sentences in this section.
---
 pageserver/src/tenant/timeline.rs | 71 ++++++++++++++++++++++++++++++-
 1 file changed, 70 insertions(+), 1 deletion(-)

diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 176eb61ff3..d46ac26e7d 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -19,6 +19,7 @@ use tracing::*;
 use utils::id::TenantTimelineId;
 
 use std::cmp::{max, min, Ordering};
+use std::collections::BinaryHeap;
 use std::collections::HashMap;
 use std::fs;
 use std::ops::{Deref, Range};
@@ -82,6 +83,25 @@ enum FlushLoopState {
     Exited,
 }
 
+/// Wrapper for a key range, ordered in reverse by coverage size so that BinaryHeap acts as a min-heap
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct Hole {
+    key_range: Range<Key>,
+    coverage_size: usize,
+}
+
+impl Ord for Hole {
+    fn cmp(&self, other: &Self) -> Ordering {
+        other.coverage_size.cmp(&self.coverage_size) // inverse order
+    }
+}
+
+impl PartialOrd for Hole {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
 pub struct Timeline {
     conf: &'static PageServerConf,
     tenant_conf: Arc<RwLock<TenantConfOpt>>,
@@ -2941,6 +2961,47 @@ impl Timeline {
            },
        )?;
 
+        // Determine the N largest holes, where N is the number of compacted layers.
+        let max_holes = deltas_to_compact.len();
+        let last_record_lsn = self.get_last_record_lsn();
+        let layers = self.layers.read().unwrap(); // Isn't it better to hold the original layers lock until here?
+        let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
+        let min_hole_coverage_size = 3; // TODO: something more flexible?
+
+        // min-heap (reserve space for one more element, which is added before eviction)
+        let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
+        let mut prev: Option<Key> = None;
+        for (next_key, _next_lsn, _size) in itertools::process_results(
+            deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
+            |iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0),
+        )? {
+            if let Some(prev_key) = prev {
+                // just a quick first-pass filter
+                if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
+                    let key_range = prev_key..next_key;
+                    // Measuring a hole merely by subtracting the i128 representations of the key range
+                    // boundaries does not make much sense, because the largest holes would correspond to
+                    // field1/field2 changes. We are mostly interested in eliminating holes that cause
+                    // generation of excessive image layers, so measure hole size as the number of covering image layers.
+                    let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
+                    if coverage_size >= min_hole_coverage_size {
+                        heap.push(Hole {
+                            key_range,
+                            coverage_size,
+                        });
+                        if heap.len() > max_holes {
+                            heap.pop(); // remove the smallest hole
+                        }
+                    }
+                }
+            }
+            prev = Some(next_key.next());
+        }
+        drop(layers);
+        let mut holes = heap.into_vec();
+        holes.sort_unstable_by_key(|hole| hole.key_range.start);
+        let mut next_hole = 0; // index of the next hole in the holes vector
+
         // Merge the contents of all the input delta layers into a new set
         // of delta layers, based on the current partitioning.
         //
@@ -3035,14 +3096,22 @@
             }
             if writer.is_some() {
                 let written_size = writer.as_mut().unwrap().size();
-                // check if key cause layer overflow...
+                let contains_hole =
+                    next_hole < holes.len() && key >= holes[next_hole].key_range.end;
+                // check if the key causes layer overflow or crosses a hole...
                 if is_dup_layer
                     || dup_end_lsn.is_valid()
                     || written_size + key_values_total_size > target_file_size
+                    || contains_hole
                 {
                     // ... if so, flush previous layer and prepare to write new one
                     new_layers.push(writer.take().unwrap().finish(prev_key.unwrap().next())?);
                     writer = None;
+
+                    if contains_hole {
+                        // skip the hole
+                        next_hole += 1;
+                    }
                 }
             }
             // Remember size of key value because at next iteration we will access next item
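
For readers who want to try the selection technique in isolation, below is a minimal, self-contained sketch (not part of the patch). It shows the same idea as the diff: a `BinaryHeap` over a reverse-ordered `Hole` is kept at no more than N elements, so only the N largest holes survive, and the survivors are sorted by range start so a compaction loop could consume them in key order. Plain `u64` ranges stand in for the pageserver `Key` type, the `coverage_size` values are invented, and `find_largest_holes` is an illustrative helper that does not exist in the codebase.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::ops::Range;

/// Reverse-ordered by `coverage_size`, so `BinaryHeap` (a max-heap) behaves
/// like a min-heap and `pop()` evicts the *smallest* hole seen so far.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Hole {
    key_range: Range<u64>, // plain integers stand in for the pageserver `Key` type
    coverage_size: usize,
}

impl Ord for Hole {
    fn cmp(&self, other: &Self) -> Ordering {
        other.coverage_size.cmp(&self.coverage_size) // inverse order
    }
}

impl PartialOrd for Hole {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Keep only the `max_holes` largest holes from a stream of candidates.
fn find_largest_holes(candidates: impl Iterator<Item = Hole>, max_holes: usize) -> Vec<Hole> {
    // Reserve one extra slot: each candidate is pushed before the smallest is evicted.
    let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
    for hole in candidates {
        heap.push(hole);
        if heap.len() > max_holes {
            heap.pop(); // evict the smallest hole
        }
    }
    // Return the survivors sorted by range start, i.e. in key order.
    let mut holes = heap.into_vec();
    holes.sort_unstable_by_key(|hole| hole.key_range.start);
    holes
}

fn main() {
    let candidates = [
        Hole { key_range: 10..20, coverage_size: 4 },
        Hole { key_range: 50..90, coverage_size: 9 },
        Hole { key_range: 30..40, coverage_size: 7 },
        Hole { key_range: 95..99, coverage_size: 3 },
    ];
    // Keep the two largest holes; the result is sorted by key_range.start.
    let holes = find_largest_holes(candidates.into_iter(), 2);
    assert_eq!(holes.len(), 2);
    println!("{holes:?}"); // prints the holes 30..40 and 50..90
}
```

In the patch itself, the candidate gaps come from the merged `key_iter` streams of the delta layers being compacted, and `coverage_size` is the number of image layers returned by `layers.image_coverage(..)` for the gap.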