mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-08 22:32:55 +00:00
feat(mito): limit compaction output file size (#4754)
* Clarify documentation for CompactionOutput struct
  Updated the documentation for the `CompactionOutput` struct to specify that the output time range is only relevant for windowed compaction.
* Add max_output_file_size to TwcsPicker and TwcsOptions
  - Introduced `max_output_file_size` to `TwcsPicker` struct and its logic to enforce output file size limits during compaction.
  - Updated `TwcsOptions` to include `max_output_file_size` and adjusted related tests.
  - Modified `new_picker` function to initialize `TwcsPicker` with the new `max_output_file_size` field.
* feat/limit-compaction-output-size: Refactor compaction picker and TWCS to support append mode and improve options handling
  - Update compaction picker to accept a reference to options and append mode flag
  - Modify TWCS picker logic to consider append mode when filtering deleted rows
  - Remove VersionControl usage in compactor and simplify return type
  - Adjust enforce_max_output_size logic in TWCS picker to handle max output file size
  - Add append mode flag to TwcsPicker struct
  - Fix incorrect condition in TWCS picker for enforcing max output size
  - Update region options tests to reflect new max output file size format (1GB and 7MB)
  - Simplify InvalidTableOptionSnafu error handling in create_parser
  - Add `compaction.twcs.max_output_file_size` to mito engine option keys
* resolve some comments
This commit is contained in:
@@ -240,7 +240,11 @@ impl CompactionScheduler {
|
||||
request: CompactionRequest,
|
||||
options: compact_request::Options,
|
||||
) -> Result<()> {
|
||||
let picker = new_picker(options.clone(), &request.current_version.options.compaction);
|
||||
let picker = new_picker(
|
||||
&options,
|
||||
&request.current_version.options.compaction,
|
||||
request.current_version.options.append_mode,
|
||||
);
|
||||
let region_id = request.region_id();
|
||||
let CompactionRequest {
|
||||
engine_config,
|
||||
@@ -500,7 +504,7 @@ pub struct CompactionOutput {
|
||||
pub inputs: Vec<FileHandle>,
|
||||
/// Whether to remove deletion markers.
|
||||
pub filter_deleted: bool,
|
||||
/// Compaction output time range.
|
||||
/// Compaction output time range. Only windowed compaction specifies output time range.
|
||||
pub output_time_range: Option<TimestampRange>,
|
||||
}
|
||||
|
||||
|
||||
@@ -38,7 +38,7 @@ use crate::memtable::MemtableBuilderProvider;
|
||||
use crate::read::Source;
|
||||
use crate::region::opener::new_manifest_dir;
|
||||
use crate::region::options::RegionOptions;
|
||||
use crate::region::version::{VersionBuilder, VersionControl, VersionRef};
|
||||
use crate::region::version::{VersionBuilder, VersionRef};
|
||||
use crate::region::ManifestContext;
|
||||
use crate::region::RegionState::Writable;
|
||||
use crate::schedule::scheduler::LocalScheduler;
|
||||
@@ -164,8 +164,7 @@ pub async fn open_compaction_region(
|
||||
.compaction_time_window(manifest.compaction_time_window)
|
||||
.options(req.region_options.clone())
|
||||
.build();
|
||||
let version_control = Arc::new(VersionControl::new(version));
|
||||
version_control.current().version
|
||||
Arc::new(version)
|
||||
};
|
||||
|
||||
Ok(CompactionRegion {
|
||||
@@ -395,8 +394,9 @@ impl Compactor for DefaultCompactor {
|
||||
) -> Result<()> {
|
||||
let picker_output = {
|
||||
let picker_output = new_picker(
|
||||
compact_request_options,
|
||||
&compact_request_options,
|
||||
&compaction_region.region_options.compaction,
|
||||
compaction_region.region_options.append_mode,
|
||||
)
|
||||
.pick(compaction_region);
|
||||
|
||||
|
||||
@@ -119,10 +119,11 @@ impl PickerOutput {
|
||||
|
||||
/// Create a new picker based on the compaction request options and compaction options.
|
||||
pub fn new_picker(
|
||||
compact_request_options: compact_request::Options,
|
||||
compact_request_options: &compact_request::Options,
|
||||
compaction_options: &CompactionOptions,
|
||||
append_mode: bool,
|
||||
) -> Arc<dyn Picker> {
|
||||
if let compact_request::Options::StrictWindow(window) = &compact_request_options {
|
||||
if let compact_request::Options::StrictWindow(window) = compact_request_options {
|
||||
let window = if window.window_seconds == 0 {
|
||||
None
|
||||
} else {
|
||||
@@ -131,13 +132,15 @@ pub fn new_picker(
|
||||
Arc::new(WindowedCompactionPicker::new(window)) as Arc<_>
|
||||
} else {
|
||||
match compaction_options {
|
||||
CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker::new(
|
||||
twcs_opts.max_active_window_runs,
|
||||
twcs_opts.max_active_window_files,
|
||||
twcs_opts.max_inactive_window_runs,
|
||||
twcs_opts.max_inactive_window_files,
|
||||
twcs_opts.time_window_seconds(),
|
||||
)) as Arc<_>,
|
||||
CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker {
|
||||
max_active_window_runs: twcs_opts.max_active_window_runs,
|
||||
max_active_window_files: twcs_opts.max_active_window_files,
|
||||
max_inactive_window_runs: twcs_opts.max_inactive_window_runs,
|
||||
max_inactive_window_files: twcs_opts.max_inactive_window_files,
|
||||
time_window_seconds: twcs_opts.time_window_seconds(),
|
||||
max_output_file_size: twcs_opts.max_output_file_size.map(|r| r.as_bytes()),
|
||||
append_mode,
|
||||
}) as Arc<_>,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,30 +35,23 @@ const LEVEL_COMPACTED: Level = 1;
|
||||
/// candidates.
|
||||
#[derive(Debug)]
|
||||
pub struct TwcsPicker {
|
||||
max_active_window_runs: usize,
|
||||
max_active_window_files: usize,
|
||||
max_inactive_window_runs: usize,
|
||||
max_inactive_window_files: usize,
|
||||
time_window_seconds: Option<i64>,
|
||||
/// Max allowed sorted runs in active window.
|
||||
pub max_active_window_runs: usize,
|
||||
/// Max allowed files in active window.
|
||||
pub max_active_window_files: usize,
|
||||
/// Max allowed sorted runs in inactive windows.
|
||||
pub max_inactive_window_runs: usize,
|
||||
/// Max allowed files in inactive windows.
|
||||
pub max_inactive_window_files: usize,
|
||||
/// Compaction time window in seconds.
|
||||
pub time_window_seconds: Option<i64>,
|
||||
/// Max allowed compaction output file size.
|
||||
pub max_output_file_size: Option<u64>,
|
||||
/// Whether the target region is in append mode.
|
||||
pub append_mode: bool,
|
||||
}
|
||||
|
||||
impl TwcsPicker {
|
||||
pub fn new(
|
||||
max_active_window_runs: usize,
|
||||
max_active_window_files: usize,
|
||||
max_inactive_window_runs: usize,
|
||||
max_inactive_window_files: usize,
|
||||
time_window_seconds: Option<i64>,
|
||||
) -> Self {
|
||||
Self {
|
||||
max_inactive_window_runs,
|
||||
max_active_window_runs,
|
||||
time_window_seconds,
|
||||
max_active_window_files,
|
||||
max_inactive_window_files,
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds compaction output from files.
|
||||
/// For active writing window, we allow for at most `max_active_window_runs` files to alleviate
|
||||
/// fragmentation. For other windows, we allow at most 1 file at each window.
|
||||
@@ -82,47 +75,114 @@ impl TwcsPicker {
|
||||
)
|
||||
};
|
||||
|
||||
// we only remove deletion markers once no file in current window overlaps with any other window.
|
||||
let found_runs = sorted_runs.len();
|
||||
let filter_deleted = !files.overlapping && (found_runs == 1 || max_runs == 1);
|
||||
// We only remove deletion markers once no file in current window overlaps with any other window
|
||||
// and region is not in append mode.
|
||||
let filter_deleted =
|
||||
!files.overlapping && (found_runs == 1 || max_runs == 1) && !self.append_mode;
|
||||
|
||||
if found_runs > max_runs {
|
||||
let inputs = if found_runs > max_runs {
|
||||
let files_to_compact = reduce_runs(sorted_runs, max_runs);
|
||||
info!("Building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, output size: {}, remove deletion markers: {}", active_window, *window,max_runs, found_runs, files_to_compact.len(), filter_deleted);
|
||||
for inputs in files_to_compact {
|
||||
output.push(CompactionOutput {
|
||||
output_file_id: FileId::random(),
|
||||
output_level: LEVEL_COMPACTED, // always compact to l1
|
||||
inputs,
|
||||
filter_deleted,
|
||||
output_time_range: None, // we do not enforce output time range in twcs compactions.
|
||||
});
|
||||
}
|
||||
let files_to_compact_len = files_to_compact.len();
|
||||
info!(
|
||||
"Building compaction output, active window: {:?}, \
|
||||
current window: {}, \
|
||||
max runs: {}, \
|
||||
found runs: {}, \
|
||||
output size: {}, \
|
||||
max output size: {:?}, \
|
||||
remove deletion markers: {}",
|
||||
active_window,
|
||||
*window,
|
||||
max_runs,
|
||||
found_runs,
|
||||
files_to_compact_len,
|
||||
self.max_output_file_size,
|
||||
filter_deleted
|
||||
);
|
||||
files_to_compact
|
||||
} else if files.files.len() > max_files {
|
||||
debug!(
|
||||
"Enforcing max file num in window: {}, active: {:?}, max: {}, current: {}",
|
||||
info!(
|
||||
"Enforcing max file num in window: {}, active: {:?}, max: {}, current: {}, max output size: {:?}, filter delete: {}",
|
||||
*window,
|
||||
active_window,
|
||||
max_files,
|
||||
files.files.len()
|
||||
files.files.len(),
|
||||
self.max_output_file_size,
|
||||
filter_deleted,
|
||||
);
|
||||
// Files in this window exceed the file number limit.
|
||||
let to_merge = enforce_file_num(&files.files, max_files);
|
||||
vec![enforce_file_num(&files.files, max_files)]
|
||||
} else {
|
||||
debug!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
|
||||
continue;
|
||||
};
|
||||
|
||||
let split_inputs = if !filter_deleted
|
||||
&& let Some(max_output_file_size) = self.max_output_file_size
|
||||
{
|
||||
let len_before_split = inputs.len();
|
||||
let maybe_split = enforce_max_output_size(inputs, max_output_file_size);
|
||||
if maybe_split.len() != len_before_split {
|
||||
info!("Compaction output file size exceeds threshold {}, split compaction inputs to: {:?}", max_output_file_size, maybe_split);
|
||||
}
|
||||
maybe_split
|
||||
} else {
|
||||
inputs
|
||||
};
|
||||
|
||||
for input in split_inputs {
|
||||
debug_assert!(input.len() > 1);
|
||||
output.push(CompactionOutput {
|
||||
output_file_id: FileId::random(),
|
||||
output_level: LEVEL_COMPACTED, // always compact to l1
|
||||
inputs: to_merge,
|
||||
inputs: input,
|
||||
filter_deleted,
|
||||
output_time_range: None,
|
||||
output_time_range: None, // we do not enforce output time range in twcs compactions.
|
||||
});
|
||||
} else {
|
||||
debug!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
|
||||
}
|
||||
}
|
||||
output
|
||||
}
|
||||
}
|
||||
|
||||
/// Limits the size of compaction output in a naive manner.
|
||||
/// The output size of each input group is estimated as the sum of the sizes of its files.
/// A group whose estimate is below `max_output_file_size` is returned unchanged. Any other
/// group is cut, in order, into consecutive sub-groups, starting a new sub-group whenever
/// adding the next file would exceed `max_output_file_size`. Groups that end up with fewer
/// than two files are dropped from the result.
///
/// todo(hl): we can find the output file size more precisely by checking the time range
|
||||
/// of each row group and adding the sizes of those non-overlapping row groups. But for now
|
||||
/// we'd better not expose the SST details at this level.
|
||||
fn enforce_max_output_size(
|
||||
inputs: Vec<Vec<FileHandle>>,
|
||||
max_output_file_size: u64,
|
||||
) -> Vec<Vec<FileHandle>> {
|
||||
inputs
|
||||
.into_iter()
|
||||
.flat_map(|input| {
|
||||
// Every incoming group is expected to hold at least two files (debug builds only).
debug_assert!(input.len() > 1);
|
||||
// Naive estimate: the output is assumed to be as large as all inputs combined.
let estimated_output_size = input.iter().map(|f| f.size()).sum::<u64>();
|
||||
if estimated_output_size < max_output_file_size {
|
||||
// Total file size is below the threshold, just return the original input.
|
||||
return vec![input];
|
||||
}
|
||||
// Finished sub-groups of this input.
let mut splits = vec![];
|
||||
// Sub-group currently being filled, and the accumulated size of its files.
let mut new_input = vec![];
|
||||
let mut new_input_size = 0;
|
||||
for f in input {
|
||||
// Adding `f` would push the current sub-group over the limit: close it and start
// a new one. If the current sub-group is still empty (a single file larger than
// the limit), an empty group is pushed here; the `len() > 1` filter below removes it.
if new_input_size + f.size() > max_output_file_size {
|
||||
splits.push(std::mem::take(&mut new_input));
|
||||
new_input_size = 0;
|
||||
}
|
||||
new_input_size += f.size();
|
||||
new_input.push(f);
|
||||
}
|
||||
// Flush the trailing sub-group, if any.
if !new_input.is_empty() {
|
||||
splits.push(new_input);
|
||||
}
|
||||
splits
|
||||
})
|
||||
// Keep only groups that still contain at least two files
// (presumably because a single file has nothing to be merged with).
.filter(|p| p.len() > 1)
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Merges consecutive files so that file num does not exceed `max_file_num`, and chooses
|
||||
/// the solution with minimum overhead according to files sizes to be merged.
|
||||
/// `enforce_file_num` only merges consecutive files so that it won't create overlapping outputs.
|
||||
@@ -305,10 +365,12 @@ fn find_latest_window_in_seconds<'a>(
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::*;
|
||||
use crate::compaction::test_util::{new_file_handle, new_file_handles};
|
||||
use crate::sst::file::Level;
|
||||
use crate::sst::file::{FileMeta, Level};
|
||||
use crate::test_util::NoopFilePurger;
|
||||
|
||||
#[test]
|
||||
fn test_get_latest_window_in_seconds() {
|
||||
@@ -525,8 +587,16 @@ mod tests {
|
||||
let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
|
||||
let active_window =
|
||||
find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
|
||||
let output = TwcsPicker::new(4, usize::MAX, 1, usize::MAX, None)
|
||||
.build_output(&mut windows, active_window);
|
||||
let output = TwcsPicker {
|
||||
max_active_window_runs: 4,
|
||||
max_active_window_files: usize::MAX,
|
||||
max_inactive_window_runs: 1,
|
||||
max_inactive_window_files: usize::MAX,
|
||||
time_window_seconds: None,
|
||||
max_output_file_size: None,
|
||||
append_mode: false,
|
||||
}
|
||||
.build_output(&mut windows, active_window);
|
||||
|
||||
let output = output
|
||||
.iter()
|
||||
@@ -641,5 +711,43 @@ mod tests {
|
||||
.check();
|
||||
}
|
||||
|
||||
/// Builds test file handles from `(start, end, size)` tuples.
///
/// `start` and `end` become the millisecond timestamps of the file's time range and
/// `size` becomes its `file_size`. All handles share the default region id and file id,
/// are placed at level 0, carry zeroed/default index and row statistics, and use a
/// `NoopFilePurger`.
fn make_file_handles(inputs: &[(i64, i64, u64)]) -> Vec<FileHandle> {
|
||||
inputs
|
||||
.iter()
|
||||
.map(|(start, end, size)| {
|
||||
FileHandle::new(
|
||||
FileMeta {
|
||||
region_id: Default::default(),
|
||||
file_id: Default::default(),
|
||||
time_range: (
|
||||
Timestamp::new_millisecond(*start),
|
||||
Timestamp::new_millisecond(*end),
|
||||
),
|
||||
level: 0,
|
||||
// The only size-related field driven by the caller.
file_size: *size,
|
||||
available_indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
num_rows: 0,
|
||||
num_row_groups: 0,
|
||||
},
|
||||
Arc::new(NoopFilePurger),
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Checks that `enforce_max_output_size` splits an oversized compaction input into
/// groups that respect the size limit and discards leftover single-file groups.
#[test]
|
||||
fn test_limit_output_size() {
|
||||
// Six files of size 1 that all cover the same time range (1, 1).
let mut files = make_file_handles(&[(1, 1, 1)].repeat(6));
|
||||
let runs = find_sorted_runs(&mut files);
|
||||
// Each of the six identical-range files is expected to form its own sorted run.
assert_eq!(6, runs.len());
|
||||
// Ask for the runs to be reduced to 2; the returned groups are the files to merge.
// NOTE(review): the exact grouping depends on `reduce_runs`, which is not shown here.
let files_to_merge = reduce_runs(runs, 2);
|
||||
|
||||
// With a limit of 2 only two size-1 files fit in one group; groups left with a
// single file are filtered out by `enforce_max_output_size`.
let enforced = enforce_max_output_size(files_to_merge, 2);
|
||||
assert_eq!(2, enforced.len());
|
||||
assert_eq!(2, enforced[0].len());
|
||||
assert_eq!(2, enforced[1].len());
|
||||
}
|
||||
|
||||
// TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
|
||||
}
|
||||
|
||||
@@ -204,6 +204,8 @@ pub struct TwcsOptions {
|
||||
/// Compaction time window defined when creating tables.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub time_window: Option<Duration>,
|
||||
/// Max allowed compaction output file size. `None` means the output size is not limited.
|
||||
pub max_output_file_size: Option<ReadableSize>,
|
||||
/// Whether to use remote compaction.
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub remote_compaction: bool,
|
||||
@@ -236,6 +238,7 @@ impl Default for TwcsOptions {
|
||||
max_inactive_window_runs: 1,
|
||||
max_inactive_window_files: 1,
|
||||
time_window: None,
|
||||
max_output_file_size: None,
|
||||
remote_compaction: false,
|
||||
fallback_to_local: true,
|
||||
}
|
||||
@@ -597,6 +600,7 @@ mod tests {
|
||||
("compaction.twcs.max_active_window_files", "11"),
|
||||
("compaction.twcs.max_inactive_window_runs", "2"),
|
||||
("compaction.twcs.max_inactive_window_files", "3"),
|
||||
("compaction.twcs.max_output_file_size", "1GB"),
|
||||
("compaction.twcs.time_window", "2h"),
|
||||
("compaction.type", "twcs"),
|
||||
("compaction.twcs.remote_compaction", "false"),
|
||||
@@ -624,6 +628,7 @@ mod tests {
|
||||
max_inactive_window_runs: 2,
|
||||
max_inactive_window_files: 3,
|
||||
time_window: Some(Duration::from_secs(3600 * 2)),
|
||||
max_output_file_size: Some(ReadableSize::gb(1)),
|
||||
remote_compaction: false,
|
||||
fallback_to_local: true,
|
||||
}),
|
||||
@@ -656,6 +661,7 @@ mod tests {
|
||||
max_inactive_window_runs: 2,
|
||||
max_inactive_window_files: usize::MAX,
|
||||
time_window: Some(Duration::from_secs(3600 * 2)),
|
||||
max_output_file_size: None,
|
||||
remote_compaction: false,
|
||||
fallback_to_local: true,
|
||||
}),
|
||||
@@ -693,6 +699,7 @@ mod tests {
|
||||
"compaction.twcs.max_active_window_files": "11",
|
||||
"compaction.twcs.max_inactive_window_runs": "2",
|
||||
"compaction.twcs.max_inactive_window_files": "7",
|
||||
"compaction.twcs.max_output_file_size": "7MB",
|
||||
"compaction.twcs.time_window": "2h"
|
||||
},
|
||||
"storage": "S3",
|
||||
@@ -722,6 +729,7 @@ mod tests {
|
||||
max_inactive_window_runs: 2,
|
||||
max_inactive_window_files: 7,
|
||||
time_window: Some(Duration::from_secs(3600 * 2)),
|
||||
max_output_file_size: Some(ReadableSize::mb(7)),
|
||||
remote_compaction: false,
|
||||
fallback_to_local: true,
|
||||
}),
|
||||
|
||||
@@ -370,12 +370,7 @@ impl<'a> ParserContext<'a> {
|
||||
.map(parse_option_string)
|
||||
.collect::<Result<HashMap<String, String>>>()?;
|
||||
for key in options.keys() {
|
||||
ensure!(
|
||||
validate_table_option(key),
|
||||
InvalidTableOptionSnafu {
|
||||
key: key.to_string()
|
||||
}
|
||||
);
|
||||
ensure!(validate_table_option(key), InvalidTableOptionSnafu { key });
|
||||
}
|
||||
Ok(options.into())
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ pub fn is_mito_engine_option_key(key: &str) -> bool {
|
||||
"compaction.twcs.max_active_window_files",
|
||||
"compaction.twcs.max_inactive_window_runs",
|
||||
"compaction.twcs.max_inactive_window_files",
|
||||
"compaction.twcs.max_output_file_size",
|
||||
"compaction.twcs.time_window",
|
||||
"compaction.twcs.remote_compaction",
|
||||
"compaction.twcs.fallback_to_local",
|
||||
|
||||
Reference in New Issue
Block a user