From 17781776c86f4fc6d1469855374a7e050a81581c Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Tue, 20 Jun 2023 15:42:08 -0400 Subject: [PATCH] add two compaction triggers Signed-off-by: Alex Chi --- pageserver/src/tenant/timeline.rs | 100 ++++++++++++++++++++++++++++-- 1 file changed, 95 insertions(+), 5 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 69c3515649..3fcc261286 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3749,7 +3749,9 @@ impl Timeline { if new_layers.is_empty() && deltas_to_compact.is_empty() { // If L0 does not need to be compacted, look into other layers - return self.compact_tiered(layer_removal_cs, target_file_size, ctx).await; + return self + .compact_tiered(layer_removal_cs, target_file_size, ctx) + .await; } // Before deleting any layers, we need to wait for their upload ops to finish. @@ -3818,6 +3820,40 @@ impl Timeline { Ok(()) } + fn get_compact_task(tier_sizes: Vec<(usize, u64)>) -> Option> { + let size_ratio = 1.0; + let space_amplification_ratio = 2.0; + + // Trigger 1: by space amplification, do full compaction + let total_tier_size = tier_sizes.iter().map(|(_, size)| *size).sum::(); + let (_, last_tier_size) = *tier_sizes.last().unwrap(); + let estimated_space_amp = last_tier_size as f64 / (total_tier_size - last_tier_size) as f64; + if estimated_space_amp > space_amplification_ratio { + info!("full compaction triggered by space amplification"); + return Some( + tier_sizes + .iter() + .map(|(tier_id, _)| *tier_id) + .collect::>(), + ); + } + + // Trigger 2: by size ratio + let mut total_size_up_to_lvl = 0; + let mut compact_tiers = Vec::new(); + for (tier_id, size) in tier_sizes { + if total_size_up_to_lvl != 0 { + if total_size_up_to_lvl as f64 / size as f64 > size_ratio { + info!("full compaction triggered by size ratio"); + return Some(compact_tiers); + } + } + total_size_up_to_lvl += size; + compact_tiers.push(tier_id); + } + None + } + async fn compact_tiered_phase1( &self, _layer_removal_cs: DeleteGuard, @@ -3826,9 +3862,10 @@ impl Timeline { ) -> Result, CompactionError> { let guard = self.layers.read().await; let (layers, _) = &*guard; - - // Only compact if enough layers have accumulated. + + // Precondition: only compact if enough layers have accumulated. let threshold = 8; + assert!(threshold >= 2); if layers.sorted_runs.len() < threshold { debug!( level0_deltas = layers.sorted_runs.len(), @@ -3838,8 +3875,57 @@ impl Timeline { } // Gather the files to compact in this iteration. + + let tier_sizes: Vec<(usize, u64)> = layers + .sorted_runs + .iter() + .map(|(tier_id, layers)| (*tier_id, layers.iter().map(|layer| layer.file_size()).sum())) + .collect::>(); + + let Some(tier_to_compect) = Self::get_compact_task(tier_sizes) else { + return Ok(None); + }; + + if tier_to_compect.len() < 2 { + return Ok(None); + } + + let mut deltas_to_compact_layers = vec![]; + for (tier_id, layers) in layers.sorted_runs.iter() { + if tier_to_compect.contains(tier_id) { + deltas_to_compact_layers.extend(layers.iter().cloned()); + } + } + drop_rlock(guard); + + let deltas_to_compact_layers = deltas_to_compact_layers + .into_iter() + .map(|l| self.lcache.get_from_desc(&l)) + .collect_vec(); + + let lsn_range = { + let lsn_range_start = deltas_to_compact_layers + .iter() + .map(|l| l.get_lsn_range().start) + .min() + .unwrap(); + let lsn_range_end = deltas_to_compact_layers + .iter() + .map(|l| l.get_lsn_range().end) + .max() + .unwrap(); + lsn_range_start..lsn_range_end + }; + + info!( + "Starting tier compaction in LSN range {}-{} for tiers {:?}", + lsn_range.start, lsn_range.end, tier_to_compect + ); + + layers.dump(false, ctx); + // TODO: leverage the properties that some layers do not overlap, kmerge is too costly - + // This iterator walks through all key-value pairs from all the layers // we're compacting, in key, LSN order. let all_values_iter = itertools::process_results( @@ -3991,7 +4077,11 @@ impl Timeline { drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed - Ok(None) + Ok(Some(CompactTieredPhase1Result { + new_layers, + new_tier_at: *tier_to_compect.last().unwrap(), + removed_tiers: tier_to_compect, + })) } ///