mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-20 15:00:40 +00:00
fix: optimize sorted run picking (#8128)
* fix/optimize-pick: Optimize sorted run picking - `find_sorted_runs`: prune inactive run candidates and keep `find_sorted_runs_original` for testing in `src/mito2/src/compaction/run.rs` - `bench_compaction_picker`: compare old and new sorted-run picking with PK-aware workloads in `src/mito2/benches/bench_compaction_picker.rs` - `features`: expose `testing` for benchmark-only helpers in `src/mito2/Cargo.toml` Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * fix/optimize-pick: Remove redundant overlap guard - `find_sorted_runs`: simplify active candidate overlap checks in `src/mito2/src/compaction/run.rs` Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * fix: apply review comment Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> --------- Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
@@ -7,6 +7,7 @@ license.workspace = true
|
||||
[features]
|
||||
default = []
|
||||
test = ["common-test-util", "rstest", "rstest_reuse", "rskafka"]
|
||||
testing = ["test"]
|
||||
enterprise = []
|
||||
vector_index = ["dep:roaring", "index/vector_index"]
|
||||
|
||||
@@ -123,7 +124,7 @@ required-features = ["test"]
|
||||
[[bench]]
|
||||
name = "bench_compaction_picker"
|
||||
harness = false
|
||||
required-features = ["test"]
|
||||
required-features = ["testing"]
|
||||
|
||||
[[bench]]
|
||||
name = "simple_bulk_memtable"
|
||||
|
||||
@@ -16,13 +16,15 @@ use std::hint::black_box;
|
||||
|
||||
use criterion::{Criterion, criterion_group, criterion_main};
|
||||
use mito2::compaction::run::{
|
||||
Item, Ranged, SortedRun, find_overlapping_items, find_sorted_runs, merge_seq_files, reduce_runs,
|
||||
Item, Ranged, SortedRun, find_overlapping_items, find_sorted_runs, find_sorted_runs_original,
|
||||
merge_seq_files, reduce_runs,
|
||||
};
|
||||
|
||||
/// Mock file item fed to the sorted-run helpers exercised by this benchmark.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct MockFile {
    /// First bound reported by `range()`.
    start: i64,
    /// Second bound reported by `range()`.
    end: i64,
    /// Primary-key bucket; `overlap` only reports an overlap between files
    /// that share the same `pk`.
    pk: usize,
    /// Size attributed to this file.
    // NOTE(review): presumably surfaced through the `Item` impl — confirm.
    size: usize,
}
|
||||
|
||||
@@ -32,6 +34,13 @@ impl Ranged for MockFile {
|
||||
fn range(&self) -> (Self::BoundType, Self::BoundType) {
|
||||
(self.start, self.end)
|
||||
}
|
||||
|
||||
/// Reports whether `self` and `other` overlap.
///
/// Two files overlap only when they carry the same `pk` *and* their ranges
/// intersect. The range comparison is strict, so ranges that merely touch at
/// a boundary (or are empty) do not count as overlapping.
fn overlap(&self, other: &Self) -> bool {
    // Files with different primary keys never overlap, whatever their ranges.
    if self.pk != other.pk {
        return false;
    }

    let (own_start, own_end) = self.range();
    let (other_start, other_end) = other.range();

    // The intersection is [latest start, earliest end); non-empty iff start < end.
    let latest_start = own_start.max(other_start);
    let earliest_end = own_end.min(other_end);
    latest_start < earliest_end
}
|
||||
}
|
||||
|
||||
impl Item for MockFile {
|
||||
@@ -47,22 +56,53 @@ fn generate_test_files(n: usize) -> Vec<MockFile> {
|
||||
files.push(MockFile {
|
||||
start: 0,
|
||||
end: 10,
|
||||
pk: 0,
|
||||
size: 10,
|
||||
});
|
||||
}
|
||||
files
|
||||
}
|
||||
|
||||
fn generate_same_timestamp_files(total_files: usize, files_per_timestamp: usize) -> Vec<MockFile> {
|
||||
const TIMESTAMP_INTERVAL_SECS: i64 = 10 * 60;
|
||||
|
||||
let mut files = Vec::with_capacity(total_files);
|
||||
for idx in 0..total_files {
|
||||
let timestamp_idx = idx / files_per_timestamp;
|
||||
let start = timestamp_idx as i64 * TIMESTAMP_INTERVAL_SECS;
|
||||
files.push(MockFile {
|
||||
start,
|
||||
end: start + TIMESTAMP_INTERVAL_SECS,
|
||||
pk: idx % files_per_timestamp,
|
||||
size: 10,
|
||||
});
|
||||
}
|
||||
|
||||
files
|
||||
}
|
||||
|
||||
fn bench_find_sorted_runs(c: &mut Criterion) {
|
||||
let mut group = c.benchmark_group("find_sorted_runs");
|
||||
|
||||
for size in [10, 100, 1000].iter() {
|
||||
group.bench_function(format!("size_{}", size), |b| {
|
||||
let mut files = generate_test_files(*size);
|
||||
for (total_files, files_per_timestamp) in [(5000, 1000), (50000, 1000), (50000, 5000)] {
|
||||
let case_name = format!(
|
||||
"{}_files_{}_per_timestamp_10min",
|
||||
total_files, files_per_timestamp
|
||||
);
|
||||
|
||||
group.bench_function(format!("{}_new", case_name), |b| {
|
||||
let mut files = generate_same_timestamp_files(total_files, files_per_timestamp);
|
||||
b.iter(|| {
|
||||
find_sorted_runs(black_box(&mut files));
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function(format!("{}_old", case_name), |b| {
|
||||
let mut files = generate_same_timestamp_files(total_files, files_per_timestamp);
|
||||
b.iter(|| {
|
||||
find_sorted_runs_original(black_box(&mut files));
|
||||
});
|
||||
});
|
||||
}
|
||||
group.finish();
|
||||
}
|
||||
@@ -95,12 +135,14 @@ fn bench_find_overlapping_items(c: &mut Criterion) {
|
||||
files1.push(MockFile {
|
||||
start: i as i64,
|
||||
end: (i + 5) as i64,
|
||||
pk: 0,
|
||||
size: 10,
|
||||
});
|
||||
|
||||
files2.push(MockFile {
|
||||
start: (i + 3) as i64,
|
||||
end: (i + 8) as i64,
|
||||
pk: 0,
|
||||
size: 10,
|
||||
});
|
||||
}
|
||||
@@ -137,6 +179,7 @@ fn bench_merge_seq_files(c: &mut Criterion) {
|
||||
files.push(MockFile {
|
||||
start: i as i64,
|
||||
end: (i + 1) as i64,
|
||||
pk: 0,
|
||||
size: file_size,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -324,6 +324,72 @@ where
|
||||
// sort files
|
||||
sort_ranged_items(items);
|
||||
|
||||
let mut current_run = SortedRun::default();
|
||||
let mut runs = vec![];
|
||||
let mut active_run_item_indices = Vec::new();
|
||||
|
||||
let mut selection = BitVec::repeat(false, items.len());
|
||||
while !selection.all() {
|
||||
// until all items are assigned to some sorted run.
|
||||
let mut last_pruned_start = None;
|
||||
for (item, mut selected) in items.iter().zip(selection.iter_mut()) {
|
||||
if *selected {
|
||||
// item is already assigned.
|
||||
continue;
|
||||
}
|
||||
if current_run.items.is_empty() {
|
||||
// current run is empty, just add current_item
|
||||
selected.set(true);
|
||||
current_run.push_item(item.clone());
|
||||
active_run_item_indices.push(current_run.items.len() - 1);
|
||||
} else {
|
||||
// the current item does not overlap with any item in current run,
|
||||
// then it belongs to current run. Because now we introduced primary
|
||||
// key range, we cannot simply use timestamps to check overlapping.
|
||||
let (item_start, _) = item.range();
|
||||
if last_pruned_start != Some(item_start) {
|
||||
active_run_item_indices.retain(|idx| {
|
||||
let (_, run_item_end) = current_run.items[*idx].range();
|
||||
run_item_end > item_start
|
||||
});
|
||||
last_pruned_start = Some(item_start);
|
||||
}
|
||||
|
||||
let mut overlaps_any = false;
|
||||
for idx in &active_run_item_indices {
|
||||
let run_item = &current_run.items[*idx];
|
||||
if run_item.overlap(item) {
|
||||
overlaps_any = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if !overlaps_any {
|
||||
// does not overlap, push to current run
|
||||
selected.set(true);
|
||||
let item_idx = current_run.items.len();
|
||||
current_run.push_item(item.clone());
|
||||
active_run_item_indices.push(item_idx);
|
||||
}
|
||||
}
|
||||
}
|
||||
// finished an iteration, we've found a new run.
|
||||
runs.push(std::mem::take(&mut current_run));
|
||||
active_run_item_indices.clear();
|
||||
}
|
||||
runs
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "test", feature = "testing"))]
|
||||
pub fn find_sorted_runs_original<T>(items: &mut [T]) -> Vec<SortedRun<T>>
|
||||
where
|
||||
T: Item,
|
||||
{
|
||||
if items.is_empty() {
|
||||
return vec![];
|
||||
}
|
||||
// sort files
|
||||
sort_ranged_items(items);
|
||||
|
||||
let mut current_run = SortedRun::default();
|
||||
let mut runs = vec![];
|
||||
|
||||
@@ -543,6 +609,30 @@ mod tests {
|
||||
runs
|
||||
}
|
||||
|
||||
/// Flattens every run into its item bounds: for each run, the result holds
/// `start, end, start, end, ...` in the order the run stores its items.
///
/// Lets two run lists be compared purely by the ranges they contain.
fn sorted_run_ranges<T: Item>(runs: &[SortedRun<T>]) -> Vec<Vec<T::BoundType>> {
    let mut per_run_bounds = Vec::with_capacity(runs.len());
    for run in runs {
        // Each item contributes exactly two bounds.
        let mut bounds = Vec::with_capacity(run.items.len() * 2);
        for file in run.items.iter() {
            let (lower, upper) = file.range();
            bounds.push(lower);
            bounds.push(upper);
        }
        per_run_bounds.push(bounds);
    }
    per_run_bounds
}
|
||||
|
||||
fn check_find_sorted_runs_consistency(ranges: &[(i64, i64)]) {
|
||||
let mut files = build_items(ranges);
|
||||
let mut files_for_original = files.clone();
|
||||
|
||||
let runs = find_sorted_runs(&mut files);
|
||||
let original_runs = find_sorted_runs_original(&mut files_for_original);
|
||||
|
||||
assert_eq!(sorted_run_ranges(&original_runs), sorted_run_ranges(&runs));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_sorted_runs() {
|
||||
check_sorted_runs(&[], &[]);
|
||||
@@ -594,6 +684,27 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
/// Checks the optimized picker against the original implementation over a
/// table of range layouts: empty input, point ranges, touching and overlapping
/// ranges, and unsorted input.
#[test]
fn test_find_sorted_runs_matches_original_impl() {
    let cases: &[&[(i64, i64)]] = &[
        &[],
        &[(1, 1), (2, 2)],
        &[(1, 2), (2, 3)],
        &[(2, 4), (1, 3)],
        &[(1, 3), (2, 4), (4, 5)],
        &[(1, 2), (3, 4), (3, 5)],
        &[(1, 3), (2, 4), (5, 6)],
        &[(1, 2), (3, 5), (4, 6)],
        &[(1, 2), (3, 4), (4, 6), (7, 8)],
        &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
        &[(10, 19), (20, 21), (20, 29), (30, 39)],
        &[(10, 19), (20, 29), (21, 22), (30, 39), (31, 32), (32, 42)],
        &[(32, 42), (10, 19), (31, 32), (20, 29), (21, 22), (30, 39)],
    ];

    for ranges in cases.iter().copied() {
        check_find_sorted_runs_consistency(ranges);
    }
}
|
||||
|
||||
fn check_reduce_runs(
|
||||
files: &[(i64, i64)],
|
||||
expected_runs: &[Vec<(i64, i64)>],
|
||||
|
||||
Reference in New Issue
Block a user