From 35180757dcdfcd238e5a25a69f980c9fe2aba5b4 Mon Sep 17 00:00:00 2001 From: luofucong Date: Wed, 27 May 2026 15:32:11 +0800 Subject: [PATCH] rayon Signed-off-by: luofucong --- src/mito2/src/compaction/twcs.rs | 388 +++++++++++++++++++++---------- 1 file changed, 269 insertions(+), 119 deletions(-) diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 45c3d6d64b..b76a2f9db3 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -23,6 +23,7 @@ use common_telemetry::{debug, info}; use common_time::Timestamp; use common_time::timestamp::TimeUnit; use common_time::timestamp_millis::BucketAligned; +use rayon::prelude::*; use store_api::storage::RegionId; use crate::compaction::buckets::infer_time_bucket; @@ -58,7 +59,162 @@ pub struct TwcsPicker { pub max_background_tasks: Option, } +struct WindowPickJob { + index: usize, + window: i64, + file_num: usize, + files: Vec, +} + +#[derive(Default)] +struct WindowPickStats { + file_groups_after_filter: usize, + large_file_groups_skipped: usize, + window_after_append_filter: bool, + found_runs: usize, + checked_overlap: bool, + overlap_probe_windows: usize, + has_overlap: bool, + skipped_by_trigger: bool, + skipped_by_empty_inputs: bool, + input_file_limit: Option<(usize, Vec)>, + find_sorted_runs_elapsed: Duration, + check_overlap_elapsed: Duration, + reduce_runs_elapsed: Duration, + merge_seq_files_elapsed: Duration, +} + +struct WindowPickResult { + index: usize, + window: i64, + file_num: usize, + filter_deleted: bool, + inputs: Option>, + stats: WindowPickStats, +} + impl TwcsPicker { + fn pick_window( + &self, + job: &WindowPickJob, + time_windows: &BTreeMap, + ) -> WindowPickResult { + let mut stats = WindowPickStats { + file_groups_after_filter: job.files.len(), + ..Default::default() + }; + let mut files_to_merge = job.files.clone(); + + // Filter out large files in append mode - they won't benefit from compaction + if self.append_mode + && let Some(max_size) = self.max_output_file_size + { + let (kept_files, ignored_files): (Vec<_>, Vec<_>) = files_to_merge + .into_iter() + .partition(|fg| fg.size() <= max_size as usize); + stats.large_file_groups_skipped = ignored_files.len(); + stats.file_groups_after_filter = kept_files.len(); + files_to_merge = kept_files; + } + stats.window_after_append_filter = !files_to_merge.is_empty(); + + let find_sorted_runs_start = Instant::now(); + let sorted_runs = find_sorted_runs_by_time_range(&mut files_to_merge); + stats.find_sorted_runs_elapsed = find_sorted_runs_start.elapsed(); + stats.found_runs = sorted_runs.len(); + if stats.found_runs == 0 { + return WindowPickResult { + index: job.index, + window: job.window, + file_num: job.file_num, + filter_deleted: false, + inputs: None, + stats, + }; + } + + // We only remove deletion markers if we found less than 2 runs and not in append mode. + // because after compaction there will be no overlapping files. + let mut filter_deleted = stats.found_runs <= 2 && !self.append_mode; + if filter_deleted { + stats.checked_overlap = true; + let check_overlap_start = Instant::now(); + let window = time_windows + .get(&job.window) + .expect("window pick job should reference an existing time window"); + let (has_overlap, checked_windows) = + window_has_overlap(job.window, window, time_windows); + stats.check_overlap_elapsed = check_overlap_start.elapsed(); + stats.overlap_probe_windows = checked_windows; + stats.has_overlap = has_overlap; + if has_overlap { + filter_deleted = false; + } + } + + let mut inputs = if stats.found_runs > 1 { + let reduce_runs_start = Instant::now(); + let inputs = reduce_runs(sorted_runs); + stats.reduce_runs_elapsed = reduce_runs_start.elapsed(); + inputs + } else { + let run = sorted_runs.last().unwrap(); + if run.items().len() < self.trigger_file_num { + stats.skipped_by_trigger = true; + return WindowPickResult { + index: job.index, + window: job.window, + file_num: job.file_num, + filter_deleted, + inputs: None, + stats, + }; + } + // no overlapping files, try merge small files + let merge_seq_files_start = Instant::now(); + let inputs = merge_seq_files(run.items(), self.max_output_file_size); + stats.merge_seq_files_elapsed = merge_seq_files_start.elapsed(); + inputs + }; + + // Limits the number of input files. + let total_input_files: usize = inputs.iter().map(|fg| fg.num_files()).sum(); + if total_input_files > DEFAULT_MAX_INPUT_FILE_NUM { + // Sorts file groups by size first. + inputs.sort_unstable_by_key(|fg| fg.size()); + let mut num_picked_files = 0; + inputs = inputs + .into_iter() + .take_while(|fg| { + let current_group_file_num = fg.num_files(); + if current_group_file_num + num_picked_files <= DEFAULT_MAX_INPUT_FILE_NUM { + num_picked_files += current_group_file_num; + true + } else { + false + } + }) + .collect::>(); + stats.input_file_limit = Some((total_input_files, inputs.clone())); + } + + let inputs = if inputs.len() > 1 { + Some(inputs) + } else { + stats.skipped_by_empty_inputs = true; + None + }; + + WindowPickResult { + index: job.index, + window: job.window, + file_num: job.file_num, + filter_deleted, + inputs, + stats, + } + } + /// Builds compaction output from files. fn build_output( &self, @@ -85,141 +241,137 @@ impl TwcsPicker { let mut max_window_file_groups = 0; let mut max_found_runs = 0; let mut output = vec![]; - for (window, files) in time_windows.iter() { - if files.files.is_empty() { - continue; - } - windows_with_files += 1; - file_groups_before_filter += files.files.len(); - max_window_file_groups = max_window_file_groups.max(files.files.len()); - let mut files_to_merge: Vec<_> = files.files().cloned().collect(); - - // Filter out large files in append mode - they won't benefit from compaction - if self.append_mode - && let Some(max_size) = self.max_output_file_size - { - let (kept_files, ignored_files): (Vec<_>, Vec<_>) = files_to_merge - .into_iter() - .partition(|fg| fg.size() <= max_size as usize); - large_file_groups_skipped += ignored_files.len(); - files_to_merge = kept_files; - info!( - "Skipped {} large files in append mode for region {}, window {}, max_size: {}", - ignored_files.len(), - region_id, - window, - max_size - ); - } - file_groups_after_filter += files_to_merge.len(); - if !files_to_merge.is_empty() { - windows_after_append_filter += 1; - } - - let find_sorted_runs_start = Instant::now(); - let sorted_runs = find_sorted_runs_by_time_range(&mut files_to_merge); - let elapsed = find_sorted_runs_start.elapsed(); - find_sorted_runs_elapsed += elapsed; - let found_runs = sorted_runs.len(); - found_runs_total += found_runs; - max_found_runs = max_found_runs.max(found_runs); - COMPACTION_STAGE_ELAPSED - .with_label_values(&["pick_twcs_find_sorted_runs"]) - .observe(elapsed.as_secs_f64()); - if found_runs == 0 { - continue; - } - - // We only remove deletion markers if we found less than 2 runs and not in append mode. - // because after compaction there will be no overlapping files. - let mut filter_deleted = found_runs <= 2 && !self.append_mode; - if filter_deleted { - windows_checked_overlap += 1; - let check_overlap_start = Instant::now(); - let (has_overlap, checked_windows) = - window_has_overlap(*window, files, time_windows); - let elapsed = check_overlap_start.elapsed(); - check_overlap_elapsed += elapsed; - overlap_probe_windows += checked_windows; - if has_overlap { - windows_with_overlap += 1; - filter_deleted = false; + let window_entries: Vec<_> = time_windows + .iter() + .enumerate() + .filter_map(|(index, (window, files))| { + if files.files.is_empty() { + return None; } - COMPACTION_STAGE_ELAPSED - .with_label_values(&["pick_twcs_check_overlap"]) - .observe(elapsed.as_secs_f64()); - debug!( - "Region ({}) TWCS pick checked window overlap: window={}, checked_windows={}, has_overlap={}, elapsed={:?}", - region_id, window, checked_windows, has_overlap, elapsed - ); - } + Some((index, *window, files)) + }) + .collect(); + let chunk_size = self + .max_background_tasks + .unwrap_or(window_entries.len()) + .max(1); - let mut inputs = if found_runs > 1 { - let reduce_runs_start = Instant::now(); - let inputs = reduce_runs(sorted_runs); - let elapsed = reduce_runs_start.elapsed(); - reduce_runs_elapsed += elapsed; + 'chunks: for chunk in window_entries.chunks(chunk_size) { + let mut results = chunk + .par_iter() + .map(|(index, window, files)| { + let job = WindowPickJob { + index: *index, + window: *window, + file_num: files.files.len(), + files: files.files().cloned().collect(), + }; + self.pick_window(&job, time_windows) + }) + .collect::>(); + results.sort_unstable_by_key(|result| result.index); + + for result in results { + let window = result.window; + let files = time_windows + .get(&window) + .expect("window pick result should reference an existing time window"); + let stats = &result.stats; + + if self.append_mode + && let Some(max_size) = self.max_output_file_size + { + info!( + "Skipped {} large files in append mode for region {}, window {}, max_size: {}", + stats.large_file_groups_skipped, region_id, window, max_size + ); + } + + windows_with_files += 1; + file_groups_before_filter += result.file_num; + max_window_file_groups = max_window_file_groups.max(result.file_num); + file_groups_after_filter += stats.file_groups_after_filter; + if stats.window_after_append_filter { + windows_after_append_filter += 1; + } + + find_sorted_runs_elapsed += stats.find_sorted_runs_elapsed; + found_runs_total += stats.found_runs; + max_found_runs = max_found_runs.max(stats.found_runs); COMPACTION_STAGE_ELAPSED - .with_label_values(&["pick_twcs_reduce_runs"]) - .observe(elapsed.as_secs_f64()); - inputs - } else { - let run = sorted_runs.last().unwrap(); - if run.items().len() < self.trigger_file_num { + .with_label_values(&["pick_twcs_find_sorted_runs"]) + .observe(stats.find_sorted_runs_elapsed.as_secs_f64()); + if stats.found_runs == 0 { + continue; + } + + if stats.checked_overlap { + windows_checked_overlap += 1; + check_overlap_elapsed += stats.check_overlap_elapsed; + overlap_probe_windows += stats.overlap_probe_windows; + if stats.has_overlap { + windows_with_overlap += 1; + } + COMPACTION_STAGE_ELAPSED + .with_label_values(&["pick_twcs_check_overlap"]) + .observe(stats.check_overlap_elapsed.as_secs_f64()); + debug!( + "Region ({}) TWCS pick checked window overlap: window={}, checked_windows={}, has_overlap={}, elapsed={:?}", + region_id, + window, + stats.overlap_probe_windows, + stats.has_overlap, + stats.check_overlap_elapsed + ); + } + + if stats.reduce_runs_elapsed != Duration::default() { + reduce_runs_elapsed += stats.reduce_runs_elapsed; + COMPACTION_STAGE_ELAPSED + .with_label_values(&["pick_twcs_reduce_runs"]) + .observe(stats.reduce_runs_elapsed.as_secs_f64()); + } + if stats.merge_seq_files_elapsed != Duration::default() { + merge_seq_files_elapsed += stats.merge_seq_files_elapsed; + COMPACTION_STAGE_ELAPSED + .with_label_values(&["pick_twcs_merge_seq_files"]) + .observe(stats.merge_seq_files_elapsed.as_secs_f64()); + } + + large_file_groups_skipped += stats.large_file_groups_skipped; + if stats.skipped_by_trigger { outputs_skipped_by_trigger += 1; continue; } - // no overlapping files, try merge small files - let merge_seq_files_start = Instant::now(); - let inputs = merge_seq_files(run.items(), self.max_output_file_size); - let elapsed = merge_seq_files_start.elapsed(); - merge_seq_files_elapsed += elapsed; - COMPACTION_STAGE_ELAPSED - .with_label_values(&["pick_twcs_merge_seq_files"]) - .observe(elapsed.as_secs_f64()); - inputs - }; + let Some(inputs) = result.inputs else { + if stats.skipped_by_empty_inputs { + outputs_skipped_by_empty_inputs += 1; + } + continue; + }; - // Limits the number of input files. - let total_input_files: usize = inputs.iter().map(|fg| fg.num_files()).sum(); - if total_input_files > DEFAULT_MAX_INPUT_FILE_NUM { - // Sorts file groups by size first. - inputs.sort_unstable_by_key(|fg| fg.size()); - let mut num_picked_files = 0; - inputs = inputs - .into_iter() - .take_while(|fg| { - let current_group_file_num = fg.num_files(); - if current_group_file_num + num_picked_files <= DEFAULT_MAX_INPUT_FILE_NUM { - num_picked_files += current_group_file_num; - true - } else { - false - } - }) - .collect::>(); - info!( - "Compaction for region {} enforces max input file num limit: {}, current total: {}, input: {:?}", - region_id, DEFAULT_MAX_INPUT_FILE_NUM, total_input_files, inputs - ); - } + if let Some((total_input_files, limited_inputs)) = &stats.input_file_limit { + info!( + "Compaction for region {} enforces max input file num limit: {}, current total: {}, input: {:?}", + region_id, DEFAULT_MAX_INPUT_FILE_NUM, total_input_files, limited_inputs + ); + } - if inputs.len() > 1 { // If we have more than one group to compact. log_pick_result( region_id, - *window, + window, active_window, - found_runs, + stats.found_runs, files.files.len(), self.max_output_file_size, - filter_deleted, + result.filter_deleted, &inputs, ); output.push(CompactionOutput { output_level: LEVEL_COMPACTED, // always compact to l1 inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(), - filter_deleted, + filter_deleted: result.filter_deleted, output_time_range: None, // we do not enforce output time range in twcs compactions. }); @@ -230,10 +382,8 @@ impl TwcsPicker { "Region ({:?}) compaction task size larger than max background tasks({}), remaining tasks discarded", region_id, max_background_tasks ); - break; + break 'chunks; } - } else { - outputs_skipped_by_empty_inputs += 1; } } let build_output_elapsed = build_output_start.elapsed();