diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 964f6eedea..dde1e44ea1 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -7,6 +7,7 @@ license.workspace = true [features] default = [] test = ["common-test-util", "rstest", "rstest_reuse", "rskafka"] +testing = ["test"] enterprise = [] vector_index = ["dep:roaring", "index/vector_index"] @@ -123,7 +124,7 @@ required-features = ["test"] [[bench]] name = "bench_compaction_picker" harness = false -required-features = ["test"] +required-features = ["testing"] [[bench]] name = "simple_bulk_memtable" diff --git a/src/mito2/benches/bench_compaction_picker.rs b/src/mito2/benches/bench_compaction_picker.rs index d2a5e5aeae..74142127ae 100644 --- a/src/mito2/benches/bench_compaction_picker.rs +++ b/src/mito2/benches/bench_compaction_picker.rs @@ -16,13 +16,15 @@ use std::hint::black_box; use criterion::{Criterion, criterion_group, criterion_main}; use mito2::compaction::run::{ - Item, Ranged, SortedRun, find_overlapping_items, find_sorted_runs, merge_seq_files, reduce_runs, + Item, Ranged, SortedRun, find_overlapping_items, find_sorted_runs, find_sorted_runs_original, + merge_seq_files, reduce_runs, }; #[derive(Clone, Debug, Eq, Hash, PartialEq)] struct MockFile { start: i64, end: i64, + pk: usize, size: usize, } @@ -32,6 +34,13 @@ impl Ranged for MockFile { fn range(&self) -> (Self::BoundType, Self::BoundType) { (self.start, self.end) } + + fn overlap(&self, other: &Self) -> bool { + let (lhs_start, lhs_end) = self.range(); + let (rhs_start, rhs_end) = other.range(); + + lhs_start.max(rhs_start) < lhs_end.min(rhs_end) && self.pk == other.pk + } } impl Item for MockFile { @@ -47,22 +56,53 @@ fn generate_test_files(n: usize) -> Vec { files.push(MockFile { start: 0, end: 10, + pk: 0, size: 10, }); } files } +fn generate_same_timestamp_files(total_files: usize, files_per_timestamp: usize) -> Vec { + const TIMESTAMP_INTERVAL_SECS: i64 = 10 * 60; + + let mut files = Vec::with_capacity(total_files); + for idx in 0..total_files { + let timestamp_idx = idx / files_per_timestamp; + let start = timestamp_idx as i64 * TIMESTAMP_INTERVAL_SECS; + files.push(MockFile { + start, + end: start + TIMESTAMP_INTERVAL_SECS, + pk: idx % files_per_timestamp, + size: 10, + }); + } + + files +} + fn bench_find_sorted_runs(c: &mut Criterion) { let mut group = c.benchmark_group("find_sorted_runs"); - for size in [10, 100, 1000].iter() { - group.bench_function(format!("size_{}", size), |b| { - let mut files = generate_test_files(*size); + for (total_files, files_per_timestamp) in [(5000, 1000), (50000, 1000), (50000, 5000)] { + let case_name = format!( + "{}_files_{}_per_timestamp_10min", + total_files, files_per_timestamp + ); + + group.bench_function(format!("{}_new", case_name), |b| { + let mut files = generate_same_timestamp_files(total_files, files_per_timestamp); b.iter(|| { find_sorted_runs(black_box(&mut files)); }); }); + + group.bench_function(format!("{}_old", case_name), |b| { + let mut files = generate_same_timestamp_files(total_files, files_per_timestamp); + b.iter(|| { + find_sorted_runs_original(black_box(&mut files)); + }); + }); } group.finish(); } @@ -95,12 +135,14 @@ fn bench_find_overlapping_items(c: &mut Criterion) { files1.push(MockFile { start: i as i64, end: (i + 5) as i64, + pk: 0, size: 10, }); files2.push(MockFile { start: (i + 3) as i64, end: (i + 8) as i64, + pk: 0, size: 10, }); } @@ -137,6 +179,7 @@ fn bench_merge_seq_files(c: &mut Criterion) { files.push(MockFile { start: i as i64, end: (i + 1) as i64, + pk: 0, size: file_size, }); } diff --git a/src/mito2/src/compaction/run.rs b/src/mito2/src/compaction/run.rs index 27d4f5779e..ecd19666e2 100644 --- a/src/mito2/src/compaction/run.rs +++ b/src/mito2/src/compaction/run.rs @@ -324,6 +324,72 @@ where // sort files sort_ranged_items(items); + let mut current_run = SortedRun::default(); + let mut runs = vec![]; + let mut active_run_item_indices = Vec::new(); + + let mut selection = BitVec::repeat(false, items.len()); + while !selection.all() { + // until all items are assigned to some sorted run. + let mut last_pruned_start = None; + for (item, mut selected) in items.iter().zip(selection.iter_mut()) { + if *selected { + // item is already assigned. + continue; + } + if current_run.items.is_empty() { + // current run is empty, just add current_item + selected.set(true); + current_run.push_item(item.clone()); + active_run_item_indices.push(current_run.items.len() - 1); + } else { + // the current item does not overlap with any item in current run, + // then it belongs to current run. Because now we introduced primary + // key range, we cannot simply use timestamps to check overlapping. + let (item_start, _) = item.range(); + if last_pruned_start != Some(item_start) { + active_run_item_indices.retain(|idx| { + let (_, run_item_end) = current_run.items[*idx].range(); + run_item_end > item_start + }); + last_pruned_start = Some(item_start); + } + + let mut overlaps_any = false; + for idx in &active_run_item_indices { + let run_item = ¤t_run.items[*idx]; + if run_item.overlap(item) { + overlaps_any = true; + break; + } + } + if !overlaps_any { + // does not overlap, push to current run + selected.set(true); + let item_idx = current_run.items.len(); + current_run.push_item(item.clone()); + active_run_item_indices.push(item_idx); + } + } + } + // finished an iteration, we've found a new run. + runs.push(std::mem::take(&mut current_run)); + active_run_item_indices.clear(); + } + runs +} + +#[cfg(any(test, feature = "test", feature = "testing"))] +pub fn find_sorted_runs_original(items: &mut [T]) -> Vec> +where + T: Item, +{ + if items.is_empty() { + return vec![]; + } + // sort files + sort_ranged_items(items); + let mut current_run = SortedRun::default(); let mut runs = vec![]; @@ -543,6 +609,30 @@ mod tests { runs } + fn sorted_run_ranges(runs: &[SortedRun]) -> Vec> { + runs.iter() + .map(|r| { + r.items + .iter() + .flat_map(|f| { + let (start, end) = f.range(); + [start, end] + }) + .collect() + }) + .collect() + } + + fn check_find_sorted_runs_consistency(ranges: &[(i64, i64)]) { + let mut files = build_items(ranges); + let mut files_for_original = files.clone(); + + let runs = find_sorted_runs(&mut files); + let original_runs = find_sorted_runs_original(&mut files_for_original); + + assert_eq!(sorted_run_ranges(&original_runs), sorted_run_ranges(&runs)); + } + #[test] fn test_find_sorted_runs() { check_sorted_runs(&[], &[]); @@ -594,6 +684,27 @@ mod tests { ); } + #[test] + fn test_find_sorted_runs_matches_original_impl() { + for ranges in [ + &[][..], + &[(1, 1), (2, 2)], + &[(1, 2), (2, 3)], + &[(2, 4), (1, 3)], + &[(1, 3), (2, 4), (4, 5)], + &[(1, 2), (3, 4), (3, 5)], + &[(1, 3), (2, 4), (5, 6)], + &[(1, 2), (3, 5), (4, 6)], + &[(1, 2), (3, 4), (4, 6), (7, 8)], + &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)], + &[(10, 19), (20, 21), (20, 29), (30, 39)], + &[(10, 19), (20, 29), (21, 22), (30, 39), (31, 32), (32, 42)], + &[(32, 42), (10, 19), (31, 32), (20, 29), (21, 22), (30, 39)], + ] { + check_find_sorted_runs_consistency(ranges); + } + } + fn check_reduce_runs( files: &[(i64, i64)], expected_runs: &[Vec<(i64, i64)>],