add two compaction triggers

Signed-off-by: Alex Chi <iskyzh@gmail.com>
This commit is contained in:
Alex Chi
2023-06-20 15:42:08 -04:00
parent 5274f487e4
commit 17781776c8

View File

@@ -3749,7 +3749,9 @@ impl Timeline {
if new_layers.is_empty() && deltas_to_compact.is_empty() {
// If L0 does not need to be compacted, look into other layers
return self.compact_tiered(layer_removal_cs, target_file_size, ctx).await;
return self
.compact_tiered(layer_removal_cs, target_file_size, ctx)
.await;
}
// Before deleting any layers, we need to wait for their upload ops to finish.
@@ -3818,6 +3820,40 @@ impl Timeline {
Ok(())
}
fn get_compact_task(tier_sizes: Vec<(usize, u64)>) -> Option<Vec<usize>> {
let size_ratio = 1.0;
let space_amplification_ratio = 2.0;
// Trigger 1: by space amplification, do full compaction
let total_tier_size = tier_sizes.iter().map(|(_, size)| *size).sum::<u64>();
let (_, last_tier_size) = *tier_sizes.last().unwrap();
let estimated_space_amp = last_tier_size as f64 / (total_tier_size - last_tier_size) as f64;
if estimated_space_amp > space_amplification_ratio {
info!("full compaction triggered by space amplification");
return Some(
tier_sizes
.iter()
.map(|(tier_id, _)| *tier_id)
.collect::<Vec<_>>(),
);
}
// Trigger 2: by size ratio
let mut total_size_up_to_lvl = 0;
let mut compact_tiers = Vec::new();
for (tier_id, size) in tier_sizes {
if total_size_up_to_lvl != 0 {
if total_size_up_to_lvl as f64 / size as f64 > size_ratio {
info!("full compaction triggered by size ratio");
return Some(compact_tiers);
}
}
total_size_up_to_lvl += size;
compact_tiers.push(tier_id);
}
None
}
async fn compact_tiered_phase1(
&self,
_layer_removal_cs: DeleteGuard,
@@ -3826,9 +3862,10 @@ impl Timeline {
) -> Result<Option<CompactTieredPhase1Result>, CompactionError> {
let guard = self.layers.read().await;
let (layers, _) = &*guard;
// Only compact if enough layers have accumulated.
// Precondition: only compact if enough layers have accumulated.
let threshold = 8;
assert!(threshold >= 2);
if layers.sorted_runs.len() < threshold {
debug!(
level0_deltas = layers.sorted_runs.len(),
@@ -3838,8 +3875,57 @@ impl Timeline {
}
// Gather the files to compact in this iteration.
let tier_sizes: Vec<(usize, u64)> = layers
.sorted_runs
.iter()
.map(|(tier_id, layers)| (*tier_id, layers.iter().map(|layer| layer.file_size()).sum()))
.collect::<Vec<_>>();
let Some(tier_to_compect) = Self::get_compact_task(tier_sizes) else {
return Ok(None);
};
if tier_to_compect.len() < 2 {
return Ok(None);
}
let mut deltas_to_compact_layers = vec![];
for (tier_id, layers) in layers.sorted_runs.iter() {
if tier_to_compect.contains(tier_id) {
deltas_to_compact_layers.extend(layers.iter().cloned());
}
}
drop_rlock(guard);
let deltas_to_compact_layers = deltas_to_compact_layers
.into_iter()
.map(|l| self.lcache.get_from_desc(&l))
.collect_vec();
let lsn_range = {
let lsn_range_start = deltas_to_compact_layers
.iter()
.map(|l| l.get_lsn_range().start)
.min()
.unwrap();
let lsn_range_end = deltas_to_compact_layers
.iter()
.map(|l| l.get_lsn_range().end)
.max()
.unwrap();
lsn_range_start..lsn_range_end
};
info!(
"Starting tier compaction in LSN range {}-{} for tiers {:?}",
lsn_range.start, lsn_range.end, tier_to_compect
);
layers.dump(false, ctx);
// TODO: leverage the properties that some layers do not overlap, kmerge is too costly
// This iterator walks through all key-value pairs from all the layers
// we're compacting, in key, LSN order.
let all_values_iter = itertools::process_results(
@@ -3991,7 +4077,11 @@ impl Timeline {
drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed
Ok(None)
Ok(Some(CompactTieredPhase1Result {
new_layers,
new_tier_at: *tier_to_compect.last().unwrap(),
removed_tiers: tier_to_compect,
}))
}
///