mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 22:02:56 +00:00
feat: skip compaction on large file on append only mode (#6838)
* feat: skip compaction on large file on append only mode
  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* log ignored files
  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* format
  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* only ignore level 1 files
  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* early exit
  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* fix typo
  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
---------
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -175,6 +175,10 @@ impl FileGroup {
|
||||
/// Consumes the group and yields every [`FileHandle`] it held, by value.
pub(crate) fn into_files(self) -> impl Iterator<Item = FileHandle> {
    // Move the collection out of `self` first, then turn it into an owning iterator.
    let handles = self.files;
    handles.into_iter()
}
|
||||
|
||||
pub(crate) fn is_all_level_0(&self) -> bool {
|
||||
self.files.iter().all(|f| f.level() == 0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Ranged for FileGroup {
|
||||
|
||||
@@ -42,6 +42,25 @@ pub fn new_file_handle_with_sequence(
|
||||
end_ts_millis: i64,
|
||||
level: Level,
|
||||
sequence: u64,
|
||||
) -> FileHandle {
|
||||
new_file_handle_with_size_and_sequence(
|
||||
file_id,
|
||||
start_ts_millis,
|
||||
end_ts_millis,
|
||||
level,
|
||||
sequence,
|
||||
0,
|
||||
)
|
||||
}
|
||||
|
||||
/// Test util to create file handles with custom size.
|
||||
pub fn new_file_handle_with_size_and_sequence(
|
||||
file_id: FileId,
|
||||
start_ts_millis: i64,
|
||||
end_ts_millis: i64,
|
||||
level: Level,
|
||||
sequence: u64,
|
||||
file_size: u64,
|
||||
) -> FileHandle {
|
||||
let file_purger = new_noop_file_purger();
|
||||
FileHandle::new(
|
||||
@@ -53,7 +72,7 @@ pub fn new_file_handle_with_sequence(
|
||||
Timestamp::new_millisecond(end_ts_millis),
|
||||
),
|
||||
level,
|
||||
file_size: 0,
|
||||
file_size,
|
||||
available_indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
num_rows: 0,
|
||||
|
||||
@@ -64,11 +64,32 @@ impl TwcsPicker {
|
||||
continue;
|
||||
}
|
||||
let mut files_to_merge: Vec<_> = files.files().cloned().collect();
|
||||
|
||||
// Filter out large files in append mode - they won't benefit from compaction
|
||||
if self.append_mode {
|
||||
if let Some(max_size) = self.max_output_file_size {
|
||||
let (kept_files, ignored_files) = files_to_merge
|
||||
.into_iter()
|
||||
.partition(|fg| fg.size() <= max_size as usize && fg.is_all_level_0());
|
||||
files_to_merge = kept_files;
|
||||
info!(
|
||||
"Skipped {} large files in append mode for region {}, window {}, max_size: {}",
|
||||
ignored_files.len(),
|
||||
region_id,
|
||||
window,
|
||||
max_size
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let sorted_runs = find_sorted_runs(&mut files_to_merge);
|
||||
let found_runs = sorted_runs.len();
|
||||
// We only remove deletion markers if we found less than 2 runs and not in append mode.
|
||||
// because after compaction there will be no overlapping files.
|
||||
let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
|
||||
if found_runs == 0 {
|
||||
return output;
|
||||
}
|
||||
|
||||
let inputs = if found_runs > 1 {
|
||||
reduce_runs(sorted_runs)
|
||||
@@ -330,7 +351,9 @@ mod tests {
|
||||
use std::collections::HashSet;
|
||||
|
||||
use super::*;
|
||||
use crate::compaction::test_util::{new_file_handle, new_file_handle_with_sequence};
|
||||
use crate::compaction::test_util::{
|
||||
new_file_handle, new_file_handle_with_sequence, new_file_handle_with_size_and_sequence,
|
||||
};
|
||||
use crate::sst::file::{FileId, Level};
|
||||
|
||||
#[test]
|
||||
@@ -766,5 +789,45 @@ mod tests {
|
||||
.check();
|
||||
}
|
||||
|
||||
/// Checks the size predicate used to drop oversized file groups in append mode.
///
/// This exercises only the `size() <= max_output_file_size` filter on
/// [`FileGroup`]s built from single files; it does not drive `TwcsPicker`
/// itself, whose filter additionally requires `is_all_level_0()`.
#[test]
fn test_append_mode_filter_large_files() {
    let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
    let max_output_file_size = 1000u64;

    // Two files below the threshold and two above it, all at level 0.
    let small_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 500);
    let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 1500);
    let small_file_2 = new_file_handle_with_size_and_sequence(file_ids[2], 0, 999, 0, 3, 800);
    let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[3], 0, 999, 0, 4, 2000);

    // Wrap every file in its own group so each group's size is that file's size.
    let mut files_to_merge = vec![
        FileGroup::new_with_file(small_file_1),
        FileGroup::new_with_file(large_file_1),
        FileGroup::new_with_file(small_file_2),
        FileGroup::new_with_file(large_file_2),
    ];

    // Sanity-check the fixture *before* filtering so this assertion is meaningful.
    assert_eq!(files_to_merge.len(), 4);

    // Apply the append mode size filter.
    files_to_merge.retain(|fg| fg.size() <= max_output_file_size as usize);

    // `retain` preserves order, so exactly the two small groups must remain, in
    // their original order. Comparing concrete sizes (instead of re-checking the
    // same predicate that was just applied) makes the assertion able to fail.
    let remaining_sizes: Vec<usize> = files_to_merge.iter().map(|fg| fg.size()).collect();
    assert_eq!(
        remaining_sizes,
        vec![500, 800],
        "only groups not larger than {} should survive the filter",
        max_output_file_size
    );
}
|
||||
|
||||
// TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
|
||||
}
|
||||
|
||||
@@ -319,6 +319,10 @@ impl FileHandle {
|
||||
pub fn num_rows(&self) -> usize {
|
||||
self.inner.meta.num_rows as usize
|
||||
}
|
||||
|
||||
/// Returns the level stored in this file's metadata.
pub fn level(&self) -> Level {
    let meta = &self.inner.meta;
    meta.level
}
|
||||
}
|
||||
|
||||
/// Inner data of [FileHandle].
|
||||
|
||||
Reference in New Issue
Block a user