fix compaction algorithm

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z
2023-06-29 18:11:51 -04:00
parent c3bcaa0551
commit 8816fc98fc

View File

@@ -3922,9 +3922,20 @@ impl Timeline {
if estimated_space_amp > space_amplification_ratio {
if !skip_tiers.is_empty() {
info!(
"full compaction cannot be triggered as some layers are being compacted: {:?}",
"full compaction triggered while some layers are being compacted: {:?}",
skip_tiers
);
let tiers = tier_sizes.iter().rev().take(max_merge_width);
let mut tiers_to_compact = vec![];
for &(id, _) in tiers {
if skip_tiers.contains(&id) {
break;
}
tiers_to_compact.push(id);
}
if tiers_to_compact.len() >= min_merge_width {
return Some(tiers_to_compact);
}
} else {
info!("full compaction triggered by space amplification");
let tiers = tier_sizes
@@ -3945,7 +3956,6 @@ impl Timeline {
let mut compact_tiers = Vec::new();
for &(tier_id, size) in &tier_sizes {
if total_size_up_to_lvl != 0 && size as f64 / total_size_up_to_lvl as f64 > size_ratio {
info!("compaction triggered by size ratio");
let compact_tiers = compact_tiers
.iter()
.rev()
@@ -3954,29 +3964,39 @@ impl Timeline {
.copied()
.collect_vec();
if compact_tiers.len() >= min_merge_width {
info!(
"compaction triggered by size ratio: {} / {}",
size, total_size_up_to_lvl
);
return Some(compact_tiers);
} else {
break;
return None;
}
}
if skip_tiers.contains(&tier_id) {
break;
total_size_up_to_lvl = 0;
compact_tiers.clear();
continue;
}
total_size_up_to_lvl += size;
compact_tiers.push(tier_id);
}
// Trigger 3: reduce number of sorted runs, pick up to max_merge_width files to compact.
let mut compact_tiers = Vec::new();
let mut compact_tiers: Vec<usize> = Vec::new();
let last_tier = tier_sizes.last().unwrap().0;
for (tier_id, size) in tier_sizes {
if skip_tiers.contains(&tier_id) {
break;
}
if tier_id == last_tier {
// exclude bottom-most level
break;
}
total_size_up_to_lvl += size;
compact_tiers.push(tier_id);
}
info!("compaction triggered by reducing sorted runs");
let compact_tiers = compact_tiers
.iter()
.rev()
@@ -3985,6 +4005,7 @@ impl Timeline {
.copied()
.collect_vec();
if compact_tiers.len() >= min_merge_width {
info!("compaction triggered by reducing sorted runs");
return Some(compact_tiers);
} else {
return None;
@@ -5331,3 +5352,213 @@ pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() {
),
}
}
#[test]
fn compaction_simulator_1() {
let mut tiers = vec![];
let mut skip_tiers = HashSet::new();
let mut next_tier_id = 0;
let mut write_total = 0;
let l0_total = 400;
let mut last_round_tiers_to_compact = vec![];
for i in 0..l0_total {
tiers.insert(0, (next_tier_id, 1));
next_tier_id += 1;
if !last_round_tiers_to_compact.is_empty() {
let mut tiers_to_compact = last_round_tiers_to_compact.clone();
last_round_tiers_to_compact = vec![];
let mut new_tiers = vec![];
let mut new_tier_size = 0;
let mut insert_at = 0;
for &(tier_id, size) in &tiers {
if tiers_to_compact.contains(&tier_id) {
new_tier_size += size;
insert_at = new_tiers.len();
tiers_to_compact.retain(|x| *x != tier_id);
} else {
new_tiers.push((tier_id, size));
}
}
assert!(tiers_to_compact.is_empty());
new_tiers.insert(insert_at, (next_tier_id, new_tier_size));
next_tier_id += 1;
println!(
"finish {:?} -> {}, size = {}",
tiers_to_compact, next_tier_id, new_tier_size
);
for tier in &tiers_to_compact {
skip_tiers.remove(tier);
}
write_total += new_tier_size;
tiers = new_tiers;
}
if let Some(tiers_to_compact) = Timeline::get_compact_task(&skip_tiers, tiers.clone()) {
last_round_tiers_to_compact = tiers_to_compact.clone();
for &tier in &tiers_to_compact {
skip_tiers.insert(tier);
}
println!("start {:?}", tiers_to_compact);
}
// println!("--- round {i} ---");
// for &(tier_id, size) in &tiers {
// println!("{}: {}", tier_id, size);
// }
// println!("--- summary ---");
for &(id, size) in &tiers {
if skip_tiers.contains(&id) {
print!("{}(!) ", size);
} else {
print!("{} ", size);
}
}
println!();
}
println!(
"write amp.: {} / {} = {}",
write_total,
l0_total,
write_total as f64 / l0_total as f64
);
}
#[test]
fn compaction_simulator_2() {
let mut tiers = vec![];
let mut skip_tiers = HashSet::new();
let mut next_tier_id = 0;
let mut write_total = 0;
let l0_total = 400;
let mut read_amp = 0;
for i in 0..l0_total {
tiers.insert(0, (next_tier_id, 1));
next_tier_id += 1;
read_amp = read_amp.max(tiers.len());
if let Some(mut tiers_to_compact) = Timeline::get_compact_task(&skip_tiers, tiers.clone()) {
let mut new_tiers = vec![];
let mut new_tier_size = 0;
let mut insert_at = 0;
for &(tier_id, size) in &tiers {
if tiers_to_compact.contains(&tier_id) {
new_tier_size += size;
insert_at = new_tiers.len();
tiers_to_compact.retain(|x| *x != tier_id);
} else {
new_tiers.push((tier_id, size));
}
}
assert!(tiers_to_compact.is_empty());
new_tiers.insert(insert_at, (next_tier_id, new_tier_size));
next_tier_id += 1;
println!(
"finish {:?} -> {}, size = {}",
tiers_to_compact, next_tier_id, new_tier_size
);
for tier in &tiers_to_compact {
skip_tiers.remove(tier);
}
write_total += new_tier_size;
tiers = new_tiers;
}
// println!("--- round {i} ---");
// for &(tier_id, size) in &tiers {
// println!("{}: {}", tier_id, size);
// }
// println!("--- summary ---");
for &(id, size) in &tiers {
if skip_tiers.contains(&id) {
print!("{}(!) ", size);
} else {
print!("{} ", size);
}
}
println!();
}
println!(
"write amp.: {} / {} = {}",
write_total,
l0_total,
write_total as f64 / l0_total as f64
);
println!("read amp. max: {}", read_amp);
}
#[test]
fn compaction_simulator_3() {
let mut tiers = vec![];
let mut skip_tiers = HashSet::new();
let mut next_tier_id = 0;
let mut write_total = 0;
let l0_total = 400;
let mut read_amp = 0;
for i in 0..l0_total {
tiers.insert(0, (next_tier_id, 1));
next_tier_id += 1;
read_amp = read_amp.max(tiers.len());
let mut round = 0;
while let Some(mut tiers_to_compact) =
Timeline::get_compact_task(&skip_tiers, tiers.clone())
{
round += 1;
let mut new_tiers = vec![];
let mut new_tier_size = 0;
let mut insert_at = 0;
for &(tier_id, size) in &tiers {
if tiers_to_compact.contains(&tier_id) {
new_tier_size += size;
insert_at = new_tiers.len();
tiers_to_compact.retain(|x| *x != tier_id);
} else {
new_tiers.push((tier_id, size));
}
}
assert!(tiers_to_compact.is_empty());
new_tiers.insert(insert_at, (next_tier_id, new_tier_size));
next_tier_id += 1;
println!(
"finish {:?} -> {}, size = {}",
tiers_to_compact, next_tier_id, new_tier_size
);
for tier in &tiers_to_compact {
skip_tiers.remove(tier);
}
write_total += new_tier_size;
tiers = new_tiers;
}
if round > 3 {
panic!("compaction not exaustive");
}
// println!("--- round {i} ---");
// for &(tier_id, size) in &tiers {
// println!("{}: {}", tier_id, size);
// }
// println!("--- summary ---");
for &(id, size) in &tiers {
if skip_tiers.contains(&id) {
print!("{}(!) ", size);
} else {
print!("{} ", size);
}
}
println!();
}
println!(
"write amp.: {} / {} = {}",
write_total,
l0_total,
write_total as f64 / l0_total as f64
);
println!("read amp. max: {}", read_amp);
}