diff --git a/src/mito2/src/compaction/run.rs b/src/mito2/src/compaction/run.rs index 54100ee1f0..1286b3e760 100644 --- a/src/mito2/src/compaction/run.rs +++ b/src/mito2/src/compaction/run.rs @@ -662,13 +662,15 @@ pub fn merge_seq_files(input_files: &[T], max_file_size: Option) - #[cfg(test)] mod tests { - use std::collections::HashSet; + use std::collections::{BTreeMap, HashSet}; use std::fs; use std::num::NonZeroU64; use std::path::Path; - use std::time::Instant; + use std::time::{Duration, Instant}; use bytes::Bytes; + use common_time::timestamp::TimeUnit; + use common_time::timestamp_millis::BucketAligned; use store_api::storage::FileId; use super::*; @@ -676,7 +678,9 @@ mod tests { use crate::sst::file::{FileHandle, FileMeta}; use crate::test_util::new_noop_file_purger; - const SORTED_RUNS_MANIFEST_PATH: &str = "./ssts_manifest_202605201245.csv"; + const SORTED_RUNS_MANIFEST_PATH: &str = + "/Users/luofucong/Downloads/ssts_manifest_20260525104700.csv"; + const SORTED_RUNS_MANIFEST_TIME_WINDOW_SECONDS: i64 = 24 * 60 * 60; #[derive(Clone, Debug, PartialEq)] struct MockFile { @@ -801,9 +805,25 @@ mod tests { Ok(FileGroup::new_with_file(file)) } - fn load_manifest_file_groups( + fn manifest_record_time_window( + record: &[String], + time_window_size: i64, + ) -> Result> { + let max_ts_us = record[3].parse()?; + let time_window = Timestamp::new_microsecond(max_ts_us) + .convert_to(TimeUnit::Second) + .ok_or("failed to convert manifest timestamp to seconds")? + .value() + .align_to_ceil_by_bucket(time_window_size) + .unwrap_or(i64::MIN); + + Ok(time_window) + } + + fn load_manifest_file_groups_by_time_window( path: &Path, - ) -> Result, Box> { + time_window_size: i64, + ) -> Result>, Box> { let data = fs::read(path)?; let data = String::from_utf8_lossy(&data); let mut lines = data.lines(); @@ -820,7 +840,7 @@ mod tests { parse_csv_record(header).as_slice() ); - let mut files = Vec::new(); + let mut windows = BTreeMap::new(); for (line_idx, line) in lines.enumerate() { if line.is_empty() { continue; @@ -833,13 +853,13 @@ mod tests { "invalid CSV record at line {}", line_idx + 2 ); - files.push(new_file_group_from_manifest_record( - &record, - (line_idx + 1) as u64, - )?); + let time_window = manifest_record_time_window(&record, time_window_size)?; + windows.entry(time_window).or_insert_with(Vec::new).push( + new_file_group_from_manifest_record(&record, (line_idx + 1) as u64)?, + ); } - Ok(files) + Ok(windows) } fn check_sorted_runs( @@ -998,45 +1018,77 @@ mod tests { fn test_find_sorted_runs_manifest_performance() { common_telemetry::init_default_ut_logging(); - let files = load_manifest_file_groups(Path::new(SORTED_RUNS_MANIFEST_PATH)).unwrap(); - let mut files_for_current = files.clone(); - let mut files_for_original = files.clone(); - let mut files_for_2 = files.clone(); + let windows = load_manifest_file_groups_by_time_window( + Path::new(SORTED_RUNS_MANIFEST_PATH), + SORTED_RUNS_MANIFEST_TIME_WINDOW_SECONDS, + ) + .unwrap(); + let total_files: usize = windows.values().map(Vec::len).sum(); + let max_window_files = windows.values().map(Vec::len).max().unwrap_or(0); + assert!(total_files > 0); + + let mut current_runs = Vec::new(); + let mut current_elapsed = Duration::default(); + let mut current_max_run_len = 0; + let mut original_runs = Vec::new(); + let mut original_elapsed = Duration::default(); + let mut original_max_run_len = 0; + let mut runs_2 = Vec::new(); + let mut runs_2_elapsed = Duration::default(); + let mut runs_2_max_run_len = 0; + + for files in windows.values() { + let mut files_for_current = files.clone(); + let mut files_for_original = files.clone(); + let mut files_for_2 = files.clone(); + + let start = Instant::now(); + let mut window_current_runs = find_sorted_runs(&mut files_for_current); + current_elapsed += start.elapsed(); + current_max_run_len = current_max_run_len.max(max_run_len(&window_current_runs)); + current_runs.append(&mut window_current_runs); + + let start = Instant::now(); + let mut window_original_runs = find_sorted_runs_original(&mut files_for_original); + original_elapsed += start.elapsed(); + original_max_run_len = original_max_run_len.max(max_run_len(&window_original_runs)); + original_runs.append(&mut window_original_runs); + + let start = Instant::now(); + let mut window_runs_2 = find_sorted_runs_by_time_range(&mut files_for_2); + runs_2_elapsed += start.elapsed(); + runs_2_max_run_len = runs_2_max_run_len.max(max_run_len(&window_runs_2)); + runs_2.append(&mut window_runs_2); + } - let start = Instant::now(); - let current_runs = find_sorted_runs(&mut files_for_current); - let elapsed = start.elapsed(); - assert!(!files_for_current.is_empty()); common_telemetry::info!( - "find_sorted_runs manifest performance: files={}, runs={}, max_run_len={}, elapsed={:?}", - files_for_current.len(), + "find_sorted_runs manifest performance: windows={}, files={}, max_window_files={}, runs={}, max_run_len={}, elapsed={:?}", + windows.len(), + total_files, + max_window_files, current_runs.len(), - max_run_len(¤t_runs), - elapsed + current_max_run_len, + current_elapsed ); - let start = Instant::now(); - let original_runs = find_sorted_runs_original(&mut files_for_original); - let elapsed = start.elapsed(); - assert!(!files_for_original.is_empty()); common_telemetry::info!( - "find_sorted_runs_original manifest performance: files={}, runs={}, max_run_len={}, elapsed={:?}", - files_for_original.len(), + "find_sorted_runs_original manifest performance: windows={}, files={}, max_window_files={}, runs={}, max_run_len={}, elapsed={:?}", + windows.len(), + total_files, + max_window_files, original_runs.len(), - max_run_len(&original_runs), - elapsed + original_max_run_len, + original_elapsed ); - let start = Instant::now(); - let runs_2 = find_sorted_runs_by_time_range(&mut files_for_2); - let elapsed = start.elapsed(); - assert!(!files_for_2.is_empty()); common_telemetry::info!( - "find_sorted_runs_2 manifest performance: files={}, runs={}, max_run_len={}, elapsed={:?}", - files_for_2.len(), + "find_sorted_runs_2 manifest performance: windows={}, files={}, max_window_files={}, runs={}, max_run_len={}, elapsed={:?}", + windows.len(), + total_files, + max_window_files, runs_2.len(), - max_run_len(&runs_2), - elapsed + runs_2_max_run_len, + runs_2_elapsed ); log_sorted_runs_comparison( diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 1752bfb9b5..3f77ea6e21 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -17,6 +17,7 @@ use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; use std::num::NonZeroU64; +use bytes::Bytes; use common_base::readable_size::ReadableSize; use common_telemetry::{debug, info}; use common_time::Timestamp; @@ -64,8 +65,24 @@ impl TwcsPicker { time_windows: &mut BTreeMap, active_window: Option, ) -> Vec { + let has_overlap = |window: i64, this: &Window| -> bool { + time_windows + .iter() + .filter(|(w, _)| **w != window) + .any(|(_, that)| { + overlaps(&this.range(), &that.range()) + .then( + || match (&this.primary_key_range, &that.primary_key_range) { + (Some(l), Some(r)) => primary_key_ranges_overlap(l, r), + _ => true, + }, + ) + .unwrap_or_default() + }) + }; + let mut output = vec![]; - for (window, files) in time_windows { + for (window, files) in time_windows.iter() { if files.files.is_empty() { continue; } @@ -90,13 +107,15 @@ impl TwcsPicker { let sorted_runs = find_sorted_runs_by_time_range(&mut files_to_merge); let found_runs = sorted_runs.len(); - // We only remove deletion markers if we found less than 2 runs and not in append mode. - // because after compaction there will be no overlapping files. - let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode; if found_runs == 0 { continue; } + // We only remove deletion markers if we found less than 2 runs and not in append mode. + // because after compaction there will be no overlapping files. + let filter_deleted = + found_runs <= 2 && !self.append_mode && !has_overlap(*window, &files); + let mut inputs = if found_runs > 1 { reduce_runs(sorted_runs) } else { @@ -346,37 +365,38 @@ fn assign_to_windows<'a>( } } } - if windows.is_empty() { - return BTreeMap::new(); - } - - let mut windows = windows.into_values().collect::>(); - windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse())); - - for idx in 0..windows.len() { - let lhs_range = windows[idx].range(); - for next_idx in idx + 1..windows.len() { - let rhs_range = windows[next_idx].range(); - if rhs_range.0 > lhs_range.1 { - break; - } - - let windows_overlap = overlaps(&lhs_range, &rhs_range) - && match ( - &windows[idx].primary_key_range, - &windows[next_idx].primary_key_range, - ) { - (Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs), - _ => true, - }; - if windows_overlap { - windows[idx].overlapping = true; - windows[next_idx].overlapping = true; - } - } - } - - windows.into_iter().map(|w| (w.time_window, w)).collect() + // if windows.is_empty() { + // return BTreeMap::new(); + // } + // + // let mut windows = windows.into_values().collect::>(); + // windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse())); + // + // for idx in 0..windows.len() { + // let lhs_range = windows[idx].range(); + // for next_idx in idx + 1..windows.len() { + // let rhs_range = windows[next_idx].range(); + // if rhs_range.0 > lhs_range.1 { + // break; + // } + // + // let windows_overlap = overlaps(&lhs_range, &rhs_range) + // && match ( + // &windows[idx].primary_key_range, + // &windows[next_idx].primary_key_range, + // ) { + // (Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs), + // _ => true, + // }; + // if windows_overlap { + // windows[idx].overlapping = true; + // windows[next_idx].overlapping = true; + // } + // } + // } + // + // windows.into_iter().map(|w| (w.time_window, w)).collect() + windows.into_iter().collect() } /// Finds the latest active writing window among all files.