From a079d250d9a7bad7272fce7d3b25c07172fc5952 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Wed, 5 Jul 2023 15:27:25 -0400 Subject: [PATCH] compaction PoC: trivial move compaction (#4604) reduce write amp. for bulk load, might also be useful for main branch --------- Signed-off-by: Alex Chi Z --- pageserver/src/tenant/layer_map.rs | 9 +- pageserver/src/tenant/timeline.rs | 158 +++++++++++++++++++++++++---- 2 files changed, 142 insertions(+), 25 deletions(-) diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 9352a7bead..572a6a4533 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -132,7 +132,9 @@ impl SortedRuns { } /// Remove layers and the corresponding sorted runs. - pub fn insert_run_at(&mut self, idx: usize, layers: Vec>) {} + pub fn insert_run_at(&mut self, idx: usize, layers: Vec>) { + unimplemented!() + } pub fn num_of_tiers(&self) -> usize { self.runs.len() @@ -743,11 +745,6 @@ impl LayerMap { layer.dump(verbose, ctx)?; } - println!("historic_layers:"); - for layer in self.iter_historic_layers() { - layer.dump(verbose, ctx)?; - } - println!("sorted_runs:"); for (lvl, (tier_id, layer)) in self.sorted_runs.runs.iter().enumerate() { println!("tier {}", tier_id); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 210630bc2f..bdc9dd718b 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3431,6 +3431,7 @@ struct CompactLevel0Phase1Result { #[derive(Default)] struct CompactTieredPhase1Result { new_layers: Vec>, + trivial_move_layers: Vec>, new_tier_at: usize, removed_tiers: Vec, } @@ -3913,7 +3914,7 @@ impl Timeline { let size_ratio = 1.25; let space_amplification_ratio = 1.5; let max_merge_width = 10; - let min_merge_width = 3; + let min_merge_width = 2; // Trigger 1: by space amplification, do full compaction let total_tier_size = tier_sizes.iter().map(|(_, size)| *size).sum::(); @@ -4013,7 +4014,7 @@ impl Timeline { target_file_size: u64, ctx: &RequestContext, ) -> Result, CompactionError> { - let (deltas_to_compact_layers, tier_to_compact, lsn_range) = { + let (deltas_to_compact_layers, tier_to_compact, lsn_range, trivial_move_layers) = { let guard = self.layers.read().await; let (layers, _) = &*guard; @@ -4045,10 +4046,83 @@ impl Timeline { } drop(compacting_tiers); - let mut deltas_to_compact_layers = vec![]; + let mut layers_in_tier = vec![]; for (tier_id, layers) in layers.sorted_runs.runs.iter() { if tier_to_compact.contains(tier_id) { - deltas_to_compact_layers.extend(layers.iter().cloned()); + let layers = layers.iter().cloned().collect_vec(); + let image_layers = layers + .iter() + .filter(|x| !x.is_delta()) + .cloned() + .collect_vec(); + let delta_layers = layers + .iter() + .filter(|x| x.is_delta()) + .cloned() + .collect_vec(); + layers_in_tier.push((image_layers, delta_layers)); + } + } + + let mut layers_range = vec![]; + // compute layer ranges (only including delta) + for (_, layers) in &layers_in_tier { + let key_range_start = layers + .iter() + .map(|l| l.get_key_range().start) + .min() + .unwrap(); + let key_range_end = layers.iter().map(|l| l.get_key_range().end).max().unwrap(); + layers_range.push(key_range_start..key_range_end); + } + + // compute deltas that can be trivially moved + let mut deltas_to_compact_layers = vec![]; + let mut trivial_move_layers = vec![]; + for (idx, (image_layers, delta_layers)) in layers_in_tier.into_iter().enumerate() { + let range_to_check = { + let start = layers_range + .iter() + .enumerate() + .filter(|(i, _)| *i != idx) + .map(|(_, k)| k.start) + .min() + .unwrap(); + let end = layers_range + .iter() + .enumerate() + .filter(|(i, _)| *i != idx) + .map(|(_, k)| k.end) + .max() + .unwrap(); + start..end + }; + fn overlaps_with(a: &Range, b: &Range) -> bool { + !(a.end <= b.start || b.end <= a.start) + } + /// a contains b + /// ---- a ----- + /// -- b -- + fn contains(a: &Range, b: &Range) -> bool { + b.start >= a.start && b.end <= a.end + } + for layer in delta_layers.into_iter() { + if overlaps_with(&range_to_check, &layer.get_key_range()) { + // compact if overlaps + deltas_to_compact_layers.push(layer); + } else { + // if delta layer does not overlap, trivial move + trivial_move_layers.push(layer); + } + } + for layer in image_layers.into_iter() { + if contains(&range_to_check, &layer.get_key_range()) { + // if image layer is within compaction range, remove it + deltas_to_compact_layers.push(layer); + } else { + // otherwise, trivially move + trivial_move_layers.push(layer); + } } } @@ -4073,14 +4147,31 @@ impl Timeline { }; info!( - "Starting tier compaction in LSN range {}-{} for tiers {:?}", - lsn_range.start, lsn_range.end, tier_to_compact + "Starting tier compaction in LSN range {}-{} for tiers {:?}, trivial move layers: {}", + lsn_range.start, lsn_range.end, tier_to_compact, trivial_move_layers.len() ); - (deltas_to_compact_layers, tier_to_compact, lsn_range) + let trivial_move_layers = trivial_move_layers + .iter() + .map(|x| self.lcache.get_from_desc(x)) + .collect_vec(); + + ( + deltas_to_compact_layers, + tier_to_compact, + lsn_range, + trivial_move_layers, + ) }; - // TODO: leverage the properties that some layers do not overlap, kmerge is too costly + if deltas_to_compact_layers.is_empty() { + return Ok(Some(CompactTieredPhase1Result { + new_layers: vec![], + new_tier_at: *tier_to_compact.last().unwrap(), + removed_tiers: tier_to_compact, + trivial_move_layers, + })); + } // This iterator walks through all key-value pairs from all the layers // we're compacting, in key, LSN order. @@ -4127,6 +4218,7 @@ impl Timeline { let mut new_layers: Vec> = Vec::new(); let mut prev_key: Option = None; + let mut prev_image_key: Option = None; let mut writer: Option = None; let mut image_writer: Option = None; let mut key_values_total_size = 0u64; @@ -4177,12 +4269,7 @@ impl Timeline { let image_writer_mut = image_writer.as_mut().unwrap(); image_writer_mut.put_image(key, &img)?; construct_image_for_key = true; - - let written_size: u64 = image_writer_mut.size(); - if written_size + key_values_total_size > target_file_size { - new_layers.push(Arc::new(image_writer.take().unwrap().finish(key.next())?)); - image_writer = None; - } + prev_image_key = Some(key); } } @@ -4225,6 +4312,7 @@ impl Timeline { dup_start_lsn = dup_end_lsn; dup_end_lsn = lsn_range.end; } + if writer.is_some() { let written_size = writer.as_mut().unwrap().size(); // check if key cause layer overflow or contains hole... @@ -4236,9 +4324,26 @@ impl Timeline { new_layers.push(Arc::new( writer.take().unwrap().finish(prev_key.unwrap().next())?, )); - writer = None; + + // only write image layer when we end a delta layer + if image_writer.is_some() { + let image_writer_mut = image_writer.as_mut().unwrap(); + let written_size: u64 = image_writer_mut.size(); + if written_size + key_values_total_size > target_file_size / 2 { + new_layers.push(Arc::new( + image_writer + .take() + .unwrap() + .finish(prev_image_key.unwrap().next())?, + )); + image_writer = None; // this is redundant + } + } + + writer = None; // this is redundant } } + // Remember size of key value because at next iteration we will access next item key_values_total_size = next_key_size; } @@ -4296,6 +4401,7 @@ impl Timeline { new_layers, new_tier_at: *tier_to_compact.last().unwrap(), removed_tiers: tier_to_compact, + trivial_move_layers, })) } @@ -4310,7 +4416,8 @@ impl Timeline { let Some(CompactTieredPhase1Result { new_layers, new_tier_at, - removed_tiers + removed_tiers, + trivial_move_layers }) = self .compact_tiered_phase1(layer_removal_cs.clone(), target_file_size, ctx) .await? else { return Ok(()); }; @@ -4338,6 +4445,11 @@ impl Timeline { let mut new_tier_at_index = None; let mut layers_to_delete = vec![]; let mut layer_names_to_delete = vec![]; + + let trivial_move_layers_keys = trivial_move_layers + .iter() + .map(|x| x.layer_desc().key()) + .collect::>(); for (tier_id, tier) in &updates.sorted_runs().runs { if *tier_id == new_tier_at { new_tier_at_index = Some(new_sorted_runs.len()); @@ -4346,7 +4458,9 @@ impl Timeline { new_sorted_runs.push((*tier_id, tier.clone())); } else { for layer in tier { - layers_to_delete.push(layer.clone()); + if !trivial_move_layers_keys.contains(&layer.key()) { + layers_to_delete.push(layer.clone()); + } } } } @@ -4359,7 +4473,7 @@ impl Timeline { let new_tier_at_index = new_tier_at_index.unwrap(); let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); - let mut new_layer_descs = vec![]; + let mut new_layer_descs: Vec> = vec![]; for l in new_layers { let new_path = l.local_path().unwrap(); @@ -4393,7 +4507,12 @@ impl Timeline { self.lcache.create_new_layer(l); } + for layer in &trivial_move_layers { + new_layer_descs.push(layer.layer_desc().clone().into()); + } + let new_tier_id = updates.sorted_runs().next_tier_id(); + new_layer_descs.sort_by_key(|x| (x.is_delta(), x.key_range.start)); new_sorted_runs.insert(new_tier_at_index, (new_tier_id, new_layer_descs)); updates.sorted_runs().runs = new_sorted_runs; @@ -5367,6 +5486,7 @@ fn compaction_simulator_1() { let mut new_tiers = vec![]; let mut new_tier_size = 0; let mut insert_at = 0; + let tiers_to_compact_clone = tiers_to_compact.clone(); for &(tier_id, size) in &tiers { if tiers_to_compact.contains(&tier_id) { new_tier_size += size; @@ -5381,7 +5501,7 @@ fn compaction_simulator_1() { next_tier_id += 1; println!( "finish {:?} -> {}, size = {}", - tiers_to_compact, next_tier_id, new_tier_size + tiers_to_compact_clone, next_tier_id, new_tier_size ); for tier in &tiers_to_compact { skip_tiers.remove(tier);