Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-05-27 15:32:11 +08:00
parent bf7abbcb97
commit 35180757dc

View File

@@ -23,6 +23,7 @@ use common_telemetry::{debug, info};
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use rayon::prelude::*;
use store_api::storage::RegionId;
use crate::compaction::buckets::infer_time_bucket;
@@ -58,7 +59,162 @@ pub struct TwcsPicker {
pub max_background_tasks: Option<usize>,
}
struct WindowPickJob {
index: usize,
window: i64,
file_num: usize,
files: Vec<FileGroup>,
}
#[derive(Default)]
struct WindowPickStats {
file_groups_after_filter: usize,
large_file_groups_skipped: usize,
window_after_append_filter: bool,
found_runs: usize,
checked_overlap: bool,
overlap_probe_windows: usize,
has_overlap: bool,
skipped_by_trigger: bool,
skipped_by_empty_inputs: bool,
input_file_limit: Option<(usize, Vec<FileGroup>)>,
find_sorted_runs_elapsed: Duration,
check_overlap_elapsed: Duration,
reduce_runs_elapsed: Duration,
merge_seq_files_elapsed: Duration,
}
struct WindowPickResult {
index: usize,
window: i64,
file_num: usize,
filter_deleted: bool,
inputs: Option<Vec<FileGroup>>,
stats: WindowPickStats,
}
impl TwcsPicker {
fn pick_window(
&self,
job: &WindowPickJob,
time_windows: &BTreeMap<i64, Window>,
) -> WindowPickResult {
let mut stats = WindowPickStats {
file_groups_after_filter: job.files.len(),
..Default::default()
};
let mut files_to_merge = job.files.clone();
// Filter out large files in append mode - they won't benefit from compaction
if self.append_mode
&& let Some(max_size) = self.max_output_file_size
{
let (kept_files, ignored_files): (Vec<_>, Vec<_>) = files_to_merge
.into_iter()
.partition(|fg| fg.size() <= max_size as usize);
stats.large_file_groups_skipped = ignored_files.len();
stats.file_groups_after_filter = kept_files.len();
files_to_merge = kept_files;
}
stats.window_after_append_filter = !files_to_merge.is_empty();
let find_sorted_runs_start = Instant::now();
let sorted_runs = find_sorted_runs_by_time_range(&mut files_to_merge);
stats.find_sorted_runs_elapsed = find_sorted_runs_start.elapsed();
stats.found_runs = sorted_runs.len();
if stats.found_runs == 0 {
return WindowPickResult {
index: job.index,
window: job.window,
file_num: job.file_num,
filter_deleted: false,
inputs: None,
stats,
};
}
// We only remove deletion markers if we found less than 2 runs and not in append mode.
// because after compaction there will be no overlapping files.
let mut filter_deleted = stats.found_runs <= 2 && !self.append_mode;
if filter_deleted {
stats.checked_overlap = true;
let check_overlap_start = Instant::now();
let window = time_windows
.get(&job.window)
.expect("window pick job should reference an existing time window");
let (has_overlap, checked_windows) =
window_has_overlap(job.window, window, time_windows);
stats.check_overlap_elapsed = check_overlap_start.elapsed();
stats.overlap_probe_windows = checked_windows;
stats.has_overlap = has_overlap;
if has_overlap {
filter_deleted = false;
}
}
let mut inputs = if stats.found_runs > 1 {
let reduce_runs_start = Instant::now();
let inputs = reduce_runs(sorted_runs);
stats.reduce_runs_elapsed = reduce_runs_start.elapsed();
inputs
} else {
let run = sorted_runs.last().unwrap();
if run.items().len() < self.trigger_file_num {
stats.skipped_by_trigger = true;
return WindowPickResult {
index: job.index,
window: job.window,
file_num: job.file_num,
filter_deleted,
inputs: None,
stats,
};
}
// no overlapping files, try merge small files
let merge_seq_files_start = Instant::now();
let inputs = merge_seq_files(run.items(), self.max_output_file_size);
stats.merge_seq_files_elapsed = merge_seq_files_start.elapsed();
inputs
};
// Limits the number of input files.
let total_input_files: usize = inputs.iter().map(|fg| fg.num_files()).sum();
if total_input_files > DEFAULT_MAX_INPUT_FILE_NUM {
// Sorts file groups by size first.
inputs.sort_unstable_by_key(|fg| fg.size());
let mut num_picked_files = 0;
inputs = inputs
.into_iter()
.take_while(|fg| {
let current_group_file_num = fg.num_files();
if current_group_file_num + num_picked_files <= DEFAULT_MAX_INPUT_FILE_NUM {
num_picked_files += current_group_file_num;
true
} else {
false
}
})
.collect::<Vec<_>>();
stats.input_file_limit = Some((total_input_files, inputs.clone()));
}
let inputs = if inputs.len() > 1 {
Some(inputs)
} else {
stats.skipped_by_empty_inputs = true;
None
};
WindowPickResult {
index: job.index,
window: job.window,
file_num: job.file_num,
filter_deleted,
inputs,
stats,
}
}
/// Builds compaction output from files.
fn build_output(
&self,
@@ -85,141 +241,137 @@ impl TwcsPicker {
let mut max_window_file_groups = 0;
let mut max_found_runs = 0;
let mut output = vec![];
for (window, files) in time_windows.iter() {
if files.files.is_empty() {
continue;
}
windows_with_files += 1;
file_groups_before_filter += files.files.len();
max_window_file_groups = max_window_file_groups.max(files.files.len());
let mut files_to_merge: Vec<_> = files.files().cloned().collect();
// Filter out large files in append mode - they won't benefit from compaction
if self.append_mode
&& let Some(max_size) = self.max_output_file_size
{
let (kept_files, ignored_files): (Vec<_>, Vec<_>) = files_to_merge
.into_iter()
.partition(|fg| fg.size() <= max_size as usize);
large_file_groups_skipped += ignored_files.len();
files_to_merge = kept_files;
info!(
"Skipped {} large files in append mode for region {}, window {}, max_size: {}",
ignored_files.len(),
region_id,
window,
max_size
);
}
file_groups_after_filter += files_to_merge.len();
if !files_to_merge.is_empty() {
windows_after_append_filter += 1;
}
let find_sorted_runs_start = Instant::now();
let sorted_runs = find_sorted_runs_by_time_range(&mut files_to_merge);
let elapsed = find_sorted_runs_start.elapsed();
find_sorted_runs_elapsed += elapsed;
let found_runs = sorted_runs.len();
found_runs_total += found_runs;
max_found_runs = max_found_runs.max(found_runs);
COMPACTION_STAGE_ELAPSED
.with_label_values(&["pick_twcs_find_sorted_runs"])
.observe(elapsed.as_secs_f64());
if found_runs == 0 {
continue;
}
// We only remove deletion markers if we found less than 2 runs and not in append mode.
// because after compaction there will be no overlapping files.
let mut filter_deleted = found_runs <= 2 && !self.append_mode;
if filter_deleted {
windows_checked_overlap += 1;
let check_overlap_start = Instant::now();
let (has_overlap, checked_windows) =
window_has_overlap(*window, files, time_windows);
let elapsed = check_overlap_start.elapsed();
check_overlap_elapsed += elapsed;
overlap_probe_windows += checked_windows;
if has_overlap {
windows_with_overlap += 1;
filter_deleted = false;
let window_entries: Vec<_> = time_windows
.iter()
.enumerate()
.filter_map(|(index, (window, files))| {
if files.files.is_empty() {
return None;
}
COMPACTION_STAGE_ELAPSED
.with_label_values(&["pick_twcs_check_overlap"])
.observe(elapsed.as_secs_f64());
debug!(
"Region ({}) TWCS pick checked window overlap: window={}, checked_windows={}, has_overlap={}, elapsed={:?}",
region_id, window, checked_windows, has_overlap, elapsed
);
}
Some((index, *window, files))
})
.collect();
let chunk_size = self
.max_background_tasks
.unwrap_or(window_entries.len())
.max(1);
let mut inputs = if found_runs > 1 {
let reduce_runs_start = Instant::now();
let inputs = reduce_runs(sorted_runs);
let elapsed = reduce_runs_start.elapsed();
reduce_runs_elapsed += elapsed;
'chunks: for chunk in window_entries.chunks(chunk_size) {
let mut results = chunk
.par_iter()
.map(|(index, window, files)| {
let job = WindowPickJob {
index: *index,
window: *window,
file_num: files.files.len(),
files: files.files().cloned().collect(),
};
self.pick_window(&job, time_windows)
})
.collect::<Vec<_>>();
results.sort_unstable_by_key(|result| result.index);
for result in results {
let window = result.window;
let files = time_windows
.get(&window)
.expect("window pick result should reference an existing time window");
let stats = &result.stats;
if self.append_mode
&& let Some(max_size) = self.max_output_file_size
{
info!(
"Skipped {} large files in append mode for region {}, window {}, max_size: {}",
stats.large_file_groups_skipped, region_id, window, max_size
);
}
windows_with_files += 1;
file_groups_before_filter += result.file_num;
max_window_file_groups = max_window_file_groups.max(result.file_num);
file_groups_after_filter += stats.file_groups_after_filter;
if stats.window_after_append_filter {
windows_after_append_filter += 1;
}
find_sorted_runs_elapsed += stats.find_sorted_runs_elapsed;
found_runs_total += stats.found_runs;
max_found_runs = max_found_runs.max(stats.found_runs);
COMPACTION_STAGE_ELAPSED
.with_label_values(&["pick_twcs_reduce_runs"])
.observe(elapsed.as_secs_f64());
inputs
} else {
let run = sorted_runs.last().unwrap();
if run.items().len() < self.trigger_file_num {
.with_label_values(&["pick_twcs_find_sorted_runs"])
.observe(stats.find_sorted_runs_elapsed.as_secs_f64());
if stats.found_runs == 0 {
continue;
}
if stats.checked_overlap {
windows_checked_overlap += 1;
check_overlap_elapsed += stats.check_overlap_elapsed;
overlap_probe_windows += stats.overlap_probe_windows;
if stats.has_overlap {
windows_with_overlap += 1;
}
COMPACTION_STAGE_ELAPSED
.with_label_values(&["pick_twcs_check_overlap"])
.observe(stats.check_overlap_elapsed.as_secs_f64());
debug!(
"Region ({}) TWCS pick checked window overlap: window={}, checked_windows={}, has_overlap={}, elapsed={:?}",
region_id,
window,
stats.overlap_probe_windows,
stats.has_overlap,
stats.check_overlap_elapsed
);
}
if stats.reduce_runs_elapsed != Duration::default() {
reduce_runs_elapsed += stats.reduce_runs_elapsed;
COMPACTION_STAGE_ELAPSED
.with_label_values(&["pick_twcs_reduce_runs"])
.observe(stats.reduce_runs_elapsed.as_secs_f64());
}
if stats.merge_seq_files_elapsed != Duration::default() {
merge_seq_files_elapsed += stats.merge_seq_files_elapsed;
COMPACTION_STAGE_ELAPSED
.with_label_values(&["pick_twcs_merge_seq_files"])
.observe(stats.merge_seq_files_elapsed.as_secs_f64());
}
large_file_groups_skipped += stats.large_file_groups_skipped;
if stats.skipped_by_trigger {
outputs_skipped_by_trigger += 1;
continue;
}
// no overlapping files, try merge small files
let merge_seq_files_start = Instant::now();
let inputs = merge_seq_files(run.items(), self.max_output_file_size);
let elapsed = merge_seq_files_start.elapsed();
merge_seq_files_elapsed += elapsed;
COMPACTION_STAGE_ELAPSED
.with_label_values(&["pick_twcs_merge_seq_files"])
.observe(elapsed.as_secs_f64());
inputs
};
let Some(inputs) = result.inputs else {
if stats.skipped_by_empty_inputs {
outputs_skipped_by_empty_inputs += 1;
}
continue;
};
// Limits the number of input files.
let total_input_files: usize = inputs.iter().map(|fg| fg.num_files()).sum();
if total_input_files > DEFAULT_MAX_INPUT_FILE_NUM {
// Sorts file groups by size first.
inputs.sort_unstable_by_key(|fg| fg.size());
let mut num_picked_files = 0;
inputs = inputs
.into_iter()
.take_while(|fg| {
let current_group_file_num = fg.num_files();
if current_group_file_num + num_picked_files <= DEFAULT_MAX_INPUT_FILE_NUM {
num_picked_files += current_group_file_num;
true
} else {
false
}
})
.collect::<Vec<_>>();
info!(
"Compaction for region {} enforces max input file num limit: {}, current total: {}, input: {:?}",
region_id, DEFAULT_MAX_INPUT_FILE_NUM, total_input_files, inputs
);
}
if let Some((total_input_files, limited_inputs)) = &stats.input_file_limit {
info!(
"Compaction for region {} enforces max input file num limit: {}, current total: {}, input: {:?}",
region_id, DEFAULT_MAX_INPUT_FILE_NUM, total_input_files, limited_inputs
);
}
if inputs.len() > 1 {
// If we have more than one group to compact.
log_pick_result(
region_id,
*window,
window,
active_window,
found_runs,
stats.found_runs,
files.files.len(),
self.max_output_file_size,
filter_deleted,
result.filter_deleted,
&inputs,
);
output.push(CompactionOutput {
output_level: LEVEL_COMPACTED, // always compact to l1
inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
filter_deleted,
filter_deleted: result.filter_deleted,
output_time_range: None, // we do not enforce output time range in twcs compactions.
});
@@ -230,10 +382,8 @@ impl TwcsPicker {
"Region ({:?}) compaction task size larger than max background tasks({}), remaining tasks discarded",
region_id, max_background_tasks
);
break;
break 'chunks;
}
} else {
outputs_skipped_by_empty_inputs += 1;
}
}
let build_output_elapsed = build_output_start.elapsed();