add reduce num sorted run trigger

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z
2023-06-28 15:38:14 -04:00
parent 7d16a9f96f
commit b1f0bbd12a

View File

@@ -3859,8 +3859,9 @@ impl Timeline {
tier_sizes: Vec<(usize, u64)>,
) -> Option<Vec<usize>> {
let size_ratio = 1.25;
let space_amplification_ratio = 2.0;
let max_merge_width = 20;
let space_amplification_ratio = 1.5;
let max_merge_width = 10;
let min_merge_width = 3;
// Trigger 1: by space amplification, do full compaction
let total_tier_size = tier_sizes.iter().map(|(_, size)| *size).sum::<u64>();
@@ -3868,7 +3869,10 @@ impl Timeline {
let estimated_space_amp = (total_tier_size - last_tier_size) as f64 / last_tier_size as f64;
if estimated_space_amp > space_amplification_ratio {
if !skip_tiers.is_empty() {
info!("full compaction cannot be triggered as some layers are being compacted: {:?}", skip_tiers);
info!(
"full compaction cannot be triggered as some layers are being compacted: {:?}",
skip_tiers
);
} else {
info!("full compaction triggered by space amplification");
let tiers = tier_sizes
@@ -3878,14 +3882,16 @@ impl Timeline {
.rev()
.map(|(tier_id, _)| *tier_id)
.collect::<Vec<_>>();
return Some(tiers);
if tiers.len() >= min_merge_width {
return Some(tiers);
}
}
}
// Trigger 2: by size ratio
let mut total_size_up_to_lvl = 0;
let mut compact_tiers = Vec::new();
for (tier_id, size) in tier_sizes {
for &(tier_id, size) in &tier_sizes {
if total_size_up_to_lvl != 0 && size as f64 / total_size_up_to_lvl as f64 > size_ratio {
info!("compaction triggered by size ratio");
let compact_tiers = compact_tiers
@@ -3895,7 +3901,11 @@ impl Timeline {
.rev()
.copied()
.collect_vec();
return Some(compact_tiers);
if compact_tiers.len() >= min_merge_width {
return Some(compact_tiers);
} else {
break;
}
}
if skip_tiers.contains(&tier_id) {
break;
@@ -3903,7 +3913,30 @@ impl Timeline {
total_size_up_to_lvl += size;
compact_tiers.push(tier_id);
}
None
// Trigger 3: reduce number of sorted runs, pick up to max_merge_width files to compact.
let mut compact_tiers = Vec::new();
for (tier_id, size) in tier_sizes {
if skip_tiers.contains(&tier_id) {
break;
}
total_size_up_to_lvl += size;
compact_tiers.push(tier_id);
}
info!("compaction triggered by reducing sorted runs");
let compact_tiers = compact_tiers
.iter()
.rev()
.take(max_merge_width)
.rev()
.copied()
.collect_vec();
if compact_tiers.len() >= min_merge_width {
return Some(compact_tiers);
} else {
return None;
}
}
async fn compact_tiered_phase1(
@@ -3940,12 +3973,6 @@ impl Timeline {
return Ok(None);
};
let min_merge_width = 3;
if tier_to_compact.len() < min_merge_width {
return Ok(None);
}
println!("tier_to_compact: {tier_to_compact:?}");
for &tier in &tier_to_compact {
compacting_tiers.insert(tier);
@@ -4318,7 +4345,9 @@ impl Timeline {
compacting_tiers.remove(&tier);
}
info!("compaction complete, removed_tiers = {removed_tiers:?}, new_tier_at = {new_tier_at}");
info!(
"compaction complete, removed_tiers = {removed_tiers:?}, new_tier_at = {new_tier_at}"
);
drop_wlock(guard);