diff --git a/pageserver/compaction/src/helpers.rs b/pageserver/compaction/src/helpers.rs
index 9dbb6ecedf..6b739d85a7 100644
--- a/pageserver/compaction/src/helpers.rs
+++ b/pageserver/compaction/src/helpers.rs
@@ -35,6 +35,15 @@ pub fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
     !(a.end <= b.start || b.end <= a.start)
 }
 
+/// Whether `a` fully contains `b`, for example:
+/// ```plain
+/// |       a       |
+/// |   b   |
+/// ```
+pub fn fully_contains<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
+    a.start <= b.start && a.end >= b.end
+}
+
 pub fn union_to_keyspace<K: Ord>(a: &mut CompactionKeySpace<K>, b: CompactionKeySpace<K>) {
     let x = std::mem::take(a);
     let mut all_ranges_iter = [x.into_iter(), b.into_iter()]
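As a quick sanity check of the two predicates above, a standalone sketch (plain `u32` ranges rather than pageserver `Key` ranges; both functions re-declared locally for illustration):

```rust
use std::ops::Range;

fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
    !(a.end <= b.start || b.end <= a.start)
}

fn fully_contains<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
    a.start <= b.start && a.end >= b.end
}

fn main() {
    // 0..10 and 5..15 share keys 5..10, but neither contains the other.
    assert!(overlaps_with(&(0u32..10), &(5u32..15)));
    assert!(!fully_contains(&(0u32..10), &(5u32..15)));
    // 0..10 fully contains 2..4, and containment implies overlap.
    assert!(fully_contains(&(0u32..10), &(2u32..4)));
    assert!(overlaps_with(&(0u32..10), &(2u32..4)));
    // Half-open ranges: 0..10 and 10..20 touch but do not overlap.
    assert!(!overlaps_with(&(0u32..10), &(10u32..20)));
}
```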
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 774672aed6..e7c258d829 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -9223,6 +9223,23 @@ mod tests {
         Ok(())
     }
 
+    fn sort_layer_key(k1: &PersistentLayerKey, k2: &PersistentLayerKey) -> std::cmp::Ordering {
+        (
+            k1.is_delta,
+            k1.key_range.start,
+            k1.key_range.end,
+            k1.lsn_range.start,
+            k1.lsn_range.end,
+        )
+            .cmp(&(
+                k2.is_delta,
+                k2.key_range.start,
+                k2.key_range.end,
+                k2.lsn_range.start,
+                k2.lsn_range.end,
+            ))
+    }
+
     async fn inspect_and_sort(
         tline: &Arc<Timeline>,
         filter: Option<Range<Key>>,
@@ -9231,25 +9248,30 @@ mod tests {
         if let Some(filter) = filter {
             all_layers.retain(|layer| overlaps_with(&layer.key_range, &filter));
         }
-        all_layers.sort_by(|k1, k2| {
-            (
-                k1.is_delta,
-                k1.key_range.start,
-                k1.key_range.end,
-                k1.lsn_range.start,
-                k1.lsn_range.end,
-            )
-                .cmp(&(
-                    k2.is_delta,
-                    k2.key_range.start,
-                    k2.key_range.end,
-                    k2.lsn_range.start,
-                    k2.lsn_range.end,
-                ))
-        });
+        all_layers.sort_by(sort_layer_key);
         all_layers
     }
 
+    #[cfg(feature = "testing")]
+    fn check_layer_map_key_eq(
+        mut left: Vec<PersistentLayerKey>,
+        mut right: Vec<PersistentLayerKey>,
+    ) {
+        left.sort_by(sort_layer_key);
+        right.sort_by(sort_layer_key);
+        if left != right {
+            eprintln!("---LEFT---");
+            for left in left.iter() {
+                eprintln!("{}", left);
+            }
+            eprintln!("---RIGHT---");
+            for right in right.iter() {
+                eprintln!("{}", right);
+            }
+            assert_eq!(left, right);
+        }
+    }
+
     #[cfg(feature = "testing")]
     #[tokio::test]
     async fn test_simple_partial_bottom_most_compaction() -> anyhow::Result<()> {
@@ -9342,127 +9364,206 @@ mod tests {
 
         let cancel = CancellationToken::new();
 
-        // Do a partial compaction on key range 0..4, we should generate a image layer; no other layers
-        // can be removed because they might be used for other key ranges.
+        // Do a partial compaction on key range 0..2
         tline
-            .partial_compact_with_gc(Some(get_key(0)..get_key(4)), &cancel, EnumSet::new(), &ctx)
+            .partial_compact_with_gc(get_key(0)..get_key(2), &cancel, EnumSet::new(), &ctx)
             .await
             .unwrap();
        let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
-        assert_eq!(
+        check_layer_map_key_eq(
            all_layers,
            vec![
+                // newly-generated image layer for the partial compaction range 0..2
                PersistentLayerKey {
-                    key_range: get_key(0)..get_key(4),
+                    key_range: get_key(0)..get_key(2),
                    lsn_range: Lsn(0x20)..Lsn(0x21),
-                    is_delta: false
+                    is_delta: false,
                },
                PersistentLayerKey {
                    key_range: get_key(0)..get_key(10),
                    lsn_range: Lsn(0x10)..Lsn(0x11),
-                    is_delta: false
+                    is_delta: false,
                },
+                // delta1 is split and the second part is rewritten
                PersistentLayerKey {
-                    key_range: get_key(1)..get_key(4),
+                    key_range: get_key(2)..get_key(4),
                    lsn_range: Lsn(0x20)..Lsn(0x48),
-                    is_delta: true
+                    is_delta: true,
                },
                PersistentLayerKey {
                    key_range: get_key(5)..get_key(7),
                    lsn_range: Lsn(0x20)..Lsn(0x48),
-                    is_delta: true
+                    is_delta: true,
                },
                PersistentLayerKey {
                    key_range: get_key(8)..get_key(10),
                    lsn_range: Lsn(0x48)..Lsn(0x50),
-                    is_delta: true
-                }
-            ]
+                    is_delta: true,
+                },
+            ],
        );
 
-        // Do a partial compaction on key range 4..10
+        // Do a partial compaction on key range 2..4
        tline
-            .partial_compact_with_gc(Some(get_key(4)..get_key(10)), &cancel, EnumSet::new(), &ctx)
+            .partial_compact_with_gc(get_key(2)..get_key(4), &cancel, EnumSet::new(), &ctx)
            .await
            .unwrap();
        let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
-        assert_eq!(
+        check_layer_map_key_eq(
            all_layers,
            vec![
                PersistentLayerKey {
-                    key_range: get_key(0)..get_key(4),
+                    key_range: get_key(0)..get_key(2),
                    lsn_range: Lsn(0x20)..Lsn(0x21),
-                    is_delta: false
+                    is_delta: false,
                },
                PersistentLayerKey {
-                    // if (in the future) GC kicks in, this layer will be removed
                    key_range: get_key(0)..get_key(10),
                    lsn_range: Lsn(0x10)..Lsn(0x11),
-                    is_delta: false
+                    is_delta: false,
                },
+                // image layer generated for the compaction range 2..4
                PersistentLayerKey {
-                    key_range: get_key(4)..get_key(10),
+                    key_range: get_key(2)..get_key(4),
                    lsn_range: Lsn(0x20)..Lsn(0x21),
-                    is_delta: false
+                    is_delta: false,
                },
+                // we have key2/key3 above the retain_lsn, so we still need this delta layer
                PersistentLayerKey {
-                    key_range: get_key(1)..get_key(4),
+                    key_range: get_key(2)..get_key(4),
                    lsn_range: Lsn(0x20)..Lsn(0x48),
-                    is_delta: true
+                    is_delta: true,
                },
                PersistentLayerKey {
                    key_range: get_key(5)..get_key(7),
                    lsn_range: Lsn(0x20)..Lsn(0x48),
-                    is_delta: true
+                    is_delta: true,
                },
                PersistentLayerKey {
                    key_range: get_key(8)..get_key(10),
                    lsn_range: Lsn(0x48)..Lsn(0x50),
-                    is_delta: true
-                }
-            ]
+                    is_delta: true,
+                },
+            ],
        );
 
+        // Do a partial compaction on key range 4..9
+        tline
+            .partial_compact_with_gc(get_key(4)..get_key(9), &cancel, EnumSet::new(), &ctx)
+            .await
+            .unwrap();
+        let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
+        check_layer_map_key_eq(
+            all_layers,
+            vec![
+                PersistentLayerKey {
+                    key_range: get_key(0)..get_key(2),
+                    lsn_range: Lsn(0x20)..Lsn(0x21),
+                    is_delta: false,
+                },
+                PersistentLayerKey {
+                    key_range: get_key(0)..get_key(10),
+                    lsn_range: Lsn(0x10)..Lsn(0x11),
+                    is_delta: false,
+                },
+                PersistentLayerKey {
+                    key_range: get_key(2)..get_key(4),
+                    lsn_range: Lsn(0x20)..Lsn(0x21),
+                    is_delta: false,
+                },
+                PersistentLayerKey {
+                    key_range: get_key(2)..get_key(4),
+                    lsn_range: Lsn(0x20)..Lsn(0x48),
+                    is_delta: true,
+                },
+                // image layer generated for this compaction range
+                PersistentLayerKey {
+                    key_range: get_key(4)..get_key(9),
+                    lsn_range: Lsn(0x20)..Lsn(0x21),
+                    is_delta: false,
+                },
+                PersistentLayerKey {
+                    key_range: get_key(8)..get_key(10),
+                    lsn_range: Lsn(0x48)..Lsn(0x50),
+                    is_delta: true,
+                },
+            ],
+        );
+
+        // Do a partial compaction on key range 9..10
+        tline
+            .partial_compact_with_gc(get_key(9)..get_key(10), &cancel, EnumSet::new(), &ctx)
+            .await
+            .unwrap();
+        let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
+        check_layer_map_key_eq(
+            all_layers,
+            vec![
+                PersistentLayerKey {
+                    key_range: get_key(0)..get_key(2),
+                    lsn_range: Lsn(0x20)..Lsn(0x21),
+                    is_delta: false,
+                },
+                PersistentLayerKey {
+                    key_range: get_key(0)..get_key(10),
+                    lsn_range: Lsn(0x10)..Lsn(0x11),
+                    is_delta: false,
+                },
+                PersistentLayerKey {
+                    key_range: get_key(2)..get_key(4),
+                    lsn_range: Lsn(0x20)..Lsn(0x21),
+                    is_delta: false,
+                },
+                PersistentLayerKey {
+                    key_range: get_key(2)..get_key(4),
+                    lsn_range: Lsn(0x20)..Lsn(0x48),
+                    is_delta: true,
+                },
+                PersistentLayerKey {
+                    key_range: get_key(4)..get_key(9),
+                    lsn_range: Lsn(0x20)..Lsn(0x21),
+                    is_delta: false,
+                },
+                // image layer generated for the compaction range
+                PersistentLayerKey {
+                    key_range: get_key(9)..get_key(10),
+                    lsn_range: Lsn(0x20)..Lsn(0x21),
+                    is_delta: false,
+                },
+                PersistentLayerKey {
+                    key_range: get_key(8)..get_key(10),
+                    lsn_range: Lsn(0x48)..Lsn(0x50),
+                    is_delta: true,
+                },
+            ],
+        );
+
         // Do a partial compaction on key range 0..10, all image layers below LSN 20 can be replaced with new ones.
         tline
-            .partial_compact_with_gc(Some(get_key(0)..get_key(10)), &cancel, EnumSet::new(), &ctx)
+            .partial_compact_with_gc(get_key(0)..get_key(10), &cancel, EnumSet::new(), &ctx)
             .await
             .unwrap();
         let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
-        assert_eq!(
+        check_layer_map_key_eq(
            all_layers,
            vec![
-                PersistentLayerKey {
-                    key_range: get_key(0)..get_key(4),
-                    lsn_range: Lsn(0x20)..Lsn(0x21),
-                    is_delta: false
-                },
+                // aha, we removed all unnecessary image/delta layers and got a very clean layer map!
                PersistentLayerKey {
                    key_range: get_key(0)..get_key(10),
                    lsn_range: Lsn(0x20)..Lsn(0x21),
-                    is_delta: false
+                    is_delta: false,
                },
                PersistentLayerKey {
-                    key_range: get_key(4)..get_key(10),
-                    lsn_range: Lsn(0x20)..Lsn(0x21),
-                    is_delta: false
-                },
-                PersistentLayerKey {
-                    key_range: get_key(1)..get_key(4),
+                    key_range: get_key(2)..get_key(4),
                    lsn_range: Lsn(0x20)..Lsn(0x48),
-                    is_delta: true
-                },
-                PersistentLayerKey {
-                    key_range: get_key(5)..get_key(7),
-                    lsn_range: Lsn(0x20)..Lsn(0x48),
-                    is_delta: true
+                    is_delta: true,
                },
                PersistentLayerKey {
                    key_range: get_key(8)..get_key(10),
                    lsn_range: Lsn(0x48)..Lsn(0x50),
-                    is_delta: true
-                }
-            ]
+                    is_delta: true,
+                },
+            ],
        );
 
         Ok(())
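The test helpers above centralize the tuple-based ordering into `sort_layer_key` so both sides of a layer-map comparison sort identically before `assert_eq!`. A self-contained sketch of the same pattern, using a hypothetical `LayerKey` stand-in rather than the real `PersistentLayerKey`:

```rust
use std::cmp::Ordering;

// Hypothetical stand-in for PersistentLayerKey, for illustration only.
#[derive(Debug, Clone, PartialEq, Eq)]
struct LayerKey {
    is_delta: bool,
    key_range: std::ops::Range<u32>,
    lsn_range: std::ops::Range<u64>,
}

// Compare by a tuple of fields, like sort_layer_key above.
fn sort_layer_key(a: &LayerKey, b: &LayerKey) -> Ordering {
    (a.is_delta, a.key_range.start, a.key_range.end, a.lsn_range.start, a.lsn_range.end)
        .cmp(&(b.is_delta, b.key_range.start, b.key_range.end, b.lsn_range.start, b.lsn_range.end))
}

// Order-insensitive equality with a readable failure report,
// mirroring check_layer_map_key_eq.
fn check_key_eq(mut left: Vec<LayerKey>, mut right: Vec<LayerKey>) {
    left.sort_by(sort_layer_key);
    right.sort_by(sort_layer_key);
    if left != right {
        eprintln!("---LEFT---\n{left:#?}\n---RIGHT---\n{right:#?}");
        assert_eq!(left, right);
    }
}

fn main() {
    let a = LayerKey { is_delta: false, key_range: 0..2, lsn_range: 0x20..0x21 };
    let b = LayerKey { is_delta: true, key_range: 2..4, lsn_range: 0x20..0x48 };
    // Same set, different order: passes.
    check_key_eq(vec![a.clone(), b.clone()], vec![b, a]);
}
```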
diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index 664c00a6b1..fec8a0a16c 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -653,6 +653,10 @@ impl DeltaLayerWriter {
         })
     }
 
+    pub fn is_empty(&self) -> bool {
+        self.inner.as_ref().unwrap().num_keys == 0
+    }
+
     ///
     /// Append a key-value pair to the file.
     ///
diff --git a/pageserver/src/tenant/storage_layer/filter_iterator.rs b/pageserver/src/tenant/storage_layer/filter_iterator.rs
index ccfcf68e8f..8660be1fcc 100644
--- a/pageserver/src/tenant/storage_layer/filter_iterator.rs
+++ b/pageserver/src/tenant/storage_layer/filter_iterator.rs
@@ -1,4 +1,4 @@
-use std::ops::Range;
+use std::{ops::Range, sync::Arc};
 
 use anyhow::bail;
 use pageserver_api::{
@@ -9,7 +9,10 @@ use utils::lsn::Lsn;
 
 use pageserver_api::value::Value;
 
-use super::merge_iterator::MergeIterator;
+use super::{
+    merge_iterator::{MergeIterator, MergeIteratorItem},
+    PersistentLayerKey,
+};
 
 /// A filter iterator over merge iterators (and can be easily extended to other types of iterators).
 ///
@@ -48,10 +51,10 @@ impl<'a> FilterIterator<'a> {
         })
     }
 
-    pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
-        while let Some(item) = self.inner.next().await? {
+    async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
+        while let Some(item) = self.inner.next_inner::<R>().await? {
             while self.current_filter_idx < self.retain_key_filters.len()
-                && item.0 >= self.retain_key_filters[self.current_filter_idx].end
+                && item.key_lsn_value().0 >= self.retain_key_filters[self.current_filter_idx].end
             {
                 // [filter region]    [filter region]     [filter region]
                 //                                      ^ item
                 //                    ^ current filter
@@ -68,7 +71,7 @@ impl<'a> FilterIterator<'a> {
                 //                                                       ^ current filter (nothing)
                 return Ok(None);
             }
-            if self.retain_key_filters[self.current_filter_idx].contains(&item.0) {
+            if self.retain_key_filters[self.current_filter_idx].contains(&item.key_lsn_value().0) {
                 // [filter region]    [filter region]     [filter region]
                 //                                            ^ item
                 //                                        ^ current filter
@@ -81,6 +84,16 @@ impl<'a> FilterIterator<'a> {
         }
         Ok(None)
     }
+
+    pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
+        self.next_inner().await
+    }
+
+    pub async fn next_with_trace(
+        &mut self,
+    ) -> anyhow::Result<Option<((Key, Lsn, Value), Arc<PersistentLayerKey>)>> {
+        self.next_inner().await
+    }
 }
 
 #[cfg(test)]
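The `next_inner` loop above is a two-cursor scan: keys arrive in ascending order, and a cursor over the sorted retain filters advances monotonically, so each key is checked against at most the current filter. A standalone sketch of that idea over plain integers (toy types, not the pageserver iterators):

```rust
use std::ops::Range;

/// Yield only items whose key falls in one of the sorted, non-overlapping
/// `filters` ranges; both `items` and `filters` are ascending.
fn filter_sorted(items: &[u32], filters: &[Range<u32>]) -> Vec<u32> {
    let mut out = Vec::new();
    let mut filter_idx = 0;
    for &key in items {
        // Skip filters that end at or before the current key.
        while filter_idx < filters.len() && key >= filters[filter_idx].end {
            filter_idx += 1;
        }
        if filter_idx == filters.len() {
            break; // no filters left; no later key can match either
        }
        if filters[filter_idx].contains(&key) {
            out.push(key);
        }
        // Otherwise the key sits in a gap before the current filter; drop it.
    }
    out
}

fn main() {
    let items = [1, 2, 5, 7, 8, 11, 15];
    let filters = [2..4, 7..9, 14..16];
    assert_eq!(filter_sorted(&items, &filters), vec![2, 7, 8, 15]);
}
```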
diff --git a/pageserver/src/tenant/storage_layer/merge_iterator.rs b/pageserver/src/tenant/storage_layer/merge_iterator.rs
index 980202f12c..2667d130f5 100644
--- a/pageserver/src/tenant/storage_layer/merge_iterator.rs
+++ b/pageserver/src/tenant/storage_layer/merge_iterator.rs
@@ -1,6 +1,7 @@
 use std::{
     cmp::Ordering,
     collections::{binary_heap, BinaryHeap},
+    sync::Arc,
 };
 
 use anyhow::bail;
@@ -13,10 +14,11 @@ use pageserver_api::value::Value;
 use super::{
     delta_layer::{DeltaLayerInner, DeltaLayerIterator},
     image_layer::{ImageLayerInner, ImageLayerIterator},
+    PersistentLayerDesc, PersistentLayerKey,
 };
 
 #[derive(Clone, Copy)]
-enum LayerRef<'a> {
+pub(crate) enum LayerRef<'a> {
     Image(&'a ImageLayerInner),
     Delta(&'a DeltaLayerInner),
 }
@@ -62,18 +64,20 @@ impl LayerIterRef<'_> {
 /// 1. Unified iterator for image and delta layers.
 /// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
 /// 3. Lazy creation of the real delta/image iterator.
-enum IteratorWrapper<'a> {
+pub(crate) enum IteratorWrapper<'a> {
     NotLoaded {
         ctx: &'a RequestContext,
         first_key_lower_bound: (Key, Lsn),
         layer: LayerRef<'a>,
+        source_desc: Arc<PersistentLayerKey>,
     },
     Loaded {
         iter: PeekableLayerIterRef<'a>,
+        source_desc: Arc<PersistentLayerKey>,
     },
 }
 
-struct PeekableLayerIterRef<'a> {
+pub(crate) struct PeekableLayerIterRef<'a> {
     iter: LayerIterRef<'a>,
     peeked: Option<(Key, Lsn, Value)>, // None == end
 }
@@ -151,6 +155,12 @@ impl<'a> IteratorWrapper<'a> {
             layer: LayerRef::Image(image_layer),
             first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
             ctx,
+            source_desc: PersistentLayerKey {
+                key_range: image_layer.key_range().clone(),
+                lsn_range: PersistentLayerDesc::image_layer_lsn_range(image_layer.lsn()),
+                is_delta: false,
+            }
+            .into(),
         }
     }
 
@@ -162,12 +172,18 @@ impl<'a> IteratorWrapper<'a> {
             layer: LayerRef::Delta(delta_layer),
             first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
             ctx,
+            source_desc: PersistentLayerKey {
+                key_range: delta_layer.key_range().clone(),
+                lsn_range: delta_layer.lsn_range().clone(),
+                is_delta: true,
+            }
+            .into(),
         }
     }
 
     fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
         match self {
-            Self::Loaded { iter } => iter
+            Self::Loaded { iter, .. } => iter
                 .peek()
                 .as_ref()
                 .map(|(key, lsn, val)| (key, *lsn, Some(val))),
@@ -191,6 +207,7 @@ impl<'a> IteratorWrapper<'a> {
             ctx,
             first_key_lower_bound,
             layer,
+            source_desc,
         } = self
         else {
             unreachable!()
@@ -206,7 +223,10 @@ impl<'a> IteratorWrapper<'a> {
                 );
             }
         }
-        *self = Self::Loaded { iter };
+        *self = Self::Loaded {
+            iter,
+            source_desc: source_desc.clone(),
+        };
         Ok(())
     }
 
@@ -220,11 +240,19 @@ impl<'a> IteratorWrapper<'a> {
     /// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
     /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
-        let Self::Loaded { iter } = self else {
+        let Self::Loaded { iter, .. } = self else {
             panic!("must load the iterator before using")
         };
         iter.next().await
     }
+
+    /// Get the persistent layer key corresponding to this iterator
+    fn trace_source(&self) -> Arc<PersistentLayerKey> {
+        match self {
+            Self::Loaded { source_desc, .. } => source_desc.clone(),
+            Self::NotLoaded { source_desc, .. } => source_desc.clone(),
+        }
+    }
 }
 
 /// A merge iterator over delta/image layer iterators.
@@ -242,6 +270,32 @@ pub struct MergeIterator<'a> {
     heap: BinaryHeap<IteratorWrapper<'a>>,
 }
 
+pub(crate) trait MergeIteratorItem {
+    fn new(item: (Key, Lsn, Value), iterator: &IteratorWrapper<'_>) -> Self;
+
+    fn key_lsn_value(&self) -> &(Key, Lsn, Value);
+}
+
+impl MergeIteratorItem for (Key, Lsn, Value) {
+    fn new(item: (Key, Lsn, Value), _: &IteratorWrapper<'_>) -> Self {
+        item
+    }
+
+    fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
+        self
+    }
+}
+
+impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
+    fn new(item: (Key, Lsn, Value), iter: &IteratorWrapper<'_>) -> Self {
+        (item, iter.trace_source().clone())
+    }
+
+    fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
+        &self.0
+    }
+}
+
 impl<'a> MergeIterator<'a> {
     pub fn create(
         deltas: &[&'a DeltaLayerInner],
@@ -260,7 +314,7 @@ impl<'a> MergeIterator<'a> {
         }
     }
 
-    pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
+    pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
         while let Some(mut iter) = self.heap.peek_mut() {
             if !iter.is_loaded() {
                 // Once we load the iterator, we can know the real first key-value pair in the iterator.
@@ -275,10 +329,22 @@ impl<'a> MergeIterator<'a> {
                 binary_heap::PeekMut::pop(iter);
                 continue;
             };
-            return Ok(Some(item));
+            return Ok(Some(R::new(item, &iter)));
         }
         Ok(None)
     }
+
+    /// Get the next key-value pair from the iterator.
+    pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
+        self.next_inner().await
+    }
+
+    /// Get the next key-value pair from the iterator, and trace where the key comes from.
+    pub async fn next_with_trace(
+        &mut self,
+    ) -> anyhow::Result<Option<((Key, Lsn, Value), Arc<PersistentLayerKey>)>> {
+        self.next_inner().await
+    }
 }
 
 #[cfg(test)]
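The `MergeIteratorItem` trait above lets a single `next_inner` implementation serve both `next()` and `next_with_trace()`: the caller's requested return type decides whether each item carries the `Arc<PersistentLayerKey>` of its source layer. A minimal standalone sketch of this return-type-driven pattern (toy item and source types, not the pageserver ones):

```rust
use std::sync::Arc;

// Toy stand-ins: an item and the "source" it came from.
type Item = (u32, &'static str);

trait MergeItem: Sized {
    fn new(item: Item, source: &Arc<String>) -> Self;
}

// Plain interface: drop the provenance.
impl MergeItem for Item {
    fn new(item: Item, _: &Arc<String>) -> Self {
        item
    }
}

// Traced interface: attach the source to every item.
impl MergeItem for (Item, Arc<String>) {
    fn new(item: Item, source: &Arc<String>) -> Self {
        (item, source.clone())
    }
}

struct Iter {
    source: Arc<String>,
    data: Vec<Item>,
}

impl Iter {
    // One implementation; the type parameter R picks the output shape.
    fn next_inner<R: MergeItem>(&mut self) -> Option<R> {
        let item = self.data.pop()?;
        Some(R::new(item, &self.source))
    }
    fn next(&mut self) -> Option<Item> {
        self.next_inner()
    }
    fn next_with_trace(&mut self) -> Option<(Item, Arc<String>)> {
        self.next_inner()
    }
}

fn main() {
    let mut it = Iter { source: Arc::new("layer-0".into()), data: vec![(1, "a"), (2, "b")] };
    assert_eq!(it.next(), Some((2, "b")));
    let (item, src) = it.next_with_trace().unwrap();
    assert_eq!(item, (1, "a"));
    assert_eq!(src.as_str(), "layer-0");
}
```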
diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index 01c2803881..e6ef1aae2b 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -4,7 +4,7 @@
 //!
 //! The old legacy algorithm is implemented directly in `timeline.rs`.
 
-use std::collections::{BinaryHeap, HashSet};
+use std::collections::{BinaryHeap, HashMap, HashSet};
 use std::ops::{Deref, Range};
 use std::sync::Arc;
 
@@ -56,7 +56,7 @@ use pageserver_api::value::Value;
 
 use utils::lsn::Lsn;
 
-use pageserver_compaction::helpers::overlaps_with;
+use pageserver_compaction::helpers::{fully_contains, overlaps_with};
 use pageserver_compaction::interface::*;
 
 use super::CompactionError;
@@ -64,6 +64,23 @@ use super::CompactionError;
 /// Maximum number of deltas before generating an image layer in bottom-most compaction.
 const COMPACTION_DELTA_THRESHOLD: usize = 5;
 
+pub struct GcCompactionJobDescription {
+    /// All layers to read in the compaction job
+    selected_layers: Vec<Layer>,
+    /// GC cutoff of the job
+    gc_cutoff: Lsn,
+    /// LSNs to retain for the job
+    retain_lsns_below_horizon: Vec<Lsn>,
+    /// Maximum layer LSN processed in this compaction
+    max_layer_lsn: Lsn,
+    /// Only compact layers overlapping with this range
+    compaction_key_range: Range<Key>,
+    /// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap.
+    /// This field is solely for debugging: it is not read again once the compaction job
+    /// description has been generated.
+    rewrite_layers: Vec<Arc<PersistentLayerDesc>>,
+}
+
 /// The result of bottom-most compaction for a single key at each LSN.
 #[derive(Debug)]
 #[cfg_attr(test, derive(PartialEq))]
@@ -1722,7 +1739,8 @@ impl Timeline {
         flags: EnumSet<CompactFlags>,
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
-        self.partial_compact_with_gc(None, cancel, flags, ctx).await
+        self.partial_compact_with_gc(Key::MIN..Key::MAX, cancel, flags, ctx)
+            .await
     }
 
     /// An experimental compaction building block that combines compaction with garbage collection.
@@ -1732,12 +1750,15 @@ impl Timeline {
     /// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon,
     /// and create delta layers with all deltas >= gc horizon.
     ///
-    /// If `key_range`, it will only compact the keys within the range, aka partial compaction. This functionality
-    /// is not complete yet, and if it is set, only image layers will be generated.
-    ///
+    /// `compaction_key_range` limits the compaction to keys within that range, aka partial compaction.
+    /// Partial compaction will read and process all layers overlapping with the key range, even if they
+    /// might contain extra keys. After the gc-compaction phase completes, delta layers that are not fully
+    /// contained within the key range will be rewritten so that they do not overlap with the newly-produced
+    /// delta layers. Providing `Key::MIN..Key::MAX` to the function indicates a full compaction, though
+    /// technically, `Key::MAX` is not part of the range.
     pub(crate) async fn partial_compact_with_gc(
         self: &Arc<Self>,
-        compaction_key_range: Option<Range<Key>>,
+        compaction_key_range: Range<Key>,
         cancel: &CancellationToken,
         flags: EnumSet<CompactFlags>,
         ctx: &RequestContext,
@@ -1762,9 +1783,8 @@ impl Timeline {
             .await?;
 
         let dry_run = flags.contains(CompactFlags::DryRun);
-        let partial_compaction = compaction_key_range.is_some();
 
-        if let Some(ref compaction_key_range) = compaction_key_range {
+        if compaction_key_range != (Key::MIN..Key::MAX) {
             info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compaction_key_range={}..{}", compaction_key_range.start, compaction_key_range.end);
         } else {
             info!("running enhanced gc bottom-most compaction, dry_run={dry_run}");
@@ -1780,7 +1800,7 @@ impl Timeline {
         // The layer selection has the following properties:
         // 1. If a layer is in the selection, all layers below it are in the selection.
         // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
-        let (layer_selection, gc_cutoff, retain_lsns_below_horizon) = if !partial_compaction {
+        let job_desc = {
             let guard = self.layers.read().await;
             let layers = guard.layer_map()?;
             let gc_info = self.gc_info.read().unwrap();
@@ -1810,9 +1830,21 @@ impl Timeline {
             };
             // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
             // layers to compact.
+            let mut rewrite_layers = Vec::new();
             for desc in layers.iter_historic_layers() {
-                if desc.get_lsn_range().end <= max_layer_lsn {
+                if desc.get_lsn_range().end <= max_layer_lsn
+                    && overlaps_with(&desc.get_key_range(), &compaction_key_range)
+                {
+                    // If the layer overlaps with the compaction key range, we need to read it to obtain
+                    // all keys within the range, even if it might contain extra keys.
                     selected_layers.push(guard.get_from_desc(&desc));
+                    // If the layer is not fully contained within the key range, we need to rewrite it if
+                    // it is a delta layer (it is fine for image layers to overlap the range boundary).
+                    if desc.is_delta()
+                        && !fully_contains(&compaction_key_range, &desc.get_key_range())
+                    {
+                        rewrite_layers.push(desc);
+                    }
                 }
             }
             if selected_layers.is_empty() {
@@ -1820,82 +1852,59 @@ impl Timeline {
                 return Ok(());
             }
             retain_lsns_below_horizon.sort();
-            (selected_layers, gc_cutoff, retain_lsns_below_horizon)
-        } else {
-            // In case of partial compaction, we currently only support generating image layers, and therefore,
-            // we pick all layers that are below the lowest retain_lsn and does not intersect with any of the layers.
-            let guard = self.layers.read().await;
-            let layers = guard.layer_map()?;
-            let gc_info = self.gc_info.read().unwrap();
-            let mut min_lsn = gc_info.cutoffs.select_min();
-            for (lsn, _, _) in &gc_info.retain_lsns {
-                if lsn < &min_lsn {
-                    min_lsn = *lsn;
-                }
+            GcCompactionJobDescription {
+                selected_layers,
+                gc_cutoff,
+                retain_lsns_below_horizon,
+                max_layer_lsn,
+                compaction_key_range,
+                rewrite_layers,
             }
-            for lsn in gc_info.leases.keys() {
-                if lsn < &min_lsn {
-                    min_lsn = *lsn;
-                }
-            }
-            let mut selected_layers = Vec::new();
-            drop(gc_info);
-            // |-------| |-------| |-------|
-            // | Delta | | Delta | | Delta |  -- min_lsn could be intersecting with the layers
-            // |-------| |-------| |-------|  <- we want to pick all the layers below min_lsn, so that
-            // | Delta | | Delta | | Delta |  ...we can remove them after compaction
-            // |-------| |-------| |-------|
-            // Pick all the layers intersect or below the min_lsn, get the largest LSN in the selected layers.
-            let Some(compaction_key_range) = compaction_key_range.as_ref() else {
-                unreachable!()
-            };
-            for desc in layers.iter_historic_layers() {
-                if desc.get_lsn_range().end <= min_lsn
-                    && overlaps_with(&desc.key_range, compaction_key_range)
-                {
-                    selected_layers.push(guard.get_from_desc(&desc));
-                }
-            }
-            if selected_layers.is_empty() {
-                info!("no layers to compact with gc");
-                return Ok(());
-            }
-            (selected_layers, min_lsn, Vec::new())
         };
 
         let lowest_retain_lsn = if self.ancestor_timeline.is_some() {
-            if partial_compaction {
-                warn!("partial compaction cannot run on child branches (for now)");
-                return Ok(());
-            }
             Lsn(self.ancestor_lsn.0 + 1)
         } else {
-            let res = retain_lsns_below_horizon
+            let res = job_desc
+                .retain_lsns_below_horizon
                 .first()
                 .copied()
-                .unwrap_or(gc_cutoff);
+                .unwrap_or(job_desc.gc_cutoff);
             if cfg!(debug_assertions) {
                 assert_eq!(
                     res,
-                    retain_lsns_below_horizon
+                    job_desc
+                        .retain_lsns_below_horizon
                         .iter()
                         .min()
                        .copied()
-                        .unwrap_or(gc_cutoff)
+                        .unwrap_or(job_desc.gc_cutoff)
                 );
             }
             res
         };
         info!(
-            "picked {} layers for compaction with gc_cutoff={} lowest_retain_lsn={}",
-            layer_selection.len(),
-            gc_cutoff,
-            lowest_retain_lsn
+            "picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}",
+            job_desc.selected_layers.len(),
+            job_desc.rewrite_layers.len(),
+            job_desc.max_layer_lsn,
+            job_desc.gc_cutoff,
+            lowest_retain_lsn,
+            job_desc.compaction_key_range.start,
+            job_desc.compaction_key_range.end
         );
-        self.check_compaction_space(&layer_selection).await?;
+
+        for layer in &job_desc.selected_layers {
+            debug!("read layer: {}", layer.layer_desc().key());
+        }
+        for layer in &job_desc.rewrite_layers {
+            debug!("rewrite layer: {}", layer.key());
+        }
+
+        self.check_compaction_space(&job_desc.selected_layers)
+            .await?;
 
         // Generate statistics for the compaction
-        for layer in &layer_selection {
+        for layer in &job_desc.selected_layers {
             let desc = layer.layer_desc();
             if desc.is_delta() {
                 stat.visit_delta_layer(desc.file_size());
@@ -1906,25 +1915,25 @@ impl Timeline {
         // Step 1: construct a k-merge iterator over all layers.
         // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
-        let layer_names: Vec<LayerName> = layer_selection
+        let layer_names = job_desc
+            .selected_layers
             .iter()
             .map(|layer| layer.layer_desc().layer_name())
             .collect_vec();
         if let Some(err) = check_valid_layermap(&layer_names) {
-            bail!("cannot run gc-compaction because {}", err);
+            warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
         }
         // The maximum LSN we are processing in this compaction loop
-        let end_lsn = layer_selection
+        let end_lsn = job_desc
+            .selected_layers
             .iter()
             .map(|l| l.layer_desc().lsn_range.end)
             .max()
             .unwrap();
-        // We don't want any of the produced layers to cover the full key range (i.e., MIN..MAX) b/c it will then be recognized
-        // as an L0 layer.
         let mut delta_layers = Vec::new();
         let mut image_layers = Vec::new();
         let mut downloaded_layers = Vec::new();
-        for layer in &layer_selection {
+        for layer in &job_desc.selected_layers {
             let resident_layer = layer.download_and_keep_resident().await?;
             downloaded_layers.push(resident_layer);
         }
@@ -1943,8 +1952,8 @@ impl Timeline {
             dense_ks,
             sparse_ks,
         )?;
-        // Step 2: Produce images+deltas. TODO: ensure newly-produced delta does not overlap with other deltas.
-        // Data of the same key.
+
+        // Step 2: Produce images+deltas.
         let mut accumulated_values = Vec::new();
         let mut last_key: Option<Key> = None;
 
@@ -1956,10 +1965,7 @@ impl Timeline {
             self.conf,
             self.timeline_id,
             self.tenant_shard_id,
-            compaction_key_range
-                .as_ref()
-                .map(|x| x.start)
-                .unwrap_or(Key::MIN),
+            job_desc.compaction_key_range.start,
             lowest_retain_lsn,
             self.get_compaction_target_size(),
             ctx,
@@ -1979,6 +1985,13 @@ impl Timeline {
         )
         .await?;
 
+        #[derive(Default)]
+        struct RewritingLayers {
+            before: Option<DeltaLayerWriter>,
+            after: Option<DeltaLayerWriter>,
+        }
+        let mut delta_layer_rewriters = HashMap::<Arc<PersistentLayerKey>, RewritingLayers>::new();
+
         /// Returns None if there is no ancestor branch. Throw an error when the key is not found.
         ///
         /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
@@ -2004,10 +2017,51 @@ impl Timeline {
         // the key and LSN range are determined. However, to keep things simple here, we still
         // create this writer, and discard the writer in the end.
 
-        while let Some((key, lsn, val)) = merge_iter.next().await? {
+        while let Some(((key, lsn, val), desc)) = merge_iter.next_with_trace().await? {
             if cancel.is_cancelled() {
                 return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
             }
+            if !job_desc.compaction_key_range.contains(&key) {
+                if !desc.is_delta {
+                    continue;
+                }
+                let rewriter = delta_layer_rewriters.entry(desc.clone()).or_default();
+                let rewriter = if key < job_desc.compaction_key_range.start {
+                    if rewriter.before.is_none() {
+                        rewriter.before = Some(
+                            DeltaLayerWriter::new(
+                                self.conf,
+                                self.timeline_id,
+                                self.tenant_shard_id,
+                                desc.key_range.start,
+                                desc.lsn_range.clone(),
+                                ctx,
+                            )
+                            .await?,
+                        );
+                    }
+                    rewriter.before.as_mut().unwrap()
+                } else if key >= job_desc.compaction_key_range.end {
+                    if rewriter.after.is_none() {
+                        rewriter.after = Some(
+                            DeltaLayerWriter::new(
+                                self.conf,
+                                self.timeline_id,
+                                self.tenant_shard_id,
+                                job_desc.compaction_key_range.end,
+                                desc.lsn_range.clone(),
+                                ctx,
+                            )
+                            .await?,
+                        );
+                    }
+                    rewriter.after.as_mut().unwrap()
+                } else {
+                    unreachable!()
+                };
+                rewriter.put_value(key, lsn, val, ctx).await?;
+                continue;
+            }
             match val {
                 Value::Image(_) => stat.visit_image_key(&val),
                 Value::WalRecord(_) => stat.visit_wal_key(&val),
@@ -2018,35 +2072,27 @@ impl Timeline {
                 }
                 accumulated_values.push((key, lsn, val));
             } else {
-                let last_key = last_key.as_mut().unwrap();
-                stat.on_unique_key_visited();
-                let skip_adding_key = if let Some(ref compaction_key_range) = compaction_key_range {
-                    !compaction_key_range.contains(last_key)
-                } else {
-                    false
-                };
-                if !skip_adding_key {
-                    let retention = self
-                        .generate_key_retention(
-                            *last_key,
-                            &accumulated_values,
-                            gc_cutoff,
-                            &retain_lsns_below_horizon,
-                            COMPACTION_DELTA_THRESHOLD,
-                            get_ancestor_image(self, *last_key, ctx).await?,
-                        )
-                        .await?;
-                    // Put the image into the image layer. Currently we have a single big layer for the compaction.
-                    retention
-                        .pipe_to(
-                            *last_key,
-                            &mut delta_layer_writer,
-                            image_layer_writer.as_mut(),
-                            &mut stat,
-                            ctx,
-                        )
-                        .await?;
-                }
+                let last_key: &mut Key = last_key.as_mut().unwrap();
+                stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction
+                let retention = self
+                    .generate_key_retention(
+                        *last_key,
+                        &accumulated_values,
+                        job_desc.gc_cutoff,
+                        &job_desc.retain_lsns_below_horizon,
+                        COMPACTION_DELTA_THRESHOLD,
+                        get_ancestor_image(self, *last_key, ctx).await?,
+                    )
+                    .await?;
+                retention
+                    .pipe_to(
+                        *last_key,
+                        &mut delta_layer_writer,
+                        image_layer_writer.as_mut(),
+                        &mut stat,
+                        ctx,
+                    )
+                    .await?;
                 accumulated_values.clear();
                 *last_key = key;
                 accumulated_values.push((key, lsn, val));
@@ -2057,35 +2103,43 @@ impl Timeline {
         let last_key = last_key.expect("no keys produced during compaction");
         stat.on_unique_key_visited();
 
-        let skip_adding_key = if let Some(ref compaction_key_range) = compaction_key_range {
-            !compaction_key_range.contains(&last_key)
-        } else {
-            false
-        };
-        if !skip_adding_key {
-            let retention = self
-                .generate_key_retention(
-                    last_key,
-                    &accumulated_values,
-                    gc_cutoff,
-                    &retain_lsns_below_horizon,
-                    COMPACTION_DELTA_THRESHOLD,
-                    get_ancestor_image(self, last_key, ctx).await?,
-                )
-                .await?;
-            // Put the image into the image layer. Currently we have a single big layer for the compaction.
-            retention
-                .pipe_to(
-                    last_key,
-                    &mut delta_layer_writer,
-                    image_layer_writer.as_mut(),
-                    &mut stat,
-                    ctx,
-                )
-                .await?;
-        }
+        let retention = self
+            .generate_key_retention(
+                last_key,
+                &accumulated_values,
+                job_desc.gc_cutoff,
+                &job_desc.retain_lsns_below_horizon,
+                COMPACTION_DELTA_THRESHOLD,
+                get_ancestor_image(self, last_key, ctx).await?,
+            )
+            .await?;
+        retention
+            .pipe_to(
+                last_key,
+                &mut delta_layer_writer,
+                image_layer_writer.as_mut(),
+                &mut stat,
+                ctx,
+            )
+            .await?; // end: move the above part to the loop body
 
+        let mut rewrote_delta_layers = Vec::new();
+        for (key, writers) in delta_layer_rewriters {
+            if let Some(delta_writer_before) = writers.before {
+                let (desc, path) = delta_writer_before
+                    .finish(job_desc.compaction_key_range.start, ctx)
+                    .await?;
+                let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
+                rewrote_delta_layers.push(layer);
+            }
+            if let Some(delta_writer_after) = writers.after {
+                let (desc, path) = delta_writer_after.finish(key.key_range.end, ctx).await?;
+                let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
+                rewrote_delta_layers.push(layer);
+            }
+        }
+
         let discard = |key: &PersistentLayerKey| {
             let key = key.clone();
             async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
@@ -2093,10 +2147,7 @@ impl Timeline {
 
         let produced_image_layers = if let Some(writer) = image_layer_writer {
             if !dry_run {
-                let end_key = compaction_key_range
-                    .as_ref()
-                    .map(|x| x.end)
-                    .unwrap_or(Key::MAX);
+                let end_key = job_desc.compaction_key_range.end;
                 writer
                     .finish_with_discard_fn(self, ctx, end_key, discard)
                     .await?
@@ -2117,10 +2168,8 @@ impl Timeline {
             Vec::new()
         };
 
-        if partial_compaction && !produced_delta_layers.is_empty() {
-            bail!("implementation error: partial compaction should not be producing delta layers (for now)");
-        }
-
+        // TODO: make image/delta/rewrote_delta layers generation atomic. At this point, we already generated resident layers, and if
+        // compaction is cancelled at this point, we might have some layers that are not cleaned up.
         let mut compact_to = Vec::new();
         let mut keep_layers = HashSet::new();
         let produced_delta_layers_len = produced_delta_layers.len();
@@ -2128,52 +2177,84 @@ impl Timeline {
         for action in produced_delta_layers {
             match action {
                 BatchWriterResult::Produced(layer) => {
+                    if cfg!(debug_assertions) {
+                        info!("produced delta layer: {}", layer.layer_desc().key());
+                    }
                     stat.produce_delta_layer(layer.layer_desc().file_size());
                     compact_to.push(layer);
                 }
                 BatchWriterResult::Discarded(l) => {
+                    if cfg!(debug_assertions) {
+                        info!("discarded delta layer: {}", l);
+                    }
                     keep_layers.insert(l);
                     stat.discard_delta_layer();
                 }
             }
         }
+        for layer in &rewrote_delta_layers {
+            debug!(
+                "produced rewritten delta layer: {}",
+                layer.layer_desc().key()
+            );
+        }
+        compact_to.extend(rewrote_delta_layers);
         for action in produced_image_layers {
             match action {
                 BatchWriterResult::Produced(layer) => {
+                    debug!("produced image layer: {}", layer.layer_desc().key());
                     stat.produce_image_layer(layer.layer_desc().file_size());
                     compact_to.push(layer);
                 }
                 BatchWriterResult::Discarded(l) => {
+                    debug!("discarded image layer: {}", l);
                     keep_layers.insert(l);
                     stat.discard_image_layer();
                 }
             }
         }
-        let mut layer_selection = layer_selection;
-        layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
-        if let Some(ref compaction_key_range) = compaction_key_range {
-            // Partial compaction might select more data than it processes, e.g., if
-            // the compaction_key_range only partially overlaps:
-            //
-            //         [---compaction_key_range---]
-            //   [---A----][----B----][----C----][----D----]
-            //
-            // A,B,C,D are all in the `layer_selection`. The created image layers contain
-            // whatever is needed from B, C, and from `----]` of A, and from `[--` of D.
-            //
-            // In contrast, `[--A-` and `--D----]` have not been processed, so, we must
-            // keep that data.
-            //
-            // The solution for now is to keep A and D completely.
-            // (layer_selection is what we'll remove from the layer map, so,
-            // retain what is _not_ fully covered by compaction_key_range).
-            layer_selection.retain(|x| {
-                let key_range = &x.layer_desc().key_range;
-                key_range.start >= compaction_key_range.start
-                    && key_range.end <= compaction_key_range.end
-            });
+
+        let mut layer_selection = job_desc.selected_layers;
+
+        // Partial compaction might select more data than it processes, e.g., if
+        // the compaction_key_range only partially overlaps:
+        //
+        //         [---compaction_key_range---]
+        //   [---A----][----B----][----C----][----D----]
+        //
+        // For delta layers, we rewrite them so that they are cut exactly at the
+        // compaction key range boundaries, so we can always discard them. However,
+        // for image layers, as we do not rewrite them for now, we need to handle
+        // them differently.
+        // Assume image layers A, B, C, D are all in the `layer_selection`.
+        //
+        // The created image layers contain whatever is needed from B, C, and from
+        // `----]` of A, and from `[---` of D.
+        //
+        // In contrast, `[---A` and `D----]` have not been processed, so, we must
+        // keep that data.
+        //
+        // The solution for now is to keep A and D completely if they are image layers.
+        // (layer_selection is what we'll remove from the layer map, so, retain what
+        // is _not_ fully covered by compaction_key_range).
+        for layer in &layer_selection {
+            if !layer.layer_desc().is_delta() {
+                if !overlaps_with(
+                    &layer.layer_desc().key_range,
+                    &job_desc.compaction_key_range,
+                ) {
+                    bail!("violated constraint: image layer outside of compaction key range");
+                }
+                if !fully_contains(
+                    &job_desc.compaction_key_range,
+                    &layer.layer_desc().key_range,
+                ) {
+                    keep_layers.insert(layer.layer_desc().key());
+                }
+            }
         }
+
+        layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
+
         info!(
             "gc-compaction statistics: {}",
             serde_json::to_string(&stat)?
@@ -2192,6 +2273,7 @@ impl Timeline {
 
         // Step 3: Place back to the layer map.
         {
+            // TODO: sanity check if the layer map is valid (i.e., should not have overlaps)
             let mut guard = self.layers.write().await;
             guard
                 .open_mut()?