From 9ec163badb3c5ddb758abafce201786c3fb13a80 Mon Sep 17 00:00:00 2001 From: luofucong Date: Fri, 22 May 2026 16:38:35 +0800 Subject: [PATCH] refactor: find sorted runs by time range only Signed-off-by: luofucong --- Cargo.lock | 1 + src/mito2/Cargo.toml | 1 + src/mito2/src/compaction/run.rs | 348 +++++++++++++++++++++++++++++++ src/mito2/src/compaction/twcs.rs | 6 +- src/mito2/src/lib.rs | 1 + 5 files changed, 354 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aafa225b4b..923f6eba61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8295,6 +8295,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datatypes", + "derive_more", "dotenv", "either", "futures", diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index dde1e44ea1..4b629f38fc 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -50,6 +50,7 @@ datafusion-common.workspace = true datafusion-expr.workspace = true datatypes.workspace = true dashmap.workspace = true +derive_more.workspace = true dotenv.workspace = true either.workspace = true futures.workspace = true diff --git a/src/mito2/src/compaction/run.rs b/src/mito2/src/compaction/run.rs index ecd19666e2..54100ee1f0 100644 --- a/src/mito2/src/compaction/run.rs +++ b/src/mito2/src/compaction/run.rs @@ -15,6 +15,9 @@ //! This file contains code to find sorted runs in a set if ranged items and //! along with the best way to merge these items to satisfy the desired run count. +use std::cmp::Ordering; +use std::collections::BinaryHeap; + use bytes::{Buf, Bytes}; use common_base::BitVec; use common_base::readable_size::ReadableSize; @@ -423,6 +426,126 @@ where runs } +pub(crate) fn find_sorted_runs_by_time_range(items: &mut [T]) -> Vec> +where + T: Item, +{ + if items.is_empty() { + return vec![]; + } + // sort files + sort_ranged_items(items); + + use derive_more::{Eq, PartialEq}; + + /// `SortedRun` with a creation sequence `i`. + #[derive(PartialEq, Eq)] + struct Run { + i: usize, + #[partial_eq(skip)] + run: SortedRun, + } + + impl Run { + fn new(i: usize, item: &T) -> Run { + let mut run = SortedRun::default(); + run.push_item(item.clone()); + Run { i, run } + } + + fn push_item(&mut self, item: &T) { + self.run.push_item(item.clone()); + } + } + + impl PartialOrd for Run { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + + /// Sort by run's `end` desc then `start` asc. + impl Ord for Run { + fn cmp(&self, other: &Self) -> Ordering { + let l_run = &self.run; + let r_run = &other.run; + let l_end = l_run.end.unwrap(); + let r_end = r_run.end.unwrap(); + r_end.cmp(&l_end).then_with(|| { + let l_start = l_run.start.unwrap(); + let r_start = r_run.start.unwrap(); + l_start.cmp(&r_start) + }) + } + } + + /// Wrapper around the `Run` above, to support sorting them by their creation sequence `i`. + #[derive(PartialEq, Eq)] + struct Wrapper(Run); + + impl PartialOrd for Wrapper { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + + impl Ord for Wrapper { + fn cmp(&self, other: &Self) -> Ordering { + other.0.i.cmp(&self.0.i) + } + } + + // Two heaps for finding a run that is both: + // 1. not overlapping with item's range, + // 2. and is created earliest, + // when iterating the items. + // + // Heap 1 (`runs_sorted_by_end`) is for storing the runs of which top has the minimal "end" + // just about to overlap with the current selected item. + // + // Heap 2 (`runs_sort_by_index`) is for storing the runs that all have "end"s non-overlap with + // the current selected item, and of which top is the earliest created run. + // + // The finding of a suitable run basically works like this: + // 1. moves the runs in heap 1 to heap 2, until the top is overlapping with the current item; + // 2. now heap 2 has all the runs that can accept the current item, pop its top; + // 3. the top is the earliest created run, push the current item; + // 4. because the run has changed, push it back to heap 1; + // 5. check the next item. Important: we don't need to push the runs in heap 2 to 1, because + // the items are sorted by "start". When checking the next item, heap 2's runs must all have + // "end"s smaller than next item's "start". + // + // Actually the heap 2 is only for aligning with the runs selection outcomes in the original + // `find_sorted_runs` implementation. If we just need the invariant that each run has the + // non-overlapping items, we can get rid of heap 2 and make the codes simpler. + + let mut runs_sort_by_end = BinaryHeap::>::new(); + let mut runs_sort_by_index = BinaryHeap::>::new(); + let mut i = 0; + + for item in items { + let (start, _) = item.range(); + + while let Some(run) = runs_sort_by_end.pop_if(|x| x.run.end.unwrap() <= start) { + runs_sort_by_index.push(Wrapper(run)); + } + + let Some(mut run) = runs_sort_by_index.pop() else { + i += 1; + runs_sort_by_end.push(Run::new(i, item)); + continue; + }; + + run.0.push_item(item); + runs_sort_by_end.push(run.0); + } + + let mut runs = runs_sort_by_end.into_vec(); + runs.extend(runs_sort_by_index.into_vec().into_iter().map(|x| x.0)); + runs.sort_unstable_by_key(|run| run.i); + runs.into_iter().map(|x| x.run).collect() +} + /// Finds a set of files with minimum penalty to merge that can reduce the total num of runs. /// The penalty of merging is defined as the size of all overlapping files between two runs. pub fn reduce_runs(mut runs: Vec>) -> Vec { @@ -540,12 +663,20 @@ pub fn merge_seq_files(input_files: &[T], max_file_size: Option) - #[cfg(test)] mod tests { use std::collections::HashSet; + use std::fs; + use std::num::NonZeroU64; + use std::path::Path; + use std::time::Instant; use bytes::Bytes; use store_api::storage::FileId; use super::*; use crate::compaction::test_util::new_file_handle_with_size_sequence_and_primary_key_range; + use crate::sst::file::{FileHandle, FileMeta}; + use crate::test_util::new_noop_file_purger; + + const SORTED_RUNS_MANIFEST_PATH: &str = "./ssts_manifest_202605201245.csv"; #[derive(Clone, Debug, PartialEq)] struct MockFile { @@ -594,6 +725,123 @@ mod tests { Some((Bytes::from_static(min), Bytes::from_static(max))) } + fn parse_csv_record(line: &str) -> Vec { + let mut fields = Vec::new(); + let mut field = String::new(); + let mut chars = line.trim_end_matches('\r').chars().peekable(); + let mut in_quotes = false; + + while let Some(ch) = chars.next() { + match ch { + '"' if in_quotes && chars.peek() == Some(&'"') => { + field.push('"'); + chars.next(); + } + '"' => { + in_quotes = !in_quotes; + } + ',' if !in_quotes => { + fields.push(std::mem::take(&mut field)); + } + _ => field.push(ch), + } + } + fields.push(field); + + fields + } + + fn primary_key_range_from_manifest( + primary_key_min: &str, + primary_key_max: &str, + ) -> Option<(Bytes, Bytes)> { + if primary_key_min.is_empty() || primary_key_max.is_empty() { + return None; + } + + Some(( + Bytes::copy_from_slice(primary_key_min.as_bytes()), + Bytes::copy_from_slice(primary_key_max.as_bytes()), + )) + } + + fn new_file_group_from_manifest_record( + record: &[String], + sequence: u64, + ) -> Result> { + assert_eq!(6, record.len()); + + let file_purger = new_noop_file_purger(); + let file = FileHandle::new_with_primary_key_range( + FileMeta { + region_id: 0.into(), + file_id: FileId::parse_str(&record[1])?, + time_range: ( + Timestamp::new_microsecond(record[2].parse()?), + Timestamp::new_microsecond(record[3].parse()?), + ), + level: 0, + file_size: 1, + max_row_group_uncompressed_size: 1, + available_indexes: Default::default(), + indexes: Default::default(), + index_file_size: 0, + index_version: 0, + num_rows: 0, + num_row_groups: 0, + num_series: 0, + sequence: NonZeroU64::new(sequence), + partition_expr: None, + ..Default::default() + }, + file_purger, + primary_key_range_from_manifest(&record[4], &record[5]), + ); + + Ok(FileGroup::new_with_file(file)) + } + + fn load_manifest_file_groups( + path: &Path, + ) -> Result, Box> { + let data = fs::read(path)?; + let data = String::from_utf8_lossy(&data); + let mut lines = data.lines(); + let header = lines.next().ok_or("missing manifest header")?; + assert_eq!( + [ + "table_id", + "file_id", + "min_ts_us", + "max_ts_us", + "primary_key_min", + "primary_key_max" + ], + parse_csv_record(header).as_slice() + ); + + let mut files = Vec::new(); + for (line_idx, line) in lines.enumerate() { + if line.is_empty() { + continue; + } + + let record = parse_csv_record(line); + assert_eq!( + 6, + record.len(), + "invalid CSV record at line {}", + line_idx + 2 + ); + files.push(new_file_group_from_manifest_record( + &record, + (line_idx + 1) as u64, + )?); + } + + Ok(files) + } + fn check_sorted_runs( ranges: &[(i64, i64)], expected_runs: &[Vec<(i64, i64)>], @@ -623,6 +871,46 @@ mod tests { .collect() } + fn run_lengths(runs: &[SortedRun]) -> Vec { + runs.iter().map(|run| run.items().len()).collect() + } + + fn max_run_len(runs: &[SortedRun]) -> usize { + runs.iter().map(|run| run.items().len()).max().unwrap_or(0) + } + + fn log_sorted_runs_comparison( + baseline_name: &str, + baseline: &[SortedRun], + target_name: &str, + target: &[SortedRun], + ) { + let baseline_lengths = run_lengths(baseline); + let target_lengths = run_lengths(target); + let baseline_ranges = sorted_run_ranges(baseline); + let target_ranges = sorted_run_ranges(target); + let first_mismatch_idx = baseline_ranges + .iter() + .zip(target_ranges.iter()) + .position(|(lhs, rhs)| lhs != rhs) + .or_else(|| { + (baseline_ranges.len() != target_ranges.len()) + .then_some(baseline_ranges.len().min(target_ranges.len())) + }); + + common_telemetry::info!( + "sorted runs comparison: baseline={}, target={}, same_run_count={}, same_run_lengths={}, same_run_ranges={}, baseline_runs={}, target_runs={}, first_mismatch_idx={:?}", + baseline_name, + target_name, + baseline.len() == target.len(), + baseline_lengths == target_lengths, + baseline_ranges == target_ranges, + baseline.len(), + target.len(), + first_mismatch_idx + ); + } + fn check_find_sorted_runs_consistency(ranges: &[(i64, i64)]) { let mut files = build_items(ranges); let mut files_for_original = files.clone(); @@ -705,6 +993,66 @@ mod tests { } } + #[test] + #[ignore] + fn test_find_sorted_runs_manifest_performance() { + common_telemetry::init_default_ut_logging(); + + let files = load_manifest_file_groups(Path::new(SORTED_RUNS_MANIFEST_PATH)).unwrap(); + let mut files_for_current = files.clone(); + let mut files_for_original = files.clone(); + let mut files_for_2 = files.clone(); + + let start = Instant::now(); + let current_runs = find_sorted_runs(&mut files_for_current); + let elapsed = start.elapsed(); + assert!(!files_for_current.is_empty()); + common_telemetry::info!( + "find_sorted_runs manifest performance: files={}, runs={}, max_run_len={}, elapsed={:?}", + files_for_current.len(), + current_runs.len(), + max_run_len(¤t_runs), + elapsed + ); + + let start = Instant::now(); + let original_runs = find_sorted_runs_original(&mut files_for_original); + let elapsed = start.elapsed(); + assert!(!files_for_original.is_empty()); + common_telemetry::info!( + "find_sorted_runs_original manifest performance: files={}, runs={}, max_run_len={}, elapsed={:?}", + files_for_original.len(), + original_runs.len(), + max_run_len(&original_runs), + elapsed + ); + + let start = Instant::now(); + let runs_2 = find_sorted_runs_by_time_range(&mut files_for_2); + let elapsed = start.elapsed(); + assert!(!files_for_2.is_empty()); + common_telemetry::info!( + "find_sorted_runs_2 manifest performance: files={}, runs={}, max_run_len={}, elapsed={:?}", + files_for_2.len(), + runs_2.len(), + max_run_len(&runs_2), + elapsed + ); + + log_sorted_runs_comparison( + "find_sorted_runs_original", + &original_runs, + "find_sorted_runs", + ¤t_runs, + ); + log_sorted_runs_comparison( + "find_sorted_runs_original", + &original_runs, + "find_sorted_runs_2", + &runs_2, + ); + } + fn check_reduce_runs( files: &[(i64, i64)], expected_runs: &[Vec<(i64, i64)>], diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 87dcac7279..1752bfb9b5 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -28,8 +28,8 @@ use crate::compaction::buckets::infer_time_bucket; use crate::compaction::compactor::CompactionRegion; use crate::compaction::picker::{Picker, PickerOutput}; use crate::compaction::run::{ - FileGroup, Item, Ranged, find_sorted_runs, merge_primary_key_ranges, merge_seq_files, - primary_key_ranges_overlap, reduce_runs, + FileGroup, Item, Ranged, find_sorted_runs_by_time_range, merge_primary_key_ranges, + merge_seq_files, primary_key_ranges_overlap, reduce_runs, }; use crate::compaction::{CompactionOutput, get_expired_ssts}; use crate::sst::file::{FileHandle, Level, overlaps}; @@ -88,7 +88,7 @@ impl TwcsPicker { ); } - let sorted_runs = find_sorted_runs(&mut files_to_merge); + let sorted_runs = find_sorted_runs_by_time_range(&mut files_to_merge); let found_runs = sorted_runs.len(); // We only remove deletion markers if we found less than 2 runs and not in append mode. // because after compaction there will be no overlapping files. diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index 7d43685ded..a452e106c7 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -18,6 +18,7 @@ #![feature(debug_closure_helpers)] #![feature(duration_constructors)] +#![feature(binary_heap_pop_if)] #[cfg(any(test, feature = "test"))] #[cfg_attr(feature = "test", allow(unused))]