diff --git a/src/mito2/src/compaction/run.rs b/src/mito2/src/compaction/run.rs
index 74509e1a3f..bfa43c65eb 100644
--- a/src/mito2/src/compaction/run.rs
+++ b/src/mito2/src/compaction/run.rs
@@ -175,6 +175,11 @@ impl FileGroup {
     pub(crate) fn into_files(self) -> impl Iterator<Item = FileHandle> {
         self.files.into_iter()
     }
+
+    /// Returns true if every file in this group is at level 0 (vacuously true for an empty group).
+    pub(crate) fn is_all_level_0(&self) -> bool {
+        self.files.iter().all(|f| f.level() == 0)
+    }
 }
 
 impl Ranged for FileGroup {
diff --git a/src/mito2/src/compaction/test_util.rs b/src/mito2/src/compaction/test_util.rs
index f1b067e12f..b05b8b5399 100644
--- a/src/mito2/src/compaction/test_util.rs
+++ b/src/mito2/src/compaction/test_util.rs
@@ -42,6 +42,25 @@ pub fn new_file_handle_with_sequence(
     end_ts_millis: i64,
     level: Level,
     sequence: u64,
+) -> FileHandle {
+    new_file_handle_with_size_and_sequence(
+        file_id,
+        start_ts_millis,
+        end_ts_millis,
+        level,
+        sequence,
+        0,
+    )
+}
+
+/// Test util to create file handles with custom size.
+pub fn new_file_handle_with_size_and_sequence(
+    file_id: FileId,
+    start_ts_millis: i64,
+    end_ts_millis: i64,
+    level: Level,
+    sequence: u64,
+    file_size: u64,
 ) -> FileHandle {
     let file_purger = new_noop_file_purger();
     FileHandle::new(
@@ -53,7 +72,7 @@ pub fn new_file_handle_with_sequence(
                 Timestamp::new_millisecond(end_ts_millis),
             ),
             level,
-            file_size: 0,
+            file_size,
             available_indexes: Default::default(),
             index_file_size: 0,
             num_rows: 0,
diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs
index 02e0529255..29d8e8a49c 100644
--- a/src/mito2/src/compaction/twcs.rs
+++ b/src/mito2/src/compaction/twcs.rs
@@ -64,11 +64,37 @@ impl TwcsPicker {
                 continue;
             }
             let mut files_to_merge: Vec<_> = files.files().cloned().collect();
+
+            // Filter out large files in append mode - they won't benefit from compaction.
+            // Groups that contain any non-level-0 file are excluded by the same predicate.
+            if self.append_mode {
+                if let Some(max_size) = self.max_output_file_size {
+                    let (kept_files, ignored_files): (Vec<_>, Vec<_>) = files_to_merge
+                        .into_iter()
+                        .partition(|fg| fg.size() <= max_size as usize && fg.is_all_level_0());
+                    files_to_merge = kept_files;
+                    // Only log when something was actually skipped to avoid per-window log noise.
+                    if !ignored_files.is_empty() {
+                        info!(
+                            "Skipped {} large or non-level-0 file groups in append mode for region {}, window {}, max_size: {}",
+                            ignored_files.len(),
+                            region_id,
+                            window,
+                            max_size
+                        );
+                    }
+                }
+            }
+
             let sorted_runs = find_sorted_runs(&mut files_to_merge);
             let found_runs = sorted_runs.len();
             // We only remove deletion markers if we found less than 2 runs and not in append mode.
             // because after compaction there will be no overlapping files.
             let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
+            // Every candidate in this window was filtered out; keep examining the other windows.
+            if found_runs == 0 {
+                continue;
+            }
 
             let inputs = if found_runs > 1 {
                 reduce_runs(sorted_runs)
@@ -330,7 +356,9 @@ mod tests {
     use std::collections::HashSet;
 
     use super::*;
-    use crate::compaction::test_util::{new_file_handle, new_file_handle_with_sequence};
+    use crate::compaction::test_util::{
+        new_file_handle, new_file_handle_with_sequence, new_file_handle_with_size_and_sequence,
+    };
     use crate::sst::file::{FileId, Level};
 
     #[test]
@@ -766,5 +794,51 @@ mod tests {
             .check();
     }
 
+    #[test]
+    fn test_append_mode_filter_large_files() {
+        let file_ids = (0..5).map(|_| FileId::random()).collect::<Vec<_>>();
+        let max_output_file_size = 1000u64;
+
+        // Create files with different sizes
+        let small_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 500);
+        let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 1500);
+        let small_file_2 = new_file_handle_with_size_and_sequence(file_ids[2], 0, 999, 0, 3, 800);
+        let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[3], 0, 999, 0, 4, 2000);
+        // Small enough, but not level 0, so it must be filtered out as well.
+        let small_level_1 = new_file_handle_with_size_and_sequence(file_ids[4], 0, 999, 1, 5, 500);
+
+        // Create file groups (each file is in its own group due to different sequences)
+        let mut files_to_merge = vec![
+            FileGroup::new_with_file(small_file_1),
+            FileGroup::new_with_file(large_file_1),
+            FileGroup::new_with_file(small_file_2),
+            FileGroup::new_with_file(large_file_2),
+            FileGroup::new_with_file(small_level_1),
+        ];
+
+        // Test filtering logic directly
+        let original_count = files_to_merge.len();
+
+        // Apply the same predicate the picker uses in append mode
+        files_to_merge.retain(|fg| {
+            fg.size() <= max_output_file_size as usize && fg.is_all_level_0()
+        });
+
+        // Should have filtered out 2 large files and 1 non-level-0 file, leaving 2 small files
+        assert_eq!(files_to_merge.len(), 2);
+        assert_eq!(original_count, 5);
+
+        // Verify the remaining files are the small level-0 ones
+        for fg in &files_to_merge {
+            assert!(fg.is_all_level_0());
+            assert!(
+                fg.size() <= max_output_file_size as usize,
+                "File size {} should be <= {}",
+                fg.size(),
+                max_output_file_size
+            );
+        }
+    }
+
     // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
 }
diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs
index 83881b6fa4..7e76a7809e 100644
--- a/src/mito2/src/sst/file.rs
+++ b/src/mito2/src/sst/file.rs
@@ -319,6 +319,11 @@ impl FileHandle {
     pub fn num_rows(&self) -> usize {
         self.inner.meta.num_rows as usize
     }
+
+    /// Returns the level recorded in this file's metadata.
+    pub fn level(&self) -> Level {
+        self.inner.meta.level
+    }
 }
 
 /// Inner data of [FileHandle].