Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-05-25 19:38:31 +08:00
parent 67deecf7de
commit bf7abbcb97

View File

@@ -16,8 +16,8 @@ use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::num::NonZeroU64;
use std::time::{Duration, Instant};
use bytes::Bytes;
use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
use common_time::Timestamp;
@@ -33,6 +33,7 @@ use crate::compaction::run::{
merge_seq_files, primary_key_ranges_overlap, reduce_runs,
};
use crate::compaction::{CompactionOutput, get_expired_ssts};
use crate::metrics::COMPACTION_STAGE_ELAPSED;
use crate::sst::file::{FileHandle, Level, overlaps};
use crate::sst::version::LevelMeta;
@@ -62,39 +63,45 @@ impl TwcsPicker {
fn build_output(
&self,
region_id: RegionId,
time_windows: &mut BTreeMap<i64, Window>,
time_windows: &BTreeMap<i64, Window>,
active_window: Option<i64>,
) -> Vec<CompactionOutput> {
let has_overlap = |window: i64, this: &Window| -> bool {
time_windows
.iter()
.filter(|(w, _)| **w != window)
.any(|(_, that)| {
overlaps(&this.range(), &that.range())
.then(
|| match (&this.primary_key_range, &that.primary_key_range) {
(Some(l), Some(r)) => primary_key_ranges_overlap(l, r),
_ => true,
},
)
.unwrap_or_default()
})
};
let build_output_start = Instant::now();
let mut find_sorted_runs_elapsed = Duration::default();
let mut check_overlap_elapsed = Duration::default();
let mut reduce_runs_elapsed = Duration::default();
let mut merge_seq_files_elapsed = Duration::default();
let mut windows_with_files = 0;
let mut windows_after_append_filter = 0;
let mut windows_checked_overlap = 0;
let mut overlap_probe_windows = 0;
let mut windows_with_overlap = 0;
let mut file_groups_before_filter = 0;
let mut file_groups_after_filter = 0;
let mut large_file_groups_skipped = 0;
let mut found_runs_total = 0;
let mut outputs_skipped_by_trigger = 0;
let mut outputs_skipped_by_empty_inputs = 0;
let mut max_window_file_groups = 0;
let mut max_found_runs = 0;
let mut output = vec![];
for (window, files) in time_windows.iter() {
if files.files.is_empty() {
continue;
}
windows_with_files += 1;
file_groups_before_filter += files.files.len();
max_window_file_groups = max_window_file_groups.max(files.files.len());
let mut files_to_merge: Vec<_> = files.files().cloned().collect();
// Filter out large files in append mode - they won't benefit from compaction
if self.append_mode
&& let Some(max_size) = self.max_output_file_size
{
let (kept_files, ignored_files) = files_to_merge
let (kept_files, ignored_files): (Vec<_>, Vec<_>) = files_to_merge
.into_iter()
.partition(|fg| fg.size() <= max_size as usize);
large_file_groups_skipped += ignored_files.len();
files_to_merge = kept_files;
info!(
"Skipped {} large files in append mode for region {}, window {}, max_size: {}",
@@ -104,27 +111,73 @@ impl TwcsPicker {
max_size
);
}
file_groups_after_filter += files_to_merge.len();
if !files_to_merge.is_empty() {
windows_after_append_filter += 1;
}
let find_sorted_runs_start = Instant::now();
let sorted_runs = find_sorted_runs_by_time_range(&mut files_to_merge);
let elapsed = find_sorted_runs_start.elapsed();
find_sorted_runs_elapsed += elapsed;
let found_runs = sorted_runs.len();
found_runs_total += found_runs;
max_found_runs = max_found_runs.max(found_runs);
COMPACTION_STAGE_ELAPSED
.with_label_values(&["pick_twcs_find_sorted_runs"])
.observe(elapsed.as_secs_f64());
if found_runs == 0 {
continue;
}
// We only remove deletion markers if we found less than 2 runs and not in append mode.
// because after compaction there will be no overlapping files.
let filter_deleted =
found_runs <= 2 && !self.append_mode && !has_overlap(*window, &files);
let mut filter_deleted = found_runs <= 2 && !self.append_mode;
if filter_deleted {
windows_checked_overlap += 1;
let check_overlap_start = Instant::now();
let (has_overlap, checked_windows) =
window_has_overlap(*window, files, time_windows);
let elapsed = check_overlap_start.elapsed();
check_overlap_elapsed += elapsed;
overlap_probe_windows += checked_windows;
if has_overlap {
windows_with_overlap += 1;
filter_deleted = false;
}
COMPACTION_STAGE_ELAPSED
.with_label_values(&["pick_twcs_check_overlap"])
.observe(elapsed.as_secs_f64());
debug!(
"Region ({}) TWCS pick checked window overlap: window={}, checked_windows={}, has_overlap={}, elapsed={:?}",
region_id, window, checked_windows, has_overlap, elapsed
);
}
let mut inputs = if found_runs > 1 {
reduce_runs(sorted_runs)
let reduce_runs_start = Instant::now();
let inputs = reduce_runs(sorted_runs);
let elapsed = reduce_runs_start.elapsed();
reduce_runs_elapsed += elapsed;
COMPACTION_STAGE_ELAPSED
.with_label_values(&["pick_twcs_reduce_runs"])
.observe(elapsed.as_secs_f64());
inputs
} else {
let run = sorted_runs.last().unwrap();
if run.items().len() < self.trigger_file_num {
outputs_skipped_by_trigger += 1;
continue;
}
// no overlapping files, try merge small files
merge_seq_files(run.items(), self.max_output_file_size)
let merge_seq_files_start = Instant::now();
let inputs = merge_seq_files(run.items(), self.max_output_file_size);
let elapsed = merge_seq_files_start.elapsed();
merge_seq_files_elapsed += elapsed;
COMPACTION_STAGE_ELAPSED
.with_label_values(&["pick_twcs_merge_seq_files"])
.observe(elapsed.as_secs_f64());
inputs
};
// Limits the number of input files.
@@ -179,12 +232,71 @@ impl TwcsPicker {
);
break;
}
} else {
outputs_skipped_by_empty_inputs += 1;
}
}
let build_output_elapsed = build_output_start.elapsed();
COMPACTION_STAGE_ELAPSED
.with_label_values(&["pick_twcs_build_output"])
.observe(build_output_elapsed.as_secs_f64());
info!(
"Region ({}) TWCS pick build output stats: windows={}, windows_with_files={}, windows_after_append_filter={}, \
file_groups_before_filter={}, file_groups_after_filter={}, large_file_groups_skipped={}, max_window_file_groups={}, \
found_runs_total={}, max_found_runs={}, windows_checked_overlap={}, overlap_probe_windows={}, windows_with_overlap={}, \
outputs={}, skipped_by_trigger={}, skipped_by_empty_inputs={}, elapsed={:?}, find_sorted_runs_elapsed={:?}, \
check_overlap_elapsed={:?}, reduce_runs_elapsed={:?}, merge_seq_files_elapsed={:?}",
region_id,
time_windows.len(),
windows_with_files,
windows_after_append_filter,
file_groups_before_filter,
file_groups_after_filter,
large_file_groups_skipped,
max_window_file_groups,
found_runs_total,
max_found_runs,
windows_checked_overlap,
overlap_probe_windows,
windows_with_overlap,
output.len(),
outputs_skipped_by_trigger,
outputs_skipped_by_empty_inputs,
build_output_elapsed,
find_sorted_runs_elapsed,
check_overlap_elapsed,
reduce_runs_elapsed,
merge_seq_files_elapsed
);
output
}
}
fn window_has_overlap(
window: i64,
this: &Window,
time_windows: &BTreeMap<i64, Window>,
) -> (bool, usize) {
let mut checked_windows = 0;
for (other_window, that) in time_windows {
if *other_window == window {
continue;
}
checked_windows += 1;
if overlaps(&this.range(), &that.range())
&& match (&this.primary_key_range, &that.primary_key_range) {
(Some(l), Some(r)) => primary_key_ranges_overlap(l, r),
_ => true,
}
{
return (true, checked_windows);
}
}
(false, checked_windows)
}
#[allow(clippy::too_many_arguments)]
fn log_pick_result(
region_id: RegionId,
@@ -261,10 +373,30 @@ impl Picker for TwcsPicker {
// Find active window from files in level 0.
let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
let total_ssts: usize = levels.iter().map(|level| level.files().count()).sum();
// Assign files to windows
let mut windows =
assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
let outputs = self.build_output(region_id, &mut windows, active_window);
let assign_windows_start = Instant::now();
let windows = assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
let assign_windows_elapsed = assign_windows_start.elapsed();
COMPACTION_STAGE_ELAPSED
.with_label_values(&["pick_twcs_assign_windows"])
.observe(assign_windows_elapsed.as_secs_f64());
let build_output_start = Instant::now();
let outputs = self.build_output(region_id, &windows, active_window);
let build_output_elapsed = build_output_start.elapsed();
info!(
"Region ({}) TWCS pick stats: total_ssts={}, expired_ssts={}, windows={}, outputs={}, time_window_size={}, active_window={:?}, assign_windows_elapsed={:?}, build_output_elapsed={:?}",
region_id,
total_ssts,
expired_ssts.len(),
windows.len(),
outputs.len(),
time_window_size,
active_window,
assign_windows_elapsed,
build_output_elapsed
);
if outputs.is_empty() && expired_ssts.is_empty() {
return None;
@@ -287,7 +419,6 @@ struct Window {
// created from the same compaction task.
files: HashMap<Option<NonZeroU64>, FileGroup>,
time_window: i64,
overlapping: bool,
primary_key_range: Option<(bytes::Bytes, bytes::Bytes)>,
}
@@ -302,7 +433,6 @@ impl Window {
end,
files,
time_window: 0,
overlapping: false,
primary_key_range,
}
}
@@ -365,37 +495,6 @@ fn assign_to_windows<'a>(
}
}
}
// if windows.is_empty() {
// return BTreeMap::new();
// }
//
// let mut windows = windows.into_values().collect::<Vec<_>>();
// windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));
//
// for idx in 0..windows.len() {
// let lhs_range = windows[idx].range();
// for next_idx in idx + 1..windows.len() {
// let rhs_range = windows[next_idx].range();
// if rhs_range.0 > lhs_range.1 {
// break;
// }
//
// let windows_overlap = overlaps(&lhs_range, &rhs_range)
// && match (
// &windows[idx].primary_key_range,
// &windows[next_idx].primary_key_range,
// ) {
// (Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs),
// _ => true,
// };
// if windows_overlap {
// windows[idx].overlapping = true;
// windows[next_idx].overlapping = true;
// }
// }
// }
//
// windows.into_iter().map(|w| (w.time_window, w)).collect()
windows.into_iter().collect()
}
@@ -626,7 +725,10 @@ mod tests {
for (expected_window, overlapping, window_files) in expected_files {
let actual_window = windows.get(expected_window).unwrap();
assert_eq!(*overlapping, actual_window.overlapping);
assert_eq!(
*overlapping,
window_has_overlap(*expected_window, actual_window, &windows).0
);
let mut file_ranges = actual_window
.files
.values()
@@ -764,7 +866,8 @@ mod tests {
let windows = assign_to_windows(files.iter(), 2);
assert!(!windows.get(&2).unwrap().overlapping);
let window = windows.get(&2).unwrap();
assert!(!window_has_overlap(2, window, &windows).0);
}
#[test]
@@ -793,7 +896,8 @@ mod tests {
let windows = assign_to_windows(files.iter(), 2);
assert!(!windows.get(&4).unwrap().overlapping);
let window = windows.get(&4).unwrap();
assert!(!window_has_overlap(4, window, &windows).0);
}
struct CompactionPickerTestCase {