mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
feat(compaction): add file number limits to TWCS compaction (#4481)
* Add file number limits to TWCS compaction - Introduce `max_active_window_files` and `max_inactive_window_files` to `TwcsOptions`. * feat/limit-files-in-windows: Add max active/inactive window files options to mito engine config * feat/limit-files-in-windows: Add Debug derive to TwcsPicker and implement max file enforcement logging in TWCS compaction * fix: clippy
This commit is contained in:
@@ -133,7 +133,9 @@ pub fn new_picker(
|
||||
match compaction_options {
|
||||
CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker::new(
|
||||
twcs_opts.max_active_window_runs,
|
||||
twcs_opts.max_active_window_files,
|
||||
twcs_opts.max_inactive_window_runs,
|
||||
twcs_opts.max_inactive_window_files,
|
||||
twcs_opts.time_window_seconds(),
|
||||
)) as Arc<_>,
|
||||
}
|
||||
|
||||
@@ -43,3 +43,29 @@ pub fn new_file_handle(
|
||||
file_purger,
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn new_file_handles(file_specs: &[(i64, i64, u64)]) -> Vec<FileHandle> {
|
||||
let file_purger = new_noop_file_purger();
|
||||
file_specs
|
||||
.iter()
|
||||
.map(|(start, end, size)| {
|
||||
FileHandle::new(
|
||||
FileMeta {
|
||||
region_id: 0.into(),
|
||||
file_id: FileId::random(),
|
||||
time_range: (
|
||||
Timestamp::new_millisecond(*start),
|
||||
Timestamp::new_millisecond(*end),
|
||||
),
|
||||
level: 0,
|
||||
file_size: *size,
|
||||
available_indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
num_rows: 0,
|
||||
num_row_groups: 0,
|
||||
},
|
||||
file_purger.clone(),
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::fmt::Debug;
|
||||
|
||||
use common_telemetry::{debug, info};
|
||||
use common_time::timestamp::TimeUnit;
|
||||
@@ -24,7 +24,7 @@ use common_time::Timestamp;
|
||||
use crate::compaction::buckets::infer_time_bucket;
|
||||
use crate::compaction::compactor::CompactionRegion;
|
||||
use crate::compaction::picker::{Picker, PickerOutput};
|
||||
use crate::compaction::run::{find_sorted_runs, reduce_runs};
|
||||
use crate::compaction::run::{find_sorted_runs, reduce_runs, Item};
|
||||
use crate::compaction::{get_expired_ssts, CompactionOutput};
|
||||
use crate::sst::file::{overlaps, FileHandle, FileId, Level};
|
||||
use crate::sst::version::LevelMeta;
|
||||
@@ -33,31 +33,29 @@ const LEVEL_COMPACTED: Level = 1;
|
||||
|
||||
/// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction
|
||||
/// candidates.
|
||||
#[derive(Debug)]
|
||||
pub struct TwcsPicker {
|
||||
max_active_window_runs: usize,
|
||||
max_active_window_files: usize,
|
||||
max_inactive_window_runs: usize,
|
||||
max_inactive_window_files: usize,
|
||||
time_window_seconds: Option<i64>,
|
||||
}
|
||||
|
||||
impl Debug for TwcsPicker {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("TwcsPicker")
|
||||
.field("max_active_window_runs", &self.max_active_window_runs)
|
||||
.field("max_inactive_window_runs", &self.max_inactive_window_runs)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl TwcsPicker {
|
||||
pub fn new(
|
||||
max_active_window_runs: usize,
|
||||
max_active_window_files: usize,
|
||||
max_inactive_window_runs: usize,
|
||||
max_inactive_window_files: usize,
|
||||
time_window_seconds: Option<i64>,
|
||||
) -> Self {
|
||||
Self {
|
||||
max_inactive_window_runs,
|
||||
max_active_window_runs,
|
||||
time_window_seconds,
|
||||
max_active_window_files,
|
||||
max_inactive_window_files,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,12 +71,15 @@ impl TwcsPicker {
|
||||
for (window, files) in time_windows {
|
||||
let sorted_runs = find_sorted_runs(&mut files.files);
|
||||
|
||||
let max_runs = if let Some(active_window) = active_window
|
||||
let (max_runs, max_files) = if let Some(active_window) = active_window
|
||||
&& *window == active_window
|
||||
{
|
||||
self.max_active_window_runs
|
||||
(self.max_active_window_runs, self.max_active_window_files)
|
||||
} else {
|
||||
self.max_inactive_window_runs
|
||||
(
|
||||
self.max_inactive_window_runs,
|
||||
self.max_inactive_window_files,
|
||||
)
|
||||
};
|
||||
|
||||
// we only remove deletion markers once no file in current window overlaps with any other window.
|
||||
@@ -87,16 +88,33 @@ impl TwcsPicker {
|
||||
|
||||
if found_runs > max_runs {
|
||||
let files_to_compact = reduce_runs(sorted_runs, max_runs);
|
||||
info!("Building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, output size: {}", active_window, *window,max_runs, found_runs, files_to_compact.len());
|
||||
info!("Building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, output size: {}, remove deletion markers: {}", active_window, *window,max_runs, found_runs, files_to_compact.len(), filter_deleted);
|
||||
for inputs in files_to_compact {
|
||||
output.push(CompactionOutput {
|
||||
output_file_id: FileId::random(),
|
||||
output_level: LEVEL_COMPACTED, // always compact to l1
|
||||
inputs,
|
||||
filter_deleted,
|
||||
output_time_range: None, // we do not enforce output time range in twcs compactions.});
|
||||
output_time_range: None, // we do not enforce output time range in twcs compactions.
|
||||
});
|
||||
}
|
||||
} else if files.files.len() > max_files {
|
||||
debug!(
|
||||
"Enforcing max file num in window: {}, active: {:?}, max: {}, current: {}",
|
||||
*window,
|
||||
active_window,
|
||||
max_files,
|
||||
files.files.len()
|
||||
);
|
||||
// Files in window exceeds file num limit
|
||||
let to_merge = enforce_file_num(&files.files, max_files);
|
||||
output.push(CompactionOutput {
|
||||
output_file_id: FileId::random(),
|
||||
output_level: LEVEL_COMPACTED, // always compact to l1
|
||||
inputs: to_merge,
|
||||
filter_deleted,
|
||||
output_time_range: None,
|
||||
});
|
||||
} else {
|
||||
debug!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
|
||||
}
|
||||
@@ -105,6 +123,31 @@ impl TwcsPicker {
|
||||
}
|
||||
}
|
||||
|
||||
/// Merges consecutive files so that file num does not exceed `max_file_num`, and chooses
|
||||
/// the solution with minimum overhead according to files sizes to be merged.
|
||||
/// `enforce_file_num` only merges consecutive files so that it won't create overlapping outputs.
|
||||
/// `runs` must be sorted according to time ranges.
|
||||
fn enforce_file_num<T: Item>(files: &[T], max_file_num: usize) -> Vec<T> {
|
||||
debug_assert!(files.len() > max_file_num);
|
||||
let to_merge = files.len() - max_file_num + 1;
|
||||
let mut min_penalty = usize::MAX;
|
||||
let mut min_idx = 0;
|
||||
|
||||
for idx in 0..=(files.len() - to_merge) {
|
||||
let current_penalty: usize = files
|
||||
.iter()
|
||||
.skip(idx)
|
||||
.take(to_merge)
|
||||
.map(|f| f.size())
|
||||
.sum();
|
||||
if current_penalty < min_penalty {
|
||||
min_penalty = current_penalty;
|
||||
min_idx = idx;
|
||||
}
|
||||
}
|
||||
files.iter().skip(min_idx).take(to_merge).cloned().collect()
|
||||
}
|
||||
|
||||
impl Picker for TwcsPicker {
|
||||
fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
|
||||
let region_id = compaction_region.region_id;
|
||||
@@ -264,7 +307,7 @@ mod tests {
|
||||
use std::collections::HashSet;
|
||||
|
||||
use super::*;
|
||||
use crate::compaction::test_util::new_file_handle;
|
||||
use crate::compaction::test_util::{new_file_handle, new_file_handles};
|
||||
use crate::sst::file::Level;
|
||||
|
||||
#[test]
|
||||
@@ -482,7 +525,8 @@ mod tests {
|
||||
let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
|
||||
let active_window =
|
||||
find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
|
||||
let output = TwcsPicker::new(4, 1, None).build_output(&mut windows, active_window);
|
||||
let output = TwcsPicker::new(4, usize::MAX, 1, usize::MAX, None)
|
||||
.build_output(&mut windows, active_window);
|
||||
|
||||
let output = output
|
||||
.iter()
|
||||
@@ -514,6 +558,43 @@ mod tests {
|
||||
output_level: Level,
|
||||
}
|
||||
|
||||
fn check_enforce_file_num(
|
||||
input_files: &[(i64, i64, u64)],
|
||||
max_file_num: usize,
|
||||
files_to_merge: &[(i64, i64)],
|
||||
) {
|
||||
let mut files = new_file_handles(input_files);
|
||||
// ensure sorted
|
||||
find_sorted_runs(&mut files);
|
||||
let mut to_merge = enforce_file_num(&files, max_file_num);
|
||||
to_merge.sort_unstable_by_key(|f| f.time_range().0);
|
||||
assert_eq!(
|
||||
files_to_merge.to_vec(),
|
||||
to_merge
|
||||
.iter()
|
||||
.map(|f| {
|
||||
let (start, end) = f.time_range();
|
||||
(start.value(), end.value())
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_enforce_file_num() {
|
||||
check_enforce_file_num(
|
||||
&[(0, 300, 2), (100, 200, 1), (200, 400, 1)],
|
||||
2,
|
||||
&[(100, 200), (200, 400)],
|
||||
);
|
||||
|
||||
check_enforce_file_num(
|
||||
&[(0, 300, 200), (100, 200, 100), (200, 400, 100)],
|
||||
1,
|
||||
&[(0, 300), (100, 200), (200, 400)],
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_twcs_output() {
|
||||
let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
|
||||
|
||||
@@ -220,7 +220,9 @@ async fn test_compaction_region_with_overlapping_delete_all() {
|
||||
let request = CreateRequestBuilder::new()
|
||||
.insert_option("compaction.type", "twcs")
|
||||
.insert_option("compaction.twcs.max_active_window_runs", "2")
|
||||
.insert_option("compaction.twcs.max_active_window_files", "2")
|
||||
.insert_option("compaction.twcs.max_inactive_window_runs", "2")
|
||||
.insert_option("compaction.twcs.max_inactive_window_files", "2")
|
||||
.insert_option("compaction.twcs.time_window", "1h")
|
||||
.build();
|
||||
|
||||
|
||||
@@ -186,9 +186,15 @@ pub struct TwcsOptions {
|
||||
/// Max num of sorted runs that can be kept in active writing time window.
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub max_active_window_runs: usize,
|
||||
/// Max num of files that can be kept in inactive time window.
|
||||
/// Max num of files in the active window.
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub max_active_window_files: usize,
|
||||
/// Max num of sorted runs that can be kept in inactive time windows.
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub max_inactive_window_runs: usize,
|
||||
/// Max num of files in inactive time windows.
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub max_inactive_window_files: usize,
|
||||
/// Compaction time window defined when creating tables.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub time_window: Option<Duration>,
|
||||
@@ -217,7 +223,9 @@ impl Default for TwcsOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
max_active_window_runs: 4,
|
||||
max_active_window_files: 4,
|
||||
max_inactive_window_runs: 1,
|
||||
max_inactive_window_files: 1,
|
||||
time_window: None,
|
||||
remote_compaction: false,
|
||||
}
|
||||
@@ -576,7 +584,9 @@ mod tests {
|
||||
let map = make_map(&[
|
||||
("ttl", "7d"),
|
||||
("compaction.twcs.max_active_window_runs", "8"),
|
||||
("compaction.twcs.max_active_window_files", "11"),
|
||||
("compaction.twcs.max_inactive_window_runs", "2"),
|
||||
("compaction.twcs.max_inactive_window_files", "3"),
|
||||
("compaction.twcs.time_window", "2h"),
|
||||
("compaction.type", "twcs"),
|
||||
("compaction.twcs.remote_compaction", "false"),
|
||||
@@ -599,7 +609,9 @@ mod tests {
|
||||
ttl: Some(Duration::from_secs(3600 * 24 * 7)),
|
||||
compaction: CompactionOptions::Twcs(TwcsOptions {
|
||||
max_active_window_runs: 8,
|
||||
max_active_window_files: 11,
|
||||
max_inactive_window_runs: 2,
|
||||
max_inactive_window_files: 3,
|
||||
time_window: Some(Duration::from_secs(3600 * 2)),
|
||||
remote_compaction: false,
|
||||
}),
|
||||
@@ -628,7 +640,9 @@ mod tests {
|
||||
ttl: Some(Duration::from_secs(3600 * 24 * 7)),
|
||||
compaction: CompactionOptions::Twcs(TwcsOptions {
|
||||
max_active_window_runs: 8,
|
||||
max_active_window_files: usize::MAX,
|
||||
max_inactive_window_runs: 2,
|
||||
max_inactive_window_files: usize::MAX,
|
||||
time_window: Some(Duration::from_secs(3600 * 2)),
|
||||
remote_compaction: false,
|
||||
}),
|
||||
@@ -663,7 +677,9 @@ mod tests {
|
||||
"compaction": {
|
||||
"compaction.type": "twcs",
|
||||
"compaction.twcs.max_active_window_runs": "8",
|
||||
"compaction.twcs.max_active_window_files": "11",
|
||||
"compaction.twcs.max_inactive_window_runs": "2",
|
||||
"compaction.twcs.max_inactive_window_files": "7",
|
||||
"compaction.twcs.time_window": "2h"
|
||||
},
|
||||
"storage": "S3",
|
||||
@@ -689,7 +705,9 @@ mod tests {
|
||||
ttl: Some(Duration::from_secs(3600 * 24 * 7)),
|
||||
compaction: CompactionOptions::Twcs(TwcsOptions {
|
||||
max_active_window_runs: 8,
|
||||
max_active_window_files: 11,
|
||||
max_inactive_window_runs: 2,
|
||||
max_inactive_window_files: 7,
|
||||
time_window: Some(Duration::from_secs(3600 * 2)),
|
||||
remote_compaction: false,
|
||||
}),
|
||||
|
||||
@@ -28,7 +28,9 @@ pub fn is_mito_engine_option_key(key: &str) -> bool {
|
||||
"ttl",
|
||||
"compaction.type",
|
||||
"compaction.twcs.max_active_window_runs",
|
||||
"compaction.twcs.max_active_window_files",
|
||||
"compaction.twcs.max_inactive_window_runs",
|
||||
"compaction.twcs.max_inactive_window_files",
|
||||
"compaction.twcs.time_window",
|
||||
"compaction.twcs.remote_compaction",
|
||||
"storage",
|
||||
|
||||
Reference in New Issue
Block a user