mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-06 05:12:54 +00:00
fix: allow compacting L1 files under append mode (#7239)
* fix: allow compacting L1 files under append mode Signed-off-by: evenyag <realevenyag@gmail.com> * feat: limit the number of compaction input files Signed-off-by: evenyag <realevenyag@gmail.com> --------- Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
@@ -163,6 +163,10 @@ impl FileGroup {
|
||||
self.files.push(file);
|
||||
}
|
||||
|
||||
pub(crate) fn num_files(&self) -> usize {
|
||||
self.files.len()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn files(&self) -> &[FileHandle] {
|
||||
&self.files[..]
|
||||
@@ -175,10 +179,6 @@ impl FileGroup {
|
||||
pub(crate) fn into_files(self) -> impl Iterator<Item = FileHandle> {
|
||||
self.files.into_iter()
|
||||
}
|
||||
|
||||
pub(crate) fn is_all_level_0(&self) -> bool {
|
||||
self.files.iter().all(|f| f.level() == 0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Ranged for FileGroup {
|
||||
|
||||
@@ -36,6 +36,9 @@ use crate::sst::version::LevelMeta;
|
||||
|
||||
const LEVEL_COMPACTED: Level = 1;
|
||||
|
||||
/// Default value for max compaction input file num.
|
||||
const DEFAULT_MAX_INPUT_FILE_NUM: usize = 32;
|
||||
|
||||
/// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction
|
||||
/// candidates.
|
||||
#[derive(Debug)]
|
||||
@@ -73,7 +76,7 @@ impl TwcsPicker {
|
||||
{
|
||||
let (kept_files, ignored_files) = files_to_merge
|
||||
.into_iter()
|
||||
.partition(|fg| fg.size() <= max_size as usize && fg.is_all_level_0());
|
||||
.partition(|fg| fg.size() <= max_size as usize);
|
||||
files_to_merge = kept_files;
|
||||
info!(
|
||||
"Skipped {} large files in append mode for region {}, window {}, max_size: {}",
|
||||
@@ -93,7 +96,7 @@ impl TwcsPicker {
|
||||
continue;
|
||||
}
|
||||
|
||||
let inputs = if found_runs > 1 {
|
||||
let mut inputs = if found_runs > 1 {
|
||||
reduce_runs(sorted_runs)
|
||||
} else {
|
||||
let run = sorted_runs.last().unwrap();
|
||||
@@ -104,7 +107,32 @@ impl TwcsPicker {
|
||||
merge_seq_files(run.items(), self.max_output_file_size)
|
||||
};
|
||||
|
||||
if !inputs.is_empty() {
|
||||
// Limits the number of input files.
|
||||
let total_input_files: usize = inputs.iter().map(|fg| fg.num_files()).sum();
|
||||
if total_input_files > DEFAULT_MAX_INPUT_FILE_NUM {
|
||||
// Sorts file groups by size first.
|
||||
inputs.sort_unstable_by_key(|fg| fg.size());
|
||||
let mut num_picked_files = 0;
|
||||
inputs = inputs
|
||||
.into_iter()
|
||||
.take_while(|fg| {
|
||||
let current_group_file_num = fg.num_files();
|
||||
if current_group_file_num + num_picked_files <= DEFAULT_MAX_INPUT_FILE_NUM {
|
||||
num_picked_files += current_group_file_num;
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
info!(
|
||||
"Compaction for region {} enforces max input file num limit: {}, current total: {}, input: {:?}",
|
||||
region_id, DEFAULT_MAX_INPUT_FILE_NUM, total_input_files, inputs
|
||||
);
|
||||
}
|
||||
|
||||
if inputs.len() > 1 {
|
||||
// If we have more than one group to compact.
|
||||
log_pick_result(
|
||||
region_id,
|
||||
*window,
|
||||
@@ -1024,5 +1052,85 @@ mod tests {
|
||||
assert!(!output.is_empty(), "Should have at least one output");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pick_multiple_runs() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let num_files = 8;
|
||||
let file_ids = (0..num_files).map(|_| FileId::random()).collect::<Vec<_>>();
|
||||
|
||||
// Create files with different sequences so they form multiple runs
|
||||
let files: Vec<_> = file_ids
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(idx, file_id)| {
|
||||
new_file_handle_with_size_and_sequence(
|
||||
*file_id,
|
||||
0,
|
||||
999,
|
||||
0,
|
||||
(idx + 1) as u64,
|
||||
1024 * 1024,
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut windows = assign_to_windows(files.iter(), 3);
|
||||
|
||||
let picker = TwcsPicker {
|
||||
trigger_file_num: 4,
|
||||
time_window_seconds: Some(3),
|
||||
max_output_file_size: None,
|
||||
append_mode: false,
|
||||
max_background_tasks: None,
|
||||
};
|
||||
|
||||
let active_window = find_latest_window_in_seconds(files.iter(), 3);
|
||||
let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
|
||||
|
||||
assert_eq!(1, output.len());
|
||||
assert_eq!(output[0].inputs.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_limit_max_input_files() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let num_files = 50;
|
||||
let file_ids = (0..num_files).map(|_| FileId::random()).collect::<Vec<_>>();
|
||||
|
||||
// Create files with different sequences so they form 2 runs
|
||||
let files: Vec<_> = file_ids
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(idx, file_id)| {
|
||||
new_file_handle_with_size_and_sequence(
|
||||
*file_id,
|
||||
(idx / 2 * 10) as i64,
|
||||
(idx / 2 * 10 + 5) as i64,
|
||||
0,
|
||||
(idx + 1) as u64,
|
||||
1024 * 1024,
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut windows = assign_to_windows(files.iter(), 3);
|
||||
|
||||
let picker = TwcsPicker {
|
||||
trigger_file_num: 4,
|
||||
time_window_seconds: Some(3),
|
||||
max_output_file_size: None,
|
||||
append_mode: false,
|
||||
max_background_tasks: None,
|
||||
};
|
||||
|
||||
let active_window = find_latest_window_in_seconds(files.iter(), 3);
|
||||
let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
|
||||
|
||||
assert_eq!(1, output.len());
|
||||
assert_eq!(output[0].inputs.len(), 32);
|
||||
}
|
||||
|
||||
// TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user