diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 3f77ea6e21..45c3d6d64b 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -16,8 +16,8 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; use std::num::NonZeroU64; +use std::time::{Duration, Instant}; -use bytes::Bytes; use common_base::readable_size::ReadableSize; use common_telemetry::{debug, info}; use common_time::Timestamp; @@ -33,6 +33,7 @@ use crate::compaction::run::{ merge_seq_files, primary_key_ranges_overlap, reduce_runs, }; use crate::compaction::{CompactionOutput, get_expired_ssts}; +use crate::metrics::COMPACTION_STAGE_ELAPSED; use crate::sst::file::{FileHandle, Level, overlaps}; use crate::sst::version::LevelMeta; @@ -62,39 +63,45 @@ impl TwcsPicker { fn build_output( &self, region_id: RegionId, - time_windows: &mut BTreeMap, + time_windows: &BTreeMap, active_window: Option, ) -> Vec { - let has_overlap = |window: i64, this: &Window| -> bool { - time_windows - .iter() - .filter(|(w, _)| **w != window) - .any(|(_, that)| { - overlaps(&this.range(), &that.range()) - .then( - || match (&this.primary_key_range, &that.primary_key_range) { - (Some(l), Some(r)) => primary_key_ranges_overlap(l, r), - _ => true, - }, - ) - .unwrap_or_default() - }) - }; - + let build_output_start = Instant::now(); + let mut find_sorted_runs_elapsed = Duration::default(); + let mut check_overlap_elapsed = Duration::default(); + let mut reduce_runs_elapsed = Duration::default(); + let mut merge_seq_files_elapsed = Duration::default(); + let mut windows_with_files = 0; + let mut windows_after_append_filter = 0; + let mut windows_checked_overlap = 0; + let mut overlap_probe_windows = 0; + let mut windows_with_overlap = 0; + let mut file_groups_before_filter = 0; + let mut file_groups_after_filter = 0; + let mut large_file_groups_skipped = 0; + let mut found_runs_total = 0; + let mut outputs_skipped_by_trigger = 0; + let mut outputs_skipped_by_empty_inputs = 0; + let mut max_window_file_groups = 0; + let mut max_found_runs = 0; let mut output = vec![]; for (window, files) in time_windows.iter() { if files.files.is_empty() { continue; } + windows_with_files += 1; + file_groups_before_filter += files.files.len(); + max_window_file_groups = max_window_file_groups.max(files.files.len()); let mut files_to_merge: Vec<_> = files.files().cloned().collect(); // Filter out large files in append mode - they won't benefit from compaction if self.append_mode && let Some(max_size) = self.max_output_file_size { - let (kept_files, ignored_files) = files_to_merge + let (kept_files, ignored_files): (Vec<_>, Vec<_>) = files_to_merge .into_iter() .partition(|fg| fg.size() <= max_size as usize); + large_file_groups_skipped += ignored_files.len(); files_to_merge = kept_files; info!( "Skipped {} large files in append mode for region {}, window {}, max_size: {}", @@ -104,27 +111,73 @@ impl TwcsPicker { max_size ); } + file_groups_after_filter += files_to_merge.len(); + if !files_to_merge.is_empty() { + windows_after_append_filter += 1; + } + let find_sorted_runs_start = Instant::now(); let sorted_runs = find_sorted_runs_by_time_range(&mut files_to_merge); + let elapsed = find_sorted_runs_start.elapsed(); + find_sorted_runs_elapsed += elapsed; let found_runs = sorted_runs.len(); + found_runs_total += found_runs; + max_found_runs = max_found_runs.max(found_runs); + COMPACTION_STAGE_ELAPSED + .with_label_values(&["pick_twcs_find_sorted_runs"]) + .observe(elapsed.as_secs_f64()); if found_runs == 0 { continue; } // We only remove deletion markers if we found less than 2 runs and not in append mode. // because after compaction there will be no overlapping files. - let filter_deleted = - found_runs <= 2 && !self.append_mode && !has_overlap(*window, &files); + let mut filter_deleted = found_runs <= 2 && !self.append_mode; + if filter_deleted { + windows_checked_overlap += 1; + let check_overlap_start = Instant::now(); + let (has_overlap, checked_windows) = + window_has_overlap(*window, files, time_windows); + let elapsed = check_overlap_start.elapsed(); + check_overlap_elapsed += elapsed; + overlap_probe_windows += checked_windows; + if has_overlap { + windows_with_overlap += 1; + filter_deleted = false; + } + COMPACTION_STAGE_ELAPSED + .with_label_values(&["pick_twcs_check_overlap"]) + .observe(elapsed.as_secs_f64()); + debug!( + "Region ({}) TWCS pick checked window overlap: window={}, checked_windows={}, has_overlap={}, elapsed={:?}", + region_id, window, checked_windows, has_overlap, elapsed + ); + } let mut inputs = if found_runs > 1 { - reduce_runs(sorted_runs) + let reduce_runs_start = Instant::now(); + let inputs = reduce_runs(sorted_runs); + let elapsed = reduce_runs_start.elapsed(); + reduce_runs_elapsed += elapsed; + COMPACTION_STAGE_ELAPSED + .with_label_values(&["pick_twcs_reduce_runs"]) + .observe(elapsed.as_secs_f64()); + inputs } else { let run = sorted_runs.last().unwrap(); if run.items().len() < self.trigger_file_num { + outputs_skipped_by_trigger += 1; continue; } // no overlapping files, try merge small files - merge_seq_files(run.items(), self.max_output_file_size) + let merge_seq_files_start = Instant::now(); + let inputs = merge_seq_files(run.items(), self.max_output_file_size); + let elapsed = merge_seq_files_start.elapsed(); + merge_seq_files_elapsed += elapsed; + COMPACTION_STAGE_ELAPSED + .with_label_values(&["pick_twcs_merge_seq_files"]) + .observe(elapsed.as_secs_f64()); + inputs }; // Limits the number of input files. @@ -179,12 +232,71 @@ impl TwcsPicker { ); break; } + } else { + outputs_skipped_by_empty_inputs += 1; } } + let build_output_elapsed = build_output_start.elapsed(); + COMPACTION_STAGE_ELAPSED + .with_label_values(&["pick_twcs_build_output"]) + .observe(build_output_elapsed.as_secs_f64()); + info!( + "Region ({}) TWCS pick build output stats: windows={}, windows_with_files={}, windows_after_append_filter={}, \ + file_groups_before_filter={}, file_groups_after_filter={}, large_file_groups_skipped={}, max_window_file_groups={}, \ + found_runs_total={}, max_found_runs={}, windows_checked_overlap={}, overlap_probe_windows={}, windows_with_overlap={}, \ + outputs={}, skipped_by_trigger={}, skipped_by_empty_inputs={}, elapsed={:?}, find_sorted_runs_elapsed={:?}, \ + check_overlap_elapsed={:?}, reduce_runs_elapsed={:?}, merge_seq_files_elapsed={:?}", + region_id, + time_windows.len(), + windows_with_files, + windows_after_append_filter, + file_groups_before_filter, + file_groups_after_filter, + large_file_groups_skipped, + max_window_file_groups, + found_runs_total, + max_found_runs, + windows_checked_overlap, + overlap_probe_windows, + windows_with_overlap, + output.len(), + outputs_skipped_by_trigger, + outputs_skipped_by_empty_inputs, + build_output_elapsed, + find_sorted_runs_elapsed, + check_overlap_elapsed, + reduce_runs_elapsed, + merge_seq_files_elapsed + ); output } } +fn window_has_overlap( + window: i64, + this: &Window, + time_windows: &BTreeMap, +) -> (bool, usize) { + let mut checked_windows = 0; + for (other_window, that) in time_windows { + if *other_window == window { + continue; + } + + checked_windows += 1; + if overlaps(&this.range(), &that.range()) + && match (&this.primary_key_range, &that.primary_key_range) { + (Some(l), Some(r)) => primary_key_ranges_overlap(l, r), + _ => true, + } + { + return (true, checked_windows); + } + } + + (false, checked_windows) +} + #[allow(clippy::too_many_arguments)] fn log_pick_result( region_id: RegionId, @@ -261,10 +373,30 @@ impl Picker for TwcsPicker { // Find active window from files in level 0. let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size); + let total_ssts: usize = levels.iter().map(|level| level.files().count()).sum(); // Assign files to windows - let mut windows = - assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size); - let outputs = self.build_output(region_id, &mut windows, active_window); + let assign_windows_start = Instant::now(); + let windows = assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size); + let assign_windows_elapsed = assign_windows_start.elapsed(); + COMPACTION_STAGE_ELAPSED + .with_label_values(&["pick_twcs_assign_windows"]) + .observe(assign_windows_elapsed.as_secs_f64()); + + let build_output_start = Instant::now(); + let outputs = self.build_output(region_id, &windows, active_window); + let build_output_elapsed = build_output_start.elapsed(); + info!( + "Region ({}) TWCS pick stats: total_ssts={}, expired_ssts={}, windows={}, outputs={}, time_window_size={}, active_window={:?}, assign_windows_elapsed={:?}, build_output_elapsed={:?}", + region_id, + total_ssts, + expired_ssts.len(), + windows.len(), + outputs.len(), + time_window_size, + active_window, + assign_windows_elapsed, + build_output_elapsed + ); if outputs.is_empty() && expired_ssts.is_empty() { return None; @@ -287,7 +419,6 @@ struct Window { // created from the same compaction task. files: HashMap, FileGroup>, time_window: i64, - overlapping: bool, primary_key_range: Option<(bytes::Bytes, bytes::Bytes)>, } @@ -302,7 +433,6 @@ impl Window { end, files, time_window: 0, - overlapping: false, primary_key_range, } } @@ -365,37 +495,6 @@ fn assign_to_windows<'a>( } } } - // if windows.is_empty() { - // return BTreeMap::new(); - // } - // - // let mut windows = windows.into_values().collect::>(); - // windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse())); - // - // for idx in 0..windows.len() { - // let lhs_range = windows[idx].range(); - // for next_idx in idx + 1..windows.len() { - // let rhs_range = windows[next_idx].range(); - // if rhs_range.0 > lhs_range.1 { - // break; - // } - // - // let windows_overlap = overlaps(&lhs_range, &rhs_range) - // && match ( - // &windows[idx].primary_key_range, - // &windows[next_idx].primary_key_range, - // ) { - // (Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs), - // _ => true, - // }; - // if windows_overlap { - // windows[idx].overlapping = true; - // windows[next_idx].overlapping = true; - // } - // } - // } - // - // windows.into_iter().map(|w| (w.time_window, w)).collect() windows.into_iter().collect() } @@ -626,7 +725,10 @@ mod tests { for (expected_window, overlapping, window_files) in expected_files { let actual_window = windows.get(expected_window).unwrap(); - assert_eq!(*overlapping, actual_window.overlapping); + assert_eq!( + *overlapping, + window_has_overlap(*expected_window, actual_window, &windows).0 + ); let mut file_ranges = actual_window .files .values() @@ -764,7 +866,8 @@ mod tests { let windows = assign_to_windows(files.iter(), 2); - assert!(!windows.get(&2).unwrap().overlapping); + let window = windows.get(&2).unwrap(); + assert!(!window_has_overlap(2, window, &windows).0); } #[test] @@ -793,7 +896,8 @@ mod tests { let windows = assign_to_windows(files.iter(), 2); - assert!(!windows.get(&4).unwrap().overlapping); + let window = windows.get(&4).unwrap(); + assert!(!window_has_overlap(4, window, &windows).0); } struct CompactionPickerTestCase {