diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 176eb61ff3..d46ac26e7d 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -19,6 +19,7 @@ use tracing::*;
 use utils::id::TenantTimelineId;
 
 use std::cmp::{max, min, Ordering};
+use std::collections::BinaryHeap;
 use std::collections::HashMap;
 use std::fs;
 use std::ops::{Deref, Range};
@@ -82,6 +83,25 @@ enum FlushLoopState {
     Exited,
 }
 
+/// Wrapper for key range to provide reverse ordering by hole coverage size for BinaryHeap
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct Hole {
+    key_range: Range<Key>,
+    coverage_size: usize,
+}
+
+impl Ord for Hole {
+    fn cmp(&self, other: &Self) -> Ordering {
+        other.coverage_size.cmp(&self.coverage_size) // inverse order
+    }
+}
+
+impl PartialOrd for Hole {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
 pub struct Timeline {
     conf: &'static PageServerConf,
     tenant_conf: Arc<RwLock<TenantConfOpt>>,
@@ -2941,6 +2961,47 @@ impl Timeline {
             },
         )?;
 
+        // Determine the N largest holes, where N is the number of layers being compacted.
+        let max_holes = deltas_to_compact.len();
+        let last_record_lsn = self.get_last_record_lsn();
+        let layers = self.layers.read().unwrap(); // Isn't it better to hold the original layers lock till here?
+        let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
+        let min_hole_coverage_size = 3; // TODO: something more flexible?
+
+        // min-heap (reserve space for one more element added before eviction)
+        let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
+        let mut prev: Option<Key> = None;
+        for (next_key, _next_lsn, _size) in itertools::process_results(
+            deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
+            |iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0),
+        )? {
+            if let Some(prev_key) = prev {
+                // just a fast first-pass filter
+                if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
+                    let key_range = prev_key..next_key;
+                    // Measuring a hole by mere subtraction of the i128 representations of its boundaries
+                    // doesn't make much sense, because the largest holes would then correspond to
+                    // field1/field2 changes. What we really want to eliminate is holes that cause
+                    // generation of excessive image layers, so we measure hole size as the number of covering image layers.
+                    let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
+                    if coverage_size >= min_hole_coverage_size {
+                        heap.push(Hole {
+                            key_range,
+                            coverage_size,
+                        });
+                        if heap.len() > max_holes {
+                            heap.pop(); // remove smallest hole
+                        }
+                    }
+                }
+            }
+            prev = Some(next_key.next());
+        }
+        drop(layers);
+        let mut holes = heap.into_vec();
+        holes.sort_unstable_by_key(|hole| hole.key_range.start);
+        let mut next_hole = 0; // index of next hole in holes vector
+
         // Merge the contents of all the input delta layers into a new set
         // of delta layers, based on the current partitioning.
         //
@@ -3035,14 +3096,22 @@ impl Timeline {
             }
            if writer.is_some() {
                let written_size = writer.as_mut().unwrap().size();
-                // check if key cause layer overflow...
+                let contains_hole =
+                    next_hole < holes.len() && key >= holes[next_hole].key_range.end;
+                // check if the key causes layer overflow or crosses a hole...
                if is_dup_layer
                    || dup_end_lsn.is_valid()
                    || written_size + key_values_total_size > target_file_size
+                    || contains_hole
                {
                    // ... if so, flush previous layer and prepare to write new one
                    new_layers.push(writer.take().unwrap().finish(prev_key.unwrap().next())?);
                    writer = None;
+
+                    if contains_hole {
+                        // skip hole
+                        next_hole += 1;
+                    }
                }
            }
            // Remember size of key value because at next iteration we will access next item
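The key trick in this patch is the reversed Ord implementation on Hole: Rust's BinaryHeap is a max-heap, so inverting the comparison makes pop() evict the element with the smallest coverage_size, which is exactly what is needed to retain only the max_holes largest holes while streaming over candidate gaps. The following is a minimal, standalone sketch of that bounded-heap selection — not pageserver code: it uses plain u64 keys and a made-up select_largest_holes helper — ending with the same sort by start key that lets the compaction loop consume holes sequentially via a next_hole index.

use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::ops::Range;

/// Simplified stand-in for the patch's `Hole`; keys are plain u64 here.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Hole {
    key_range: Range<u64>,
    coverage_size: usize,
}

impl Ord for Hole {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reversed comparison: BinaryHeap is a max-heap, so inverting the order
        // makes pop() return the hole with the *smallest* coverage_size.
        other.coverage_size.cmp(&self.coverage_size)
    }
}

impl PartialOrd for Hole {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Keep only the `max_holes` largest candidates (by coverage_size) and return
/// them sorted by start key for sequential consumption.
fn select_largest_holes(candidates: impl IntoIterator<Item = Hole>, max_holes: usize) -> Vec<Hole> {
    // Reserve one extra slot for the element pushed just before eviction.
    let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
    for hole in candidates {
        heap.push(hole);
        if heap.len() > max_holes {
            heap.pop(); // evicts the smallest hole thanks to the reversed Ord
        }
    }
    let mut holes = heap.into_vec();
    holes.sort_unstable_by_key(|hole| hole.key_range.start);
    holes
}

fn main() {
    let candidates = vec![
        Hole { key_range: 10..20, coverage_size: 5 },
        Hole { key_range: 40..90, coverage_size: 12 },
        Hole { key_range: 100..110, coverage_size: 3 },
        Hole { key_range: 200..260, coverage_size: 8 },
    ];
    // With max_holes = 2 only the two most-covered holes survive,
    // printed in key order: 40..90 (12 layers), then 200..260 (8 layers).
    for hole in select_largest_holes(candidates, 2) {
        println!("{:?} covered by {} image layers", hole.key_range, hole.coverage_size);
    }
}

Reserving max_holes + 1 capacity, as the patch also does, avoids reallocating for the transient extra element that is pushed right before the smallest hole is popped.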