refactor: find sorted runs by time range only

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-05-22 16:38:35 +08:00
parent f1ad472075
commit 9ec163badb
5 changed files with 354 additions and 3 deletions

1
Cargo.lock generated
View File

@@ -8295,6 +8295,7 @@ dependencies = [
"datafusion-common",
"datafusion-expr",
"datatypes",
"derive_more",
"dotenv",
"either",
"futures",

View File

@@ -50,6 +50,7 @@ datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes.workspace = true
dashmap.workspace = true
derive_more.workspace = true
dotenv.workspace = true
either.workspace = true
futures.workspace = true

View File

@@ -15,6 +15,9 @@
//! This file contains code to find sorted runs in a set of ranged items,
//! along with the best way to merge these items to satisfy the desired run count.
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use bytes::{Buf, Bytes};
use common_base::BitVec;
use common_base::readable_size::ReadableSize;
@@ -423,6 +426,126 @@ where
runs
}
/// Finds sorted runs among `items`, looking only at the range returned by `Item::range`.
///
/// `items` is sorted in place with `sort_ranged_items` first. Each item is then appended to the
/// earliest-created run that ends strictly before the item starts; when no such run exists a
/// new run is opened. The runs are returned in creation order.
///
/// Ranges are treated as inclusive on both ends: an item whose `start` equals a run's `end`
/// shares that boundary value with the run, so it is considered overlapping and never joins it.
///
/// Returns an empty vector when `items` is empty.
pub(crate) fn find_sorted_runs_by_time_range<T>(items: &mut [T]) -> Vec<SortedRun<T>>
where
    T: Item,
{
    if items.is_empty() {
        return vec![];
    }
    // sort files
    sort_ranged_items(items);

    use derive_more::{Eq, PartialEq};

    /// `SortedRun` with a creation sequence `i`.
    ///
    /// Equality only looks at `i` (the `run` field is skipped), while ordering looks at the
    /// run's bounds. A `Run` always holds at least one item (see `Run::new`), which is what the
    /// `unwrap`s on `start`/`end` below rely on.
    #[derive(PartialEq, Eq)]
    struct Run<T: Item> {
        i: usize,
        #[partial_eq(skip)]
        run: SortedRun<T>,
    }

    impl<T: Item> Run<T> {
        /// Opens run number `i` with `item` as its first member.
        fn new(i: usize, item: &T) -> Run<T> {
            let mut run = SortedRun::default();
            run.push_item(item.clone());
            Run { i, run }
        }

        /// Appends a clone of `item` to the wrapped run.
        fn push_item(&mut self, item: &T) {
            self.run.push_item(item.clone());
        }
    }

    impl<T: Item> PartialOrd for Run<T> {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    /// Sort by run's `end` desc then `start` asc.
    ///
    /// The comparison on `end` is reversed on purpose: `BinaryHeap` is a max-heap, so the
    /// reversal puts the run with the smallest `end` on top.
    impl<T: Item> Ord for Run<T> {
        fn cmp(&self, other: &Self) -> Ordering {
            let l_run = &self.run;
            let r_run = &other.run;
            let l_end = l_run.end.unwrap();
            let r_end = r_run.end.unwrap();
            r_end.cmp(&l_end).then_with(|| {
                let l_start = l_run.start.unwrap();
                let r_start = r_run.start.unwrap();
                l_start.cmp(&r_start)
            })
        }
    }

    /// Wrapper around the `Run` above, to support sorting them by their creation sequence `i`.
    #[derive(PartialEq, Eq)]
    struct Wrapper<T: Item>(Run<T>);

    impl<T: Item> PartialOrd for Wrapper<T> {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl<T: Item> Ord for Wrapper<T> {
        fn cmp(&self, other: &Self) -> Ordering {
            // Reversed so that the max-heap yields the smallest `i`, i.e. the earliest run.
            other.0.i.cmp(&self.0.i)
        }
    }

    // Two heaps for finding a run that is both:
    // 1. not overlapping with item's range,
    // 2. and is created earliest,
    // when iterating the items.
    //
    // Heap 1 (`runs_sort_by_end`) is for storing the runs of which top has the minimal "end"
    // just about to overlap with the current selected item.
    //
    // Heap 2 (`runs_sort_by_index`) is for storing the runs that all have "end"s non-overlap with
    // the current selected item, and of which top is the earliest created run.
    //
    // The finding of a suitable run basically works like this:
    // 1. moves the runs in heap 1 to heap 2, until the top is overlapping with the current item;
    // 2. now heap 2 has all the runs that can accept the current item, pop its top;
    // 3. the top is the earliest created run, push the current item;
    // 4. because the run has changed, push it back to heap 1;
    // 5. check the next item. Important: we don't need to push the runs in heap 2 to 1, because
    //    the items are sorted by "start". When checking the next item, heap 2's runs must all have
    //    "end"s smaller than next item's "start".
    //
    // Actually the heap 2 is only for aligning with the runs selection outcomes in the original
    // `find_sorted_runs` implementation. If we just need the invariant that each run has the
    // non-overlapping items, we can get rid of heap 2 and make the codes simpler.
    let mut runs_sort_by_end = BinaryHeap::<Run<T>>::new();
    let mut runs_sort_by_index = BinaryHeap::<Wrapper<T>>::new();

    let mut i = 0;
    for item in items {
        let (start, _) = item.range();

        // Strictly smaller: a run ending exactly at `start` still shares that boundary value
        // with the item, so it must stay in heap 1 and must not receive the item.
        while let Some(run) = runs_sort_by_end.pop_if(|x| x.run.end.unwrap() < start) {
            runs_sort_by_index.push(Wrapper(run));
        }

        let Some(mut run) = runs_sort_by_index.pop() else {
            // No existing run can take the item: open a new one.
            i += 1;
            runs_sort_by_end.push(Run::new(i, item));
            continue;
        };
        run.0.push_item(item);
        runs_sort_by_end.push(run.0);
    }

    // Gather the runs from both heaps and restore creation order.
    let mut runs = runs_sort_by_end.into_vec();
    runs.extend(runs_sort_by_index.into_vec().into_iter().map(|x| x.0));
    runs.sort_unstable_by_key(|run| run.i);
    runs.into_iter().map(|x| x.run).collect()
}
/// Finds a set of files with minimum penalty to merge that can reduce the total num of runs.
/// The penalty of merging is defined as the size of all overlapping files between two runs.
pub fn reduce_runs<T: Item>(mut runs: Vec<SortedRun<T>>) -> Vec<T> {
@@ -540,12 +663,20 @@ pub fn merge_seq_files<T: Item>(input_files: &[T], max_file_size: Option<u64>) -
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::fs;
use std::num::NonZeroU64;
use std::path::Path;
use std::time::Instant;
use bytes::Bytes;
use store_api::storage::FileId;
use super::*;
use crate::compaction::test_util::new_file_handle_with_size_sequence_and_primary_key_range;
use crate::sst::file::{FileHandle, FileMeta};
use crate::test_util::new_noop_file_purger;
const SORTED_RUNS_MANIFEST_PATH: &str = "./ssts_manifest_202605201245.csv";
#[derive(Clone, Debug, PartialEq)]
struct MockFile {
@@ -594,6 +725,123 @@ mod tests {
Some((Bytes::from_static(min), Bytes::from_static(max)))
}
/// Splits one CSV line into its fields.
///
/// Trailing `\r` characters are dropped before parsing. Commas separate fields unless they
/// appear inside double quotes; inside a quoted section a doubled quote (`""`) stands for one
/// literal quote. Quote characters that open or close a quoted section are not part of the field.
/// The result always contains at least one field (an empty line yields a single empty string).
fn parse_csv_record(line: &str) -> Vec<String> {
    let symbols: Vec<char> = line.trim_end_matches('\r').chars().collect();
    let mut record = Vec::new();
    let mut current = String::new();
    let mut quoted = false;
    let mut pos = 0;

    while pos < symbols.len() {
        let symbol = symbols[pos];
        pos += 1;

        if symbol == '"' {
            if quoted && symbols.get(pos) == Some(&'"') {
                // Escaped quote inside a quoted section: emit one quote, skip its twin.
                current.push('"');
                pos += 1;
            } else {
                quoted = !quoted;
            }
        } else if symbol == ',' && !quoted {
            // Field boundary: hand over the accumulated text and start afresh.
            record.push(std::mem::take(&mut current));
        } else {
            current.push(symbol);
        }
    }

    record.push(current);
    record
}
/// Converts the manifest's primary key bounds into an optional `(min, max)` byte range.
///
/// Yields `None` as soon as either bound is an empty string; otherwise both strings are copied
/// byte-for-byte into `Bytes`.
fn primary_key_range_from_manifest(
    primary_key_min: &str,
    primary_key_max: &str,
) -> Option<(Bytes, Bytes)> {
    let both_present = !primary_key_min.is_empty() && !primary_key_max.is_empty();
    both_present.then(|| {
        let lower = Bytes::copy_from_slice(primary_key_min.as_bytes());
        let upper = Bytes::copy_from_slice(primary_key_max.as_bytes());
        (lower, upper)
    })
}
/// Builds a single-file `FileGroup` out of one parsed manifest record.
///
/// `record` must hold exactly six fields (asserted). Field 1 is parsed as the file id, fields 2
/// and 3 as the microsecond bounds of the time range, and fields 4 and 5 are passed to
/// `primary_key_range_from_manifest`. Field 0 is not read here. `sequence` becomes the file's
/// sequence number.
///
/// Returns an error when the file id or either timestamp fails to parse.
fn new_file_group_from_manifest_record(
    record: &[String],
    sequence: u64,
) -> Result<FileGroup, Box<dyn std::error::Error>> {
    assert_eq!(6, record.len());

    let purger = new_noop_file_purger();

    // Parse in field order so the first malformed field is the one reported.
    let file_id = FileId::parse_str(&record[1])?;
    let min_ts = Timestamp::new_microsecond(record[2].parse()?);
    let max_ts = Timestamp::new_microsecond(record[3].parse()?);

    let meta = FileMeta {
        region_id: 0.into(),
        file_id,
        time_range: (min_ts, max_ts),
        level: 0,
        file_size: 1,
        max_row_group_uncompressed_size: 1,
        available_indexes: Default::default(),
        indexes: Default::default(),
        index_file_size: 0,
        index_version: 0,
        num_rows: 0,
        num_row_groups: 0,
        num_series: 0,
        sequence: NonZeroU64::new(sequence),
        partition_expr: None,
        ..Default::default()
    };
    let pk_range = primary_key_range_from_manifest(&record[4], &record[5]);

    let handle = FileHandle::new_with_primary_key_range(meta, purger, pk_range);
    Ok(FileGroup::new_with_file(handle))
}
/// Reads the CSV manifest at `path` and turns every non-empty data line into a `FileGroup`.
///
/// The file is decoded lossily as UTF-8. The first line must be the expected six-column header
/// (asserted). Each data line is parsed with `parse_csv_record` and must also have six fields
/// (asserted, reporting the 1-based line number in the file). The sequence handed to each file
/// is its 1-based position among the data lines, counting skipped empty lines too.
///
/// Returns an error when the file cannot be read, when it has no header line, or when a record
/// cannot be converted.
fn load_manifest_file_groups(
    path: &Path,
) -> Result<Vec<FileGroup>, Box<dyn std::error::Error>> {
    const EXPECTED_HEADER: [&str; 6] = [
        "table_id",
        "file_id",
        "min_ts_us",
        "max_ts_us",
        "primary_key_min",
        "primary_key_max",
    ];

    let raw = fs::read(path)?;
    let text = String::from_utf8_lossy(&raw);
    let mut rows = text.lines();

    let header = rows.next().ok_or("missing manifest header")?;
    assert_eq!(EXPECTED_HEADER, parse_csv_record(header).as_slice());

    let mut groups = Vec::new();
    // Enumerate before filtering so indices keep counting the skipped empty lines.
    for (idx, row) in rows.enumerate().filter(|(_, row)| !row.is_empty()) {
        let fields = parse_csv_record(row);
        assert_eq!(
            6,
            fields.len(),
            "invalid CSV record at line {}",
            idx + 2
        );
        let group = new_file_group_from_manifest_record(&fields, (idx + 1) as u64)?;
        groups.push(group);
    }
    Ok(groups)
}
fn check_sorted_runs(
ranges: &[(i64, i64)],
expected_runs: &[Vec<(i64, i64)>],
@@ -623,6 +871,46 @@ mod tests {
.collect()
}
/// Returns the number of items in each run, in the same order as `runs`.
fn run_lengths<T: Item>(runs: &[SortedRun<T>]) -> Vec<usize> {
    let mut lengths = Vec::with_capacity(runs.len());
    for sorted_run in runs {
        lengths.push(sorted_run.items().len());
    }
    lengths
}
/// Returns the item count of the longest run, or `0` when `runs` is empty.
fn max_run_len<T: Item>(runs: &[SortedRun<T>]) -> usize {
    runs.iter()
        .fold(0, |longest, sorted_run| longest.max(sorted_run.items().len()))
}
/// Logs a one-line summary of how two sets of sorted runs differ.
///
/// The summary reports whether the run counts, per-run lengths and per-run ranges match, both
/// run counts, and the index of the first run whose ranges differ. When all shared positions
/// match but the counts differ, that index is the length of the shorter set; it is `None` when
/// the two sets are identical.
fn log_sorted_runs_comparison<T: Item>(
    baseline_name: &str,
    baseline: &[SortedRun<T>],
    target_name: &str,
    target: &[SortedRun<T>],
) {
    let lhs_ranges = sorted_run_ranges(baseline);
    let rhs_ranges = sorted_run_ranges(target);

    let differing_position = lhs_ranges
        .iter()
        .zip(rhs_ranges.iter())
        .position(|(lhs, rhs)| lhs != rhs);
    let first_mismatch_idx = match differing_position {
        Some(idx) => Some(idx),
        // Every shared position agrees; a length difference starts right after the shorter set.
        None if lhs_ranges.len() != rhs_ranges.len() => {
            Some(lhs_ranges.len().min(rhs_ranges.len()))
        }
        None => None,
    };

    let same_run_count = baseline.len() == target.len();
    let same_run_lengths = run_lengths(baseline) == run_lengths(target);
    let same_run_ranges = lhs_ranges == rhs_ranges;

    common_telemetry::info!(
        "sorted runs comparison: baseline={}, target={}, same_run_count={}, same_run_lengths={}, same_run_ranges={}, baseline_runs={}, target_runs={}, first_mismatch_idx={:?}",
        baseline_name,
        target_name,
        same_run_count,
        same_run_lengths,
        same_run_ranges,
        baseline.len(),
        target.len(),
        first_mismatch_idx
    );
}
fn check_find_sorted_runs_consistency(ranges: &[(i64, i64)]) {
let mut files = build_items(ranges);
let mut files_for_original = files.clone();
@@ -705,6 +993,66 @@ mod tests {
}
}
/// Manual benchmark: times the three sorted-run finders on the manifest at
/// `SORTED_RUNS_MANIFEST_PATH` and logs how their results compare to the original finder.
///
/// Ignored by default because it needs that manifest file to be present.
#[test]
#[ignore]
fn test_find_sorted_runs_manifest_performance() {
    common_telemetry::init_default_ut_logging();

    let manifest_files =
        load_manifest_file_groups(Path::new(SORTED_RUNS_MANIFEST_PATH)).unwrap();
    // Every finder sorts its input in place, so each one gets a private copy.
    let mut current_input = manifest_files.clone();
    let mut original_input = manifest_files.clone();
    let mut time_range_input = manifest_files.clone();

    let current_timer = Instant::now();
    let current_runs = find_sorted_runs(&mut current_input);
    let current_elapsed = current_timer.elapsed();
    assert!(!current_input.is_empty());
    common_telemetry::info!(
        "find_sorted_runs manifest performance: files={}, runs={}, max_run_len={}, elapsed={:?}",
        current_input.len(),
        current_runs.len(),
        max_run_len(&current_runs),
        current_elapsed
    );

    let original_timer = Instant::now();
    let original_runs = find_sorted_runs_original(&mut original_input);
    let original_elapsed = original_timer.elapsed();
    assert!(!original_input.is_empty());
    common_telemetry::info!(
        "find_sorted_runs_original manifest performance: files={}, runs={}, max_run_len={}, elapsed={:?}",
        original_input.len(),
        original_runs.len(),
        max_run_len(&original_runs),
        original_elapsed
    );

    let time_range_timer = Instant::now();
    let time_range_runs = find_sorted_runs_by_time_range(&mut time_range_input);
    let time_range_elapsed = time_range_timer.elapsed();
    assert!(!time_range_input.is_empty());
    common_telemetry::info!(
        "find_sorted_runs_2 manifest performance: files={}, runs={}, max_run_len={}, elapsed={:?}",
        time_range_input.len(),
        time_range_runs.len(),
        max_run_len(&time_range_runs),
        time_range_elapsed
    );

    // The original finder is the baseline for both comparisons.
    log_sorted_runs_comparison(
        "find_sorted_runs_original",
        &original_runs,
        "find_sorted_runs",
        &current_runs,
    );
    log_sorted_runs_comparison(
        "find_sorted_runs_original",
        &original_runs,
        "find_sorted_runs_2",
        &time_range_runs,
    );
}
fn check_reduce_runs(
files: &[(i64, i64)],
expected_runs: &[Vec<(i64, i64)>],

View File

@@ -28,8 +28,8 @@ use crate::compaction::buckets::infer_time_bucket;
use crate::compaction::compactor::CompactionRegion;
use crate::compaction::picker::{Picker, PickerOutput};
use crate::compaction::run::{
FileGroup, Item, Ranged, find_sorted_runs, merge_primary_key_ranges, merge_seq_files,
primary_key_ranges_overlap, reduce_runs,
FileGroup, Item, Ranged, find_sorted_runs_by_time_range, merge_primary_key_ranges,
merge_seq_files, primary_key_ranges_overlap, reduce_runs,
};
use crate::compaction::{CompactionOutput, get_expired_ssts};
use crate::sst::file::{FileHandle, Level, overlaps};
@@ -88,7 +88,7 @@ impl TwcsPicker {
);
}
let sorted_runs = find_sorted_runs(&mut files_to_merge);
let sorted_runs = find_sorted_runs_by_time_range(&mut files_to_merge);
let found_runs = sorted_runs.len();
// We only remove deletion markers if we found less than 2 runs and not in append mode.
// because after compaction there will be no overlapping files.

View File

@@ -18,6 +18,7 @@
#![feature(debug_closure_helpers)]
#![feature(duration_constructors)]
#![feature(binary_heap_pop_if)]
#[cfg(any(test, feature = "test"))]
#[cfg_attr(feature = "test", allow(unused))]