optimize compaction speed

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-05-25 16:42:40 +08:00
parent 9ec163badb
commit 76b6d35862
2 changed files with 146 additions and 74 deletions

View File

@@ -662,13 +662,15 @@ pub fn merge_seq_files<T: Item>(input_files: &[T], max_file_size: Option<u64>) -
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::collections::{BTreeMap, HashSet};
use std::fs;
use std::num::NonZeroU64;
use std::path::Path;
use std::time::Instant;
use std::time::{Duration, Instant};
use bytes::Bytes;
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use store_api::storage::FileId;
use super::*;
@@ -676,7 +678,9 @@ mod tests {
use crate::sst::file::{FileHandle, FileMeta};
use crate::test_util::new_noop_file_purger;
const SORTED_RUNS_MANIFEST_PATH: &str = "./ssts_manifest_202605201245.csv";
const SORTED_RUNS_MANIFEST_PATH: &str =
"/Users/luofucong/Downloads/ssts_manifest_20260525104700.csv";
const SORTED_RUNS_MANIFEST_TIME_WINDOW_SECONDS: i64 = 24 * 60 * 60;
#[derive(Clone, Debug, PartialEq)]
struct MockFile {
@@ -801,9 +805,25 @@ mod tests {
Ok(FileGroup::new_with_file(file))
}
fn load_manifest_file_groups(
fn manifest_record_time_window(
record: &[String],
time_window_size: i64,
) -> Result<i64, Box<dyn std::error::Error>> {
let max_ts_us = record[3].parse()?;
let time_window = Timestamp::new_microsecond(max_ts_us)
.convert_to(TimeUnit::Second)
.ok_or("failed to convert manifest timestamp to seconds")?
.value()
.align_to_ceil_by_bucket(time_window_size)
.unwrap_or(i64::MIN);
Ok(time_window)
}
fn load_manifest_file_groups_by_time_window(
path: &Path,
) -> Result<Vec<FileGroup>, Box<dyn std::error::Error>> {
time_window_size: i64,
) -> Result<BTreeMap<i64, Vec<FileGroup>>, Box<dyn std::error::Error>> {
let data = fs::read(path)?;
let data = String::from_utf8_lossy(&data);
let mut lines = data.lines();
@@ -820,7 +840,7 @@ mod tests {
parse_csv_record(header).as_slice()
);
let mut files = Vec::new();
let mut windows = BTreeMap::new();
for (line_idx, line) in lines.enumerate() {
if line.is_empty() {
continue;
@@ -833,13 +853,13 @@ mod tests {
"invalid CSV record at line {}",
line_idx + 2
);
files.push(new_file_group_from_manifest_record(
&record,
(line_idx + 1) as u64,
)?);
let time_window = manifest_record_time_window(&record, time_window_size)?;
windows.entry(time_window).or_insert_with(Vec::new).push(
new_file_group_from_manifest_record(&record, (line_idx + 1) as u64)?,
);
}
Ok(files)
Ok(windows)
}
fn check_sorted_runs(
@@ -998,45 +1018,77 @@ mod tests {
fn test_find_sorted_runs_manifest_performance() {
common_telemetry::init_default_ut_logging();
let files = load_manifest_file_groups(Path::new(SORTED_RUNS_MANIFEST_PATH)).unwrap();
let mut files_for_current = files.clone();
let mut files_for_original = files.clone();
let mut files_for_2 = files.clone();
let windows = load_manifest_file_groups_by_time_window(
Path::new(SORTED_RUNS_MANIFEST_PATH),
SORTED_RUNS_MANIFEST_TIME_WINDOW_SECONDS,
)
.unwrap();
let total_files: usize = windows.values().map(Vec::len).sum();
let max_window_files = windows.values().map(Vec::len).max().unwrap_or(0);
assert!(total_files > 0);
let mut current_runs = Vec::new();
let mut current_elapsed = Duration::default();
let mut current_max_run_len = 0;
let mut original_runs = Vec::new();
let mut original_elapsed = Duration::default();
let mut original_max_run_len = 0;
let mut runs_2 = Vec::new();
let mut runs_2_elapsed = Duration::default();
let mut runs_2_max_run_len = 0;
for files in windows.values() {
let mut files_for_current = files.clone();
let mut files_for_original = files.clone();
let mut files_for_2 = files.clone();
let start = Instant::now();
let mut window_current_runs = find_sorted_runs(&mut files_for_current);
current_elapsed += start.elapsed();
current_max_run_len = current_max_run_len.max(max_run_len(&window_current_runs));
current_runs.append(&mut window_current_runs);
let start = Instant::now();
let mut window_original_runs = find_sorted_runs_original(&mut files_for_original);
original_elapsed += start.elapsed();
original_max_run_len = original_max_run_len.max(max_run_len(&window_original_runs));
original_runs.append(&mut window_original_runs);
let start = Instant::now();
let mut window_runs_2 = find_sorted_runs_by_time_range(&mut files_for_2);
runs_2_elapsed += start.elapsed();
runs_2_max_run_len = runs_2_max_run_len.max(max_run_len(&window_runs_2));
runs_2.append(&mut window_runs_2);
}
let start = Instant::now();
let current_runs = find_sorted_runs(&mut files_for_current);
let elapsed = start.elapsed();
assert!(!files_for_current.is_empty());
common_telemetry::info!(
"find_sorted_runs manifest performance: files={}, runs={}, max_run_len={}, elapsed={:?}",
files_for_current.len(),
"find_sorted_runs manifest performance: windows={}, files={}, max_window_files={}, runs={}, max_run_len={}, elapsed={:?}",
windows.len(),
total_files,
max_window_files,
current_runs.len(),
max_run_len(&current_runs),
elapsed
current_max_run_len,
current_elapsed
);
let start = Instant::now();
let original_runs = find_sorted_runs_original(&mut files_for_original);
let elapsed = start.elapsed();
assert!(!files_for_original.is_empty());
common_telemetry::info!(
"find_sorted_runs_original manifest performance: files={}, runs={}, max_run_len={}, elapsed={:?}",
files_for_original.len(),
"find_sorted_runs_original manifest performance: windows={}, files={}, max_window_files={}, runs={}, max_run_len={}, elapsed={:?}",
windows.len(),
total_files,
max_window_files,
original_runs.len(),
max_run_len(&original_runs),
elapsed
original_max_run_len,
original_elapsed
);
let start = Instant::now();
let runs_2 = find_sorted_runs_by_time_range(&mut files_for_2);
let elapsed = start.elapsed();
assert!(!files_for_2.is_empty());
common_telemetry::info!(
"find_sorted_runs_2 manifest performance: files={}, runs={}, max_run_len={}, elapsed={:?}",
files_for_2.len(),
"find_sorted_runs_2 manifest performance: windows={}, files={}, max_window_files={}, runs={}, max_run_len={}, elapsed={:?}",
windows.len(),
total_files,
max_window_files,
runs_2.len(),
max_run_len(&runs_2),
elapsed
runs_2_max_run_len,
runs_2_elapsed
);
log_sorted_runs_comparison(

View File

@@ -17,6 +17,7 @@ use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::num::NonZeroU64;
use bytes::Bytes;
use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
use common_time::Timestamp;
@@ -64,8 +65,24 @@ impl TwcsPicker {
time_windows: &mut BTreeMap<i64, Window>,
active_window: Option<i64>,
) -> Vec<CompactionOutput> {
let has_overlap = |window: i64, this: &Window| -> bool {
time_windows
.iter()
.filter(|(w, _)| **w != window)
.any(|(_, that)| {
overlaps(&this.range(), &that.range())
.then(
|| match (&this.primary_key_range, &that.primary_key_range) {
(Some(l), Some(r)) => primary_key_ranges_overlap(l, r),
_ => true,
},
)
.unwrap_or_default()
})
};
let mut output = vec![];
for (window, files) in time_windows {
for (window, files) in time_windows.iter() {
if files.files.is_empty() {
continue;
}
@@ -90,13 +107,15 @@ impl TwcsPicker {
let sorted_runs = find_sorted_runs_by_time_range(&mut files_to_merge);
let found_runs = sorted_runs.len();
// We only remove deletion markers if we found less than 2 runs and not in append mode.
// because after compaction there will be no overlapping files.
let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
if found_runs == 0 {
continue;
}
// We only remove deletion markers if we found less than 2 runs and not in append mode.
// because after compaction there will be no overlapping files.
let filter_deleted =
found_runs <= 2 && !self.append_mode && !has_overlap(*window, &files);
let mut inputs = if found_runs > 1 {
reduce_runs(sorted_runs)
} else {
@@ -346,37 +365,38 @@ fn assign_to_windows<'a>(
}
}
}
if windows.is_empty() {
return BTreeMap::new();
}
let mut windows = windows.into_values().collect::<Vec<_>>();
windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));
for idx in 0..windows.len() {
let lhs_range = windows[idx].range();
for next_idx in idx + 1..windows.len() {
let rhs_range = windows[next_idx].range();
if rhs_range.0 > lhs_range.1 {
break;
}
let windows_overlap = overlaps(&lhs_range, &rhs_range)
&& match (
&windows[idx].primary_key_range,
&windows[next_idx].primary_key_range,
) {
(Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs),
_ => true,
};
if windows_overlap {
windows[idx].overlapping = true;
windows[next_idx].overlapping = true;
}
}
}
windows.into_iter().map(|w| (w.time_window, w)).collect()
// if windows.is_empty() {
// return BTreeMap::new();
// }
//
// let mut windows = windows.into_values().collect::<Vec<_>>();
// windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));
//
// for idx in 0..windows.len() {
// let lhs_range = windows[idx].range();
// for next_idx in idx + 1..windows.len() {
// let rhs_range = windows[next_idx].range();
// if rhs_range.0 > lhs_range.1 {
// break;
// }
//
// let windows_overlap = overlaps(&lhs_range, &rhs_range)
// && match (
// &windows[idx].primary_key_range,
// &windows[next_idx].primary_key_range,
// ) {
// (Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs),
// _ => true,
// };
// if windows_overlap {
// windows[idx].overlapping = true;
// windows[next_idx].overlapping = true;
// }
// }
// }
//
// windows.into_iter().map(|w| (w.time_window, w)).collect()
windows.into_iter().collect()
}
/// Finds the latest active writing window among all files.