compaction PoC: trivial move compaction (#4604)

reduce write amp. for bulk load, might also be useful for main branch

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z
2023-07-05 15:27:25 -04:00
committed by GitHub
parent 756319b0ce
commit a079d250d9
2 changed files with 142 additions and 25 deletions

View File

@@ -132,7 +132,9 @@ impl SortedRuns {
}
/// Remove layers and the corresponding sorted runs.
pub fn insert_run_at(&mut self, idx: usize, layers: Vec<Arc<PersistentLayerDesc>>) {}
pub fn insert_run_at(&mut self, idx: usize, layers: Vec<Arc<PersistentLayerDesc>>) {
unimplemented!()
}
pub fn num_of_tiers(&self) -> usize {
self.runs.len()
@@ -743,11 +745,6 @@ impl LayerMap {
layer.dump(verbose, ctx)?;
}
println!("historic_layers:");
for layer in self.iter_historic_layers() {
layer.dump(verbose, ctx)?;
}
println!("sorted_runs:");
for (lvl, (tier_id, layer)) in self.sorted_runs.runs.iter().enumerate() {
println!("tier {}", tier_id);

View File

@@ -3431,6 +3431,7 @@ struct CompactLevel0Phase1Result {
#[derive(Default)]
struct CompactTieredPhase1Result {
new_layers: Vec<Arc<dyn PersistentLayer>>,
trivial_move_layers: Vec<Arc<dyn PersistentLayer>>,
new_tier_at: usize,
removed_tiers: Vec<usize>,
}
@@ -3913,7 +3914,7 @@ impl Timeline {
let size_ratio = 1.25;
let space_amplification_ratio = 1.5;
let max_merge_width = 10;
let min_merge_width = 3;
let min_merge_width = 2;
// Trigger 1: by space amplification, do full compaction
let total_tier_size = tier_sizes.iter().map(|(_, size)| *size).sum::<u64>();
@@ -4013,7 +4014,7 @@ impl Timeline {
target_file_size: u64,
ctx: &RequestContext,
) -> Result<Option<CompactTieredPhase1Result>, CompactionError> {
let (deltas_to_compact_layers, tier_to_compact, lsn_range) = {
let (deltas_to_compact_layers, tier_to_compact, lsn_range, trivial_move_layers) = {
let guard = self.layers.read().await;
let (layers, _) = &*guard;
@@ -4045,10 +4046,83 @@ impl Timeline {
}
drop(compacting_tiers);
let mut deltas_to_compact_layers = vec![];
let mut layers_in_tier = vec![];
for (tier_id, layers) in layers.sorted_runs.runs.iter() {
if tier_to_compact.contains(tier_id) {
deltas_to_compact_layers.extend(layers.iter().cloned());
let layers = layers.iter().cloned().collect_vec();
let image_layers = layers
.iter()
.filter(|x| !x.is_delta())
.cloned()
.collect_vec();
let delta_layers = layers
.iter()
.filter(|x| x.is_delta())
.cloned()
.collect_vec();
layers_in_tier.push((image_layers, delta_layers));
}
}
let mut layers_range = vec![];
// compute layer ranges (only including delta)
for (_, layers) in &layers_in_tier {
let key_range_start = layers
.iter()
.map(|l| l.get_key_range().start)
.min()
.unwrap();
let key_range_end = layers.iter().map(|l| l.get_key_range().end).max().unwrap();
layers_range.push(key_range_start..key_range_end);
}
// compute deltas that can be trivially moved
let mut deltas_to_compact_layers = vec![];
let mut trivial_move_layers = vec![];
for (idx, (image_layers, delta_layers)) in layers_in_tier.into_iter().enumerate() {
let range_to_check = {
let start = layers_range
.iter()
.enumerate()
.filter(|(i, _)| *i != idx)
.map(|(_, k)| k.start)
.min()
.unwrap();
let end = layers_range
.iter()
.enumerate()
.filter(|(i, _)| *i != idx)
.map(|(_, k)| k.end)
.max()
.unwrap();
start..end
};
fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
!(a.end <= b.start || b.end <= a.start)
}
/// a contains b
/// ---- a -----
/// -- b --
fn contains<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
b.start >= a.start && b.end <= a.end
}
for layer in delta_layers.into_iter() {
if overlaps_with(&range_to_check, &layer.get_key_range()) {
// compact if overlaps
deltas_to_compact_layers.push(layer);
} else {
// if delta layer does not overlap, trivial move
trivial_move_layers.push(layer);
}
}
for layer in image_layers.into_iter() {
if contains(&range_to_check, &layer.get_key_range()) {
// if image layer is within compaction range, remove it
deltas_to_compact_layers.push(layer);
} else {
// otherwise, trivially move
trivial_move_layers.push(layer);
}
}
}
@@ -4073,14 +4147,31 @@ impl Timeline {
};
info!(
"Starting tier compaction in LSN range {}-{} for tiers {:?}",
lsn_range.start, lsn_range.end, tier_to_compact
"Starting tier compaction in LSN range {}-{} for tiers {:?}, trivial move layers: {}",
lsn_range.start, lsn_range.end, tier_to_compact, trivial_move_layers.len()
);
(deltas_to_compact_layers, tier_to_compact, lsn_range)
let trivial_move_layers = trivial_move_layers
.iter()
.map(|x| self.lcache.get_from_desc(x))
.collect_vec();
(
deltas_to_compact_layers,
tier_to_compact,
lsn_range,
trivial_move_layers,
)
};
// TODO: leverage the properties that some layers do not overlap, kmerge is too costly
if deltas_to_compact_layers.is_empty() {
return Ok(Some(CompactTieredPhase1Result {
new_layers: vec![],
new_tier_at: *tier_to_compact.last().unwrap(),
removed_tiers: tier_to_compact,
trivial_move_layers,
}));
}
// This iterator walks through all key-value pairs from all the layers
// we're compacting, in key, LSN order.
@@ -4127,6 +4218,7 @@ impl Timeline {
let mut new_layers: Vec<Arc<dyn PersistentLayer>> = Vec::new();
let mut prev_key: Option<Key> = None;
let mut prev_image_key: Option<Key> = None;
let mut writer: Option<DeltaLayerWriter> = None;
let mut image_writer: Option<ImageLayerWriter> = None;
let mut key_values_total_size = 0u64;
@@ -4177,12 +4269,7 @@ impl Timeline {
let image_writer_mut = image_writer.as_mut().unwrap();
image_writer_mut.put_image(key, &img)?;
construct_image_for_key = true;
let written_size: u64 = image_writer_mut.size();
if written_size + key_values_total_size > target_file_size {
new_layers.push(Arc::new(image_writer.take().unwrap().finish(key.next())?));
image_writer = None;
}
prev_image_key = Some(key);
}
}
@@ -4225,6 +4312,7 @@ impl Timeline {
dup_start_lsn = dup_end_lsn;
dup_end_lsn = lsn_range.end;
}
if writer.is_some() {
let written_size = writer.as_mut().unwrap().size();
// check if key cause layer overflow or contains hole...
@@ -4236,9 +4324,26 @@ impl Timeline {
new_layers.push(Arc::new(
writer.take().unwrap().finish(prev_key.unwrap().next())?,
));
writer = None;
// only write image layer when we end a delta layer
if image_writer.is_some() {
let image_writer_mut = image_writer.as_mut().unwrap();
let written_size: u64 = image_writer_mut.size();
if written_size + key_values_total_size > target_file_size / 2 {
new_layers.push(Arc::new(
image_writer
.take()
.unwrap()
.finish(prev_image_key.unwrap().next())?,
));
image_writer = None; // this is redundant
}
}
writer = None; // this is redundant
}
}
// Remember size of key value because at next iteration we will access next item
key_values_total_size = next_key_size;
}
@@ -4296,6 +4401,7 @@ impl Timeline {
new_layers,
new_tier_at: *tier_to_compact.last().unwrap(),
removed_tiers: tier_to_compact,
trivial_move_layers,
}))
}
@@ -4310,7 +4416,8 @@ impl Timeline {
let Some(CompactTieredPhase1Result {
new_layers,
new_tier_at,
removed_tiers
removed_tiers,
trivial_move_layers
}) = self
.compact_tiered_phase1(layer_removal_cs.clone(), target_file_size, ctx)
.await? else { return Ok(()); };
@@ -4338,6 +4445,11 @@ impl Timeline {
let mut new_tier_at_index = None;
let mut layers_to_delete = vec![];
let mut layer_names_to_delete = vec![];
let trivial_move_layers_keys = trivial_move_layers
.iter()
.map(|x| x.layer_desc().key())
.collect::<HashSet<_>>();
for (tier_id, tier) in &updates.sorted_runs().runs {
if *tier_id == new_tier_at {
new_tier_at_index = Some(new_sorted_runs.len());
@@ -4346,7 +4458,9 @@ impl Timeline {
new_sorted_runs.push((*tier_id, tier.clone()));
} else {
for layer in tier {
layers_to_delete.push(layer.clone());
if !trivial_move_layers_keys.contains(&layer.key()) {
layers_to_delete.push(layer.clone());
}
}
}
}
@@ -4359,7 +4473,7 @@ impl Timeline {
let new_tier_at_index = new_tier_at_index.unwrap();
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
let mut new_layer_descs = vec![];
let mut new_layer_descs: Vec<Arc<PersistentLayerDesc>> = vec![];
for l in new_layers {
let new_path = l.local_path().unwrap();
@@ -4393,7 +4507,12 @@ impl Timeline {
self.lcache.create_new_layer(l);
}
for layer in &trivial_move_layers {
new_layer_descs.push(layer.layer_desc().clone().into());
}
let new_tier_id = updates.sorted_runs().next_tier_id();
new_layer_descs.sort_by_key(|x| (x.is_delta(), x.key_range.start));
new_sorted_runs.insert(new_tier_at_index, (new_tier_id, new_layer_descs));
updates.sorted_runs().runs = new_sorted_runs;
@@ -5367,6 +5486,7 @@ fn compaction_simulator_1() {
let mut new_tiers = vec![];
let mut new_tier_size = 0;
let mut insert_at = 0;
let tiers_to_compact_clone = tiers_to_compact.clone();
for &(tier_id, size) in &tiers {
if tiers_to_compact.contains(&tier_id) {
new_tier_size += size;
@@ -5381,7 +5501,7 @@ fn compaction_simulator_1() {
next_tier_id += 1;
println!(
"finish {:?} -> {}, size = {}",
tiers_to_compact, next_tier_id, new_tier_size
tiers_to_compact_clone, next_tier_id, new_tier_size
);
for tier in &tiers_to_compact {
skip_tiers.remove(tier);