From 77483ad7d459024b0c133e392b027875851eee83 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Mon, 17 Nov 2025 20:46:30 +0800 Subject: [PATCH] fix: allow compacting L1 files under append mode (#7239) * fix: allow compacting L1 files under append mode Signed-off-by: evenyag * feat: limit the number of compaction input files Signed-off-by: evenyag --------- Signed-off-by: evenyag --- src/mito2/src/compaction/run.rs | 8 +-- src/mito2/src/compaction/twcs.rs | 114 ++++++++++++++++++++++++++++++- 2 files changed, 115 insertions(+), 7 deletions(-) diff --git a/src/mito2/src/compaction/run.rs b/src/mito2/src/compaction/run.rs index e691709948..a7e5ca490c 100644 --- a/src/mito2/src/compaction/run.rs +++ b/src/mito2/src/compaction/run.rs @@ -163,6 +163,10 @@ impl FileGroup { self.files.push(file); } + pub(crate) fn num_files(&self) -> usize { + self.files.len() + } + #[cfg(test)] pub(crate) fn files(&self) -> &[FileHandle] { &self.files[..] @@ -175,10 +179,6 @@ impl FileGroup { pub(crate) fn into_files(self) -> impl Iterator { self.files.into_iter() } - - pub(crate) fn is_all_level_0(&self) -> bool { - self.files.iter().all(|f| f.level() == 0) - } } impl Ranged for FileGroup { diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 371fb8f989..9012457f75 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -36,6 +36,9 @@ use crate::sst::version::LevelMeta; const LEVEL_COMPACTED: Level = 1; +/// Default value for max compaction input file num. +const DEFAULT_MAX_INPUT_FILE_NUM: usize = 32; + /// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction /// candidates. #[derive(Debug)] @@ -73,7 +76,7 @@ impl TwcsPicker { { let (kept_files, ignored_files) = files_to_merge .into_iter() - .partition(|fg| fg.size() <= max_size as usize && fg.is_all_level_0()); + .partition(|fg| fg.size() <= max_size as usize); files_to_merge = kept_files; info!( "Skipped {} large files in append mode for region {}, window {}, max_size: {}", @@ -93,7 +96,7 @@ impl TwcsPicker { continue; } - let inputs = if found_runs > 1 { + let mut inputs = if found_runs > 1 { reduce_runs(sorted_runs) } else { let run = sorted_runs.last().unwrap(); @@ -104,7 +107,32 @@ impl TwcsPicker { merge_seq_files(run.items(), self.max_output_file_size) }; - if !inputs.is_empty() { + // Limits the number of input files. + let total_input_files: usize = inputs.iter().map(|fg| fg.num_files()).sum(); + if total_input_files > DEFAULT_MAX_INPUT_FILE_NUM { + // Sorts file groups by size first. + inputs.sort_unstable_by_key(|fg| fg.size()); + let mut num_picked_files = 0; + inputs = inputs + .into_iter() + .take_while(|fg| { + let current_group_file_num = fg.num_files(); + if current_group_file_num + num_picked_files <= DEFAULT_MAX_INPUT_FILE_NUM { + num_picked_files += current_group_file_num; + true + } else { + false + } + }) + .collect::>(); + info!( + "Compaction for region {} enforces max input file num limit: {}, current total: {}, input: {:?}", + region_id, DEFAULT_MAX_INPUT_FILE_NUM, total_input_files, inputs + ); + } + + if inputs.len() > 1 { + // If we have more than one group to compact. log_pick_result( region_id, *window, @@ -1024,5 +1052,85 @@ mod tests { assert!(!output.is_empty(), "Should have at least one output"); } + #[test] + fn test_pick_multiple_runs() { + common_telemetry::init_default_ut_logging(); + + let num_files = 8; + let file_ids = (0..num_files).map(|_| FileId::random()).collect::>(); + + // Create files with different sequences so they form multiple runs + let files: Vec<_> = file_ids + .iter() + .enumerate() + .map(|(idx, file_id)| { + new_file_handle_with_size_and_sequence( + *file_id, + 0, + 999, + 0, + (idx + 1) as u64, + 1024 * 1024, + ) + }) + .collect(); + + let mut windows = assign_to_windows(files.iter(), 3); + + let picker = TwcsPicker { + trigger_file_num: 4, + time_window_seconds: Some(3), + max_output_file_size: None, + append_mode: false, + max_background_tasks: None, + }; + + let active_window = find_latest_window_in_seconds(files.iter(), 3); + let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window); + + assert_eq!(1, output.len()); + assert_eq!(output[0].inputs.len(), 2); + } + + #[test] + fn test_limit_max_input_files() { + common_telemetry::init_default_ut_logging(); + + let num_files = 50; + let file_ids = (0..num_files).map(|_| FileId::random()).collect::>(); + + // Create files with different sequences so they form 2 runs + let files: Vec<_> = file_ids + .iter() + .enumerate() + .map(|(idx, file_id)| { + new_file_handle_with_size_and_sequence( + *file_id, + (idx / 2 * 10) as i64, + (idx / 2 * 10 + 5) as i64, + 0, + (idx + 1) as u64, + 1024 * 1024, + ) + }) + .collect(); + + let mut windows = assign_to_windows(files.iter(), 3); + + let picker = TwcsPicker { + trigger_file_num: 4, + time_window_seconds: Some(3), + max_output_file_size: None, + append_mode: false, + max_background_tasks: None, + }; + + let active_window = find_latest_window_in_seconds(files.iter(), 3); + let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window); + + assert_eq!(1, output.len()); + assert_eq!(output[0].inputs.len(), 32); + } + // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected. }