From 4b71e493f78365898dfcdbabfd2b637020d69916 Mon Sep 17 00:00:00 2001
From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com>
Date: Fri, 23 May 2025 11:29:08 +0800
Subject: [PATCH] feat!: revise compaction picker (#6121)

* - **Refactor `RegionFilePathFactory` to `RegionFilePathProvider`:** Updated references and implementations in `access_layer.rs`, `write_cache.rs`, and related test files to use the new struct name.
  - **Add `max_file_size` support in compaction:** Introduced the `max_file_size` option in `PickerOutput`, `SerializedPickerOutput`, and `WriteOptions` in `compactor.rs`, `picker.rs`, `twcs.rs`, and `window.rs`.
  - **Enhance Parquet writing logic:** Modified `parquet.rs` and `parquet/writer.rs` to support an optional `max_file_size` and added a test case `test_write_multiple_files` to verify writing multiple files based on size constraints.

  **Refactor Parquet Writer Initialization and File Handling**
  - Updated `ParquetWriter` in `writer.rs` to handle `current_indexer` as an `Option`, allowing for more flexible initialization and management.
  - Introduced the `finish_current_file` method to encapsulate the logic for completing and transitioning between SST files, improving code clarity and maintainability.
  - Enhanced error handling and logging with `debug` statements for better traceability during file operations.

  - **Removed Output Size Enforcement in `twcs.rs`:**
    - Deleted the `enforce_max_output_size` function and related logic to simplify compaction input handling.
  - **Added Max File Size Option in `parquet.rs`:**
    - Introduced `max_file_size` in `WriteOptions` to control the maximum size of output files.
  - **Refactored Indexer Management in `parquet/writer.rs`:**
    - Changed `current_indexer` from an `Option` to a direct `Indexer` type.
    - Implemented `roll_to_next_file` to handle file transitions when exceeding `max_file_size` (see the sketch below).
    - Simplified indexer initialization and management logic.
  - **Refactored SST File Handling**:
    - Introduced the `FilePathProvider` trait and its implementations (`WriteCachePathProvider`, `RegionFilePathFactory`) to manage SST and index file paths.
    - Updated `AccessLayer`, `WriteCache`, and `ParquetWriter` to use `FilePathProvider` for path management.
    - Modified `SstWriteRequest` and `SstUploadRequest` to use path providers instead of direct paths.
    - Files affected: `access_layer.rs`, `write_cache.rs`, `parquet.rs`, `writer.rs`.
  - **Enhanced Indexer Management**:
    - Replaced `IndexerBuilder` with `IndexerBuilderImpl` and made it async to support dynamic indexer creation.
    - Updated `ParquetWriter` to handle multiple indexers and file IDs.
    - Files affected: `index.rs`, `parquet.rs`, `writer.rs`.
  - **Removed Redundant File ID Handling**:
    - Removed `file_id` from `SstWriteRequest` and `CompactionOutput`.
    - Updated related logic to dynamically generate file IDs where necessary.
    - Files affected: `compaction.rs`, `flush.rs`, `picker.rs`, `twcs.rs`, `window.rs`.
  - **Test Adjustments**:
    - Updated tests to align with the new path and indexer management.
    - Introduced `FixedPathProvider` and `NoopIndexBuilder` for testing purposes.
    - Files affected: `sst_util.rs`, `version_util.rs`, `parquet.rs`.

* chore: rebase main
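The multi-file write path above can be pictured with a small sketch. This is a hedged illustration only: `RollingWriter` and `FileSink` are hypothetical stand-ins, not the real `ParquetWriter` and indexer types, and the real code also finishes index files and uploads through the path provider.

```rust
// Hypothetical sketch of size-based file rolling; not the actual writer API.
struct FileSink {
    id: u32,
    bytes: u64,
}

struct RollingWriter {
    max_file_size: Option<u64>, // mirrors `WriteOptions::max_file_size`
    current: FileSink,
    finished: Vec<FileSink>,
}

impl RollingWriter {
    fn new(max_file_size: Option<u64>) -> Self {
        Self {
            max_file_size,
            current: FileSink { id: 0, bytes: 0 },
            finished: Vec::new(),
        }
    }

    // Appends a batch and rolls to the next file once the threshold is hit,
    // in the spirit of `finish_current_file`/`roll_to_next_file`.
    fn write(&mut self, batch_bytes: u64) {
        self.current.bytes += batch_bytes;
        if let Some(max) = self.max_file_size {
            if self.current.bytes >= max {
                let next = FileSink { id: self.current.id + 1, bytes: 0 };
                self.finished.push(std::mem::replace(&mut self.current, next));
            }
        }
    }
}

fn main() {
    let mut writer = RollingWriter::new(Some(100));
    for _ in 0..25 {
        writer.write(16); // 400 bytes in total, rolling at 112-byte marks
    }
    assert_eq!(writer.finished.len(), 3); // three sealed files, one still open
}
```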
* feat/multiple-compaction-output: ### Add Benchmarking and Refactor Compaction Logic
  - **Benchmarking**: Added a new benchmark `run_bench` in `Cargo.toml` and implemented benchmarks in `benches/run_bench.rs` using Criterion for the `find_sorted_runs` and `reduce_runs` functions.
  - **Compaction Module Enhancements**:
    - Made `run.rs` public and refactored the `Ranged` and `Item` traits to be public.
    - Simplified the logic in `find_sorted_runs` and `reduce_runs` by removing `MergeItems` and related functions.
    - Introduced `find_overlapping_items` for identifying overlapping items.
  - **Code Cleanup**: Removed redundant code and tests related to `MergeItems` in `run.rs`.

* feat/multiple-compaction-output: ### Enhance Compaction Logic and Add Benchmarks
  - **Compaction Logic Improvements**:
    - Updated the `reduce_runs` function in `src/mito2/src/compaction/run.rs` to remove the target parameter and improve the logic for selecting files to merge based on minimum penalty.
    - Enhanced `find_overlapping_items` to handle unsorted inputs and improve overlap detection efficiency.
  - **Benchmark Enhancements**:
    - Added `bench_find_overlapping_items` in `src/mito2/benches/run_bench.rs` to benchmark the new `find_overlapping_items` function.
    - Extended existing benchmarks to include larger data sizes.
  - **Testing Enhancements**:
    - Updated tests in `src/mito2/src/compaction/run.rs` to reflect changes in `reduce_runs` and added new tests for `find_overlapping_items`.
  - **Logging and Debugging**:
    - Improved logging in `src/mito2/src/compaction/twcs.rs` to provide more detailed information about compaction decisions.

* feat/multiple-compaction-output: ### Refactor and Enhance Compaction Logic
  - **Refactor `find_overlapping_items` Function**: Changed the function signature to accept slices instead of mutable vectors in `run.rs`.
  - **Rename and Update Struct Fields**: Renamed `penalty` to `size` in the `SortedRun` struct and updated related logic in `run.rs`.
  - **Enhance `reduce_runs` Function**: Improved the logic to sort runs by size and limit probe runs to 100 in `run.rs`.
  - **Add `merge_seq_files` Function**: Introduced a new function `merge_seq_files` in `run.rs` for merging sequential files.
  - **Modify `TwcsPicker` Logic**: Updated the compaction logic to use `merge_seq_files` when only one run is found in `twcs.rs` (the sorted-run notion is sketched below).
  - **Remove `enforce_file_num` Function**: Deleted the `enforce_file_num` function and its related test cases in `twcs.rs`.

* feat/multiple-compaction-output: ### Enhance Compaction Logic and Testing
  - **Add `merge_seq_files` Functionality**: Implemented the `merge_seq_files` function in `run.rs` to optimize file merging based on a scoring system. Updated benchmarks in `run_bench.rs` to include `bench_merge_seq_files`.
  - **Improve Compaction Strategy in `twcs.rs`**: Modified the compaction logic to handle file merging more effectively, considering file size and overlap.
  - **Update Tests**: Enhanced test coverage in `compaction_test.rs` and `append_mode_test.rs` to validate the new compaction logic and file merging strategies.
  - **Remove Unused Function**: Deleted `new_file_handles` from `test_util.rs` as it was no longer needed.

* feat/multiple-compaction-output: ### Refactor TWCS Compaction Options
  - **Refactor Compaction Logic**: Simplified the TWCS compaction logic by replacing multiple parameters (`max_active_window_runs`, `max_active_window_files`, `max_inactive_window_runs`, `max_inactive_window_files`) with a single `trigger_file_num` parameter in `picker.rs`, `twcs.rs`, and `options.rs`.
  - **Update Tests**: Adjusted test cases to reflect the new compaction logic in `append_mode_test.rs`, `compaction_test.rs`, `filter_deleted_test.rs`, `merge_mode_test.rs`, and various test files under `tests/cases`.
  - **Modify Engine Options**: Updated engine option keys to use `trigger_file_num` in `mito_engine_options.rs` and `region_request.rs`.
  - **Fuzz Testing**: Updated fuzz test generators and translators to accommodate the new compaction parameter in `alter_expr.rs` and related files.

  This refactor aims to streamline the compaction configuration by reducing the number of parameters and simplifying the codebase.
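For intuition about the run-based picking described above, here is a compact stand-alone rendering of the sorted-run idea (a sketch, not the actual `run.rs` code): build each run greedily by appending any file whose inclusive time range does not overlap the run's last file; files that do overlap seed later runs. The expected groupings mirror unit tests in `run.rs`.

```rust
// Inclusive-range overlap, mirroring the revised `Ranged::overlap` comparison:
// ranges that merely touch at an endpoint are NOT considered overlapping.
fn overlaps(a: (i64, i64), b: (i64, i64)) -> bool {
    a.0.max(b.0) < a.1.min(b.1)
}

// Simplified stand-alone illustration of the greedy pass behind
// `find_sorted_runs`; not the real implementation.
fn sorted_runs(mut files: Vec<(i64, i64)>) -> Vec<Vec<(i64, i64)>> {
    files.sort();
    let mut runs = Vec::new();
    let mut used = vec![false; files.len()];
    let mut remaining = files.len();
    while remaining > 0 {
        let mut run: Vec<(i64, i64)> = Vec::new();
        for (i, &f) in files.iter().enumerate() {
            if !used[i] && run.last().map_or(true, |&last| !overlaps(last, f)) {
                run.push(f);
                used[i] = true;
                remaining -= 1;
            }
        }
        runs.push(run);
    }
    runs
}

fn main() {
    // (2, 4) overlaps (1, 3), so it is forced into a second run.
    assert_eq!(
        sorted_runs(vec![(1, 3), (2, 4), (5, 6)]),
        vec![vec![(1, 3), (5, 6)], vec![(2, 4)]]
    );
    // Ranges that only touch now share a single run.
    assert_eq!(sorted_runs(vec![(1, 2), (2, 3)]), vec![vec![(1, 2), (2, 3)]]);
}
```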
* chore: add trailing space

* fix license header

* feat/revise-compaction-picker: **Limit File Processing and Optimize Merge Logic in `run.rs`**
  - Introduced a limit to process a maximum of 100 files in `merge_seq_files` to control time complexity.
  - Adjusted the logic to calculate `target_size` and iterate over files using the limited set of files.
  - Updated scoring calculations to use the limited file set, ensuring efficient file merging.

* feat/revise-compaction-picker: ### Add Compaction Metrics and Remove Debug Logging
  - **Compaction Metrics**: Introduced new histograms `COMPACTION_INPUT_BYTES` and `COMPACTION_OUTPUT_BYTES` to track compaction input and output file sizes in `metrics.rs`. Updated `compactor.rs` to observe these metrics during the compaction process.
  - **Logging Cleanup**: Removed debug logging of file ranges during the merge process in `twcs.rs`.

* feat/revise-compaction-picker: ## Enhance Compaction Logic and Metrics
  - **Compaction Logic Improvements**:
    - Added the methods `input_file_size` and `output_file_size` to `MergeOutput` in `compactor.rs` to streamline file size calculations.
    - Updated the `Compactor` implementation to use these methods for metrics tracking.
    - Modified the `Ranged` trait logic in `run.rs` to improve range comparison.
    - Enhanced test cases in `run.rs` to reflect changes in compaction logic.
  - **Metrics Enhancements**:
    - Changed `COMPACTION_INPUT_BYTES` and `COMPACTION_OUTPUT_BYTES` from histograms to counters in `metrics.rs` for better performance tracking (see the sketch below).
  - **Debugging and Logging**:
    - Added detailed logging for compaction pick results in `twcs.rs`.
    - Implemented a custom `Debug` trait for `FileMeta` in `file.rs` to improve debugging output.
  - **Testing Enhancements**:
    - Added a new test `test_compaction_overlapping_files` in `compaction_test.rs` to verify compaction behavior with overlapping files.
    - Updated `merge_mode_test.rs` to reflect changes in file handling during scans.

* feat/revise-compaction-picker: ### Update `FileHandle` Debug Implementation
  - **Refactor Debug Output**: Simplified the `fmt::Debug` implementation for `FileHandle` in `src/mito2/src/sst/file.rs` by consolidating multiple fields into a single `meta` field using `meta_ref()`.
  - **Atomic Operations**: Updated the `deleted` field to use atomic loading with `Ordering::Relaxed`.

* Trigger CI

* feat/revise-compaction-picker: **Update compaction logic and default options**
  - **`twcs.rs`**: Enhanced logging for compaction pick results by improving the formatting for better readability.
  - **`options.rs`**: Modified the default `max_output_file_size` in `TwcsOptions` from 2GB to 512MB to optimize file handling and performance.

* feat/revise-compaction-picker: Refactor `find_overlapping_items` to use an external result vector
  - Updated `find_overlapping_items` in `src/mito2/src/compaction/run.rs` to accept a mutable result vector instead of returning a new vector, improving memory efficiency.
  - Modified benchmarks in `src/mito2/benches/bench_compaction_picker.rs` to accommodate the new function signature.
  - Adjusted tests in `src/mito2/src/compaction/run.rs` to use the updated function signature, ensuring correct functionality with the new approach.
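The histogram-to-counter switch can be sketched with the `prometheus` crate as below. The metric names and registration here are illustrative assumptions; the real statics and registry wiring live in `metrics.rs`.

```rust
use prometheus::{register_counter, Counter};

fn main() {
    // Illustrative stand-ins for COMPACTION_INPUT_BYTES / COMPACTION_OUTPUT_BYTES.
    let input: Counter =
        register_counter!("compaction_input_bytes", "Total bytes read by compactions").unwrap();
    let output: Counter =
        register_counter!("compaction_output_bytes", "Total bytes written by compactions").unwrap();

    // After a merge, record the summed SST sizes of both sides, as the
    // compactor does via `input_file_size()` / `output_file_size()`.
    input.inc_by((3 * 1024 * 1024) as f64);
    output.inc_by((2 * 1024 * 1024) as f64);

    // Counters are monotonic; rates and deltas are derived at scrape time,
    // which is why per-observation histograms were unnecessary here.
    assert!(input.get() > output.get());
}
```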
* feat/revise-compaction-picker: Improve file merging logic in `run.rs`
  - Refactor the loop logic in `merge_seq_files` to simplify the iteration over file groups.
  - Adjust the range for `end_idx` to include the endpoint, allowing for more flexible group selection.
  - Remove the condition that skips groups with only one file, enabling more comprehensive processing of file sequences.

* feat/revise-compaction-picker: Enhance `find_overlapping_items` with `SortedRun` and Update Tests
  - Refactor `find_overlapping_items` in `src/mito2/src/compaction/run.rs` to utilize the `SortedRun` struct for improved efficiency and clarity.
  - Introduce a `sorted` flag in `SortedRun` to optimize sorting operations.
  - Update test cases in `src/mito2/benches/bench_compaction_picker.rs` to accommodate changes in `find_overlapping_items` by using `SortedRun`.
  - Add a `From<Vec<T>>` implementation for `SortedRun` to facilitate easy conversion from vectors.

* feat/revise-compaction-picker: **Enhancements in `compaction/run.rs`:**
  - Added the `ReadableSize` import to handle size calculations.
  - Modified the logic in `merge_seq_files` to clamp the calculated target size to a maximum of 2GB when `max_file_size` is not provided.

* feat/revise-compaction-picker: Add Default Max Output Size Constant for Compaction

  Introduce the DEFAULT_MAX_OUTPUT_SIZE constant to define the default maximum compaction output file size as 2GB. Refactor the merge_seq_files function to utilize this constant, ensuring consistent and maintainable code for handling file size limits during compaction.
---
 src/mito2/Cargo.toml                          |   5 +
 src/mito2/benches/bench_compaction_picker.rs  | 157 ++++
 src/mito2/src/access_layer.rs                 |   2 +-
 src/mito2/src/compaction.rs                   |   2 +-
 src/mito2/src/compaction/compactor.rs         |  13 +
 src/mito2/src/compaction/picker.rs            |  11 +-
 src/mito2/src/compaction/run.rs               | 833 +++++++++---------
 src/mito2/src/compaction/task.rs              |   4 +-
 src/mito2/src/compaction/test_util.rs         |  27 -
 src/mito2/src/compaction/twcs.rs              | 339 +++----
 src/mito2/src/compaction/window.rs            |   1 +
 src/mito2/src/engine/append_mode_test.rs      |   4 +-
 src/mito2/src/engine/compaction_test.rs       | 149 +++-
 src/mito2/src/engine/filter_deleted_test.rs   |   1 -
 src/mito2/src/engine/merge_mode_test.rs       |   4 +-
 src/mito2/src/lib.rs                          |   1 +
 src/mito2/src/metrics.rs                      |  17 +-
 src/mito2/src/region/options.rs               |  51 +-
 src/mito2/src/sst/file.rs                     |  50 +-
 src/mito2/src/sst/parquet.rs                  |  65 +-
 src/mito2/src/sst/parquet/writer.rs           | 114 ++-
 src/mito2/src/worker/handle_alter.rs          |  26 +-
 src/store-api/src/mito_engine_options.rs      |  20 +-
 src/store-api/src/region_request.rs           |  39 +-
 tests-fuzz/src/generator/alter_expr.rs        |  22 +-
 tests-fuzz/src/ir/alter_expr.rs               |  56 +-
 tests-fuzz/src/translator/mysql/alter_expr.rs |  10 +-
 .../src/translator/postgres/alter_expr.rs     |  10 +-
 .../cases/distributed/explain/order_by.result |   2 +-
 tests/cases/distributed/explain/order_by.sql  |   2 +-
 .../common/alter/alter_table_options.result   | 170 ++--
 .../common/alter/alter_table_options.sql      |  10 +-
 .../common/create/create_with_options.result  |   5 +-
 .../common/create/create_with_options.sql     |   5 +-
 .../common/order/windowed_sort.result         |   4 +-
 .../standalone/common/order/windowed_sort.sql |   4 +-
 .../common/select/flush_append_only.result    |   3 +-
 .../common/select/flush_append_only.sql       |   3 +-
 .../standalone/optimizer/order_by.result      |   2 +-
 tests/cases/standalone/optimizer/order_by.sql |   2 +-
 40 files changed, 1172
insertions(+), 1073 deletions(-) create mode 100644 src/mito2/benches/bench_compaction_picker.rs diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 9ffbc17e6a..6a72568bb7 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -97,3 +97,8 @@ required-features = ["test"] name = "bench_filter_time_partition" harness = false required-features = ["test"] + +[[bench]] +name = "bench_compaction_picker" +harness = false +required-features = ["test"] diff --git a/src/mito2/benches/bench_compaction_picker.rs b/src/mito2/benches/bench_compaction_picker.rs new file mode 100644 index 0000000000..75acd1ca48 --- /dev/null +++ b/src/mito2/benches/bench_compaction_picker.rs @@ -0,0 +1,157 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use mito2::compaction::run::{ + find_overlapping_items, find_sorted_runs, merge_seq_files, reduce_runs, Item, Ranged, SortedRun, +}; + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +struct MockFile { + start: i64, + end: i64, + size: usize, +} + +impl Ranged for MockFile { + type BoundType = i64; + + fn range(&self) -> (Self::BoundType, Self::BoundType) { + (self.start, self.end) + } +} + +impl Item for MockFile { + fn size(&self) -> usize { + self.size + } +} + +fn generate_test_files(n: usize) -> Vec { + let mut files = Vec::with_capacity(n); + for _ in 0..n { + // Create slightly overlapping ranges to force multiple sorted runs + files.push(MockFile { + start: 0, + end: 10, + size: 10, + }); + } + files +} + +fn bench_find_sorted_runs(c: &mut Criterion) { + let mut group = c.benchmark_group("find_sorted_runs"); + + for size in [10, 100, 1000].iter() { + group.bench_function(format!("size_{}", size), |b| { + let mut files = generate_test_files(*size); + b.iter(|| { + find_sorted_runs(black_box(&mut files)); + }); + }); + } + group.finish(); +} + +fn bench_reduce_runs(c: &mut Criterion) { + let mut group = c.benchmark_group("reduce_runs"); + + for size in [10, 100, 1000].iter() { + group.bench_function(format!("size_{}", size), |b| { + let mut files = generate_test_files(*size); + let runs = find_sorted_runs(&mut files); + b.iter(|| { + reduce_runs(black_box(runs.clone())); + }); + }); + } + group.finish(); +} + +fn bench_find_overlapping_items(c: &mut Criterion) { + let mut group = c.benchmark_group("find_overlapping_items"); + + for size in [10, 100, 1000].iter() { + group.bench_function(format!("size_{}", size), |b| { + // Create two sets of files with some overlapping ranges + let mut files1 = Vec::with_capacity(*size); + let mut files2 = Vec::with_capacity(*size); + + for i in 0..*size { + files1.push(MockFile { + start: i as i64, + end: (i + 5) as i64, + size: 10, + }); + + files2.push(MockFile { + start: (i + 3) as i64, + end: (i + 8) as i64, + size: 10, + }); + } + + let mut r1 = SortedRun::from(files1); + let mut r2 = SortedRun::from(files2); + b.iter(|| { + let mut result = vec![]; + find_overlapping_items(black_box(&mut r1), 
black_box(&mut r2), &mut result); + }); + }); + } + group.finish(); +} + +fn bench_merge_seq_files(c: &mut Criterion) { + let mut group = c.benchmark_group("merge_seq_files"); + + for size in [10, 100, 1000].iter() { + group.bench_function(format!("size_{}", size), |b| { + // Create a set of files with varying sizes + let mut files = Vec::with_capacity(*size); + + for i in 0..*size { + // Create files with different sizes to test the scoring algorithm + let file_size = if i % 3 == 0 { + 5 + } else if i % 3 == 1 { + 10 + } else { + 15 + }; + + files.push(MockFile { + start: i as i64, + end: (i + 1) as i64, + size: file_size, + }); + } + + b.iter(|| { + merge_seq_files(black_box(&files), black_box(Some(50))); + }); + }); + } + group.finish(); +} + +criterion_group!( + benches, + bench_find_sorted_runs, + bench_reduce_runs, + bench_find_overlapping_items, + bench_merge_seq_files +); +criterion_main!(benches); diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index a0c313b210..ae0091c7c2 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -362,7 +362,7 @@ impl FilePathProvider for WriteCachePathProvider { /// Path provider that builds paths in region storage path. #[derive(Clone, Debug)] pub(crate) struct RegionFilePathFactory { - region_dir: String, + pub(crate) region_dir: String, } impl RegionFilePathFactory { diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 8b1c319161..ca50f6c6eb 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -15,7 +15,7 @@ mod buckets; pub mod compactor; pub mod picker; -mod run; +pub mod run; mod task; #[cfg(test)] mod test_util; diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index dfddb43726..27aecd42f8 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -36,6 +36,7 @@ use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Res use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::manifest::storage::manifest_compress_type; +use crate::metrics; use crate::read::Source; use crate::region::opener::new_manifest_dir; use crate::region::options::RegionOptions; @@ -240,6 +241,14 @@ impl MergeOutput { pub fn is_empty(&self) -> bool { self.files_to_add.is_empty() && self.files_to_remove.is_empty() } + + pub fn input_file_size(&self) -> u64 { + self.files_to_remove.iter().map(|f| f.file_size).sum() + } + + pub fn output_file_size(&self) -> u64 { + self.files_to_add.iter().map(|f| f.file_size).sum() + } } /// Compactor is the trait that defines the compaction logic. 
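// A self-contained sketch (not patch content) of the `input_file_size`/
// `output_file_size` helpers added above; `FileMeta` here is a simplified
// stand-in used for illustration only.
struct FileMeta {
    file_size: u64,
}

struct MergeOutput {
    files_to_add: Vec<FileMeta>,
    files_to_remove: Vec<FileMeta>,
}

impl MergeOutput {
    // Bytes consumed by the compaction: the inputs scheduled for removal.
    fn input_file_size(&self) -> u64 {
        self.files_to_remove.iter().map(|f| f.file_size).sum()
    }

    // Bytes produced by the compaction: the newly added output files.
    fn output_file_size(&self) -> u64 {
        self.files_to_add.iter().map(|f| f.file_size).sum()
    }
}

fn main() {
    let merged = MergeOutput {
        files_to_add: vec![FileMeta { file_size: 180 }],
        files_to_remove: vec![FileMeta { file_size: 100 }, FileMeta { file_size: 90 }],
    };
    // 190 input bytes were merged into a single 180-byte output file; these
    // totals are what feed COMPACTION_INPUT_BYTES / COMPACTION_OUTPUT_BYTES.
    assert_eq!(merged.input_file_size(), 190);
    assert_eq!(merged.output_file_size(), 180);
}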
@@ -286,6 +295,7 @@ impl Compactor for DefaultCompactor { compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone())); let write_opts = WriteOptions { write_buffer_size: compaction_region.engine_config.sst_write_buffer_size, + max_file_size: picker_output.max_file_size, ..Default::default() }; @@ -460,6 +470,9 @@ impl Compactor for DefaultCompactor { ); return Ok(()); } + + metrics::COMPACTION_INPUT_BYTES.inc_by(merge_output.input_file_size() as f64); + metrics::COMPACTION_OUTPUT_BYTES.inc_by(merge_output.output_file_size() as f64); self.update_manifest(compaction_region, merge_output) .await?; diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs index 431973c3b6..5256cc93f8 100644 --- a/src/mito2/src/compaction/picker.rs +++ b/src/mito2/src/compaction/picker.rs @@ -45,6 +45,8 @@ pub struct PickerOutput { pub outputs: Vec, pub expired_ssts: Vec, pub time_window_size: i64, + /// Max single output file size in bytes. + pub max_file_size: Option, } /// SerializedPickerOutput is a serialized version of PickerOutput by replacing [CompactionOutput] and [FileHandle] with [SerializedCompactionOutput] and [FileMeta]. @@ -53,6 +55,7 @@ pub struct SerializedPickerOutput { pub outputs: Vec, pub expired_ssts: Vec, pub time_window_size: i64, + pub max_file_size: Option, } impl From<&PickerOutput> for SerializedPickerOutput { @@ -76,6 +79,7 @@ impl From<&PickerOutput> for SerializedPickerOutput { outputs, expired_ssts, time_window_size: input.time_window_size, + max_file_size: input.max_file_size, } } } @@ -111,6 +115,7 @@ impl PickerOutput { outputs, expired_ssts, time_window_size: input.time_window_size, + max_file_size: input.max_file_size, } } } @@ -131,10 +136,7 @@ pub fn new_picker( } else { match compaction_options { CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker { - max_active_window_runs: twcs_opts.max_active_window_runs, - max_active_window_files: twcs_opts.max_active_window_files, - max_inactive_window_runs: twcs_opts.max_inactive_window_runs, - max_inactive_window_files: twcs_opts.max_inactive_window_files, + trigger_file_num: twcs_opts.trigger_file_num, time_window_seconds: twcs_opts.time_window_seconds(), max_output_file_size: twcs_opts.max_output_file_size.map(|r| r.as_bytes()), append_mode, @@ -179,6 +181,7 @@ mod tests { ], expired_ssts: expired_ssts_file_handle.clone(), time_window_size: 1000, + max_file_size: None, }; let picker_output_str = diff --git a/src/mito2/src/compaction/run.rs b/src/mito2/src/compaction/run.rs index 8bd5150082..ba425741cf 100644 --- a/src/mito2/src/compaction/run.rs +++ b/src/mito2/src/compaction/run.rs @@ -15,17 +15,17 @@ //! This file contains code to find sorted runs in a set if ranged items and //! along with the best way to merge these items to satisfy the desired run count. -use std::cmp::Ordering; - +use common_base::readable_size::ReadableSize; use common_base::BitVec; use common_time::Timestamp; -use itertools::Itertools; -use smallvec::{smallvec, SmallVec}; use crate::sst::file::FileHandle; -/// Trait for any items with specific range. -pub(crate) trait Ranged { +/// Default max compaction output file size when not specified. +const DEFAULT_MAX_OUTPUT_SIZE: u64 = ReadableSize::gb(2).as_bytes(); + +/// Trait for any items with specific range (both boundaries are inclusive). +pub trait Ranged { type BoundType: Ord + Copy; /// Returns the inclusive range of item. 
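// Sketch (not patch content) of how DEFAULT_MAX_OUTPUT_SIZE above is applied
// when picking a target output size in `merge_seq_files`, per the commit
// message: an explicit `max_file_size` wins; otherwise use 1.5x the average
// input size, clamped to the 2 GiB default. Assumes a non-empty input, which
// the caller guarantees.
const DEFAULT_MAX_OUTPUT_SIZE: u64 = 2 * 1024 * 1024 * 1024;

fn target_size(file_sizes: &[usize], max_file_size: Option<u64>) -> usize {
    match max_file_size {
        Some(size) => size as usize,
        None => {
            let total: usize = file_sizes.iter().sum();
            let avg = total as f64 / file_sizes.len() as f64;
            ((avg * 1.5) as usize).min(DEFAULT_MAX_OUTPUT_SIZE as usize)
        }
    }
}

fn main() {
    assert_eq!(target_size(&[10, 10, 10], Some(25)), 25); // explicit cap wins
    assert_eq!(target_size(&[10, 10, 10], None), 15); // 1.5 x average size
}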
@@ -37,10 +37,75 @@ pub(crate) trait Ranged { { let (lhs_start, lhs_end) = self.range(); let (rhs_start, rhs_end) = other.range(); - match lhs_start.cmp(&rhs_start) { - Ordering::Less => lhs_end >= rhs_start, - Ordering::Equal => true, - Ordering::Greater => lhs_start <= rhs_end, + + lhs_start.max(rhs_start) < lhs_end.min(rhs_end) + } +} + +pub fn find_overlapping_items( + l: &mut SortedRun, + r: &mut SortedRun, + result: &mut Vec, +) { + if l.items.is_empty() || r.items.is_empty() { + return; + } + + result.clear(); + result.reserve(l.items.len() + r.items.len()); + + // Sort both arrays by start boundary for more efficient overlap detection + if !l.sorted { + sort_ranged_items(&mut l.items); + l.sorted = true; + } + if !r.sorted { + sort_ranged_items(&mut r.items); + r.sorted = true; + } + + let mut r_idx = 0; + + let mut selected = BitVec::repeat(false, r.items().len() + l.items.len()); + + for (lhs_idx, lhs) in l.items.iter().enumerate() { + let (lhs_start, lhs_end) = lhs.range(); + + // Skip right elements that end before current left element starts + while r_idx < r.items.len() { + let (_, rhs_end) = r.items[r_idx].range(); + if rhs_end < lhs_start { + r_idx += 1; + } else { + break; + } + } + + // Check for overlaps with remaining right elements + let mut j = r_idx; + while j < r.items.len() { + let (rhs_start, rhs_end) = r.items[j].range(); + + // If right element starts after left element ends, no more overlaps possible + if rhs_start > lhs_end { + break; + } + + // We have an overlap + if lhs_start.max(rhs_start) <= lhs_end.min(rhs_end) { + if !selected[lhs_idx] { + result.push(lhs.clone()); + selected.set(lhs_idx, true); + } + + let rhs_selected_idx = l.items.len() + j; + if !selected[rhs_selected_idx] { + result.push(r.items[j].clone()); + selected.set(rhs_selected_idx, true); + } + } + + j += 1; } } } @@ -55,7 +120,7 @@ fn sort_ranged_items(values: &mut [T]) { } /// Trait for items to merge. -pub(crate) trait Item: Ranged + Clone { +pub trait Item: Ranged + Clone { /// Size is used to calculate the cost of merging items. fn size(&self) -> usize; } @@ -74,74 +139,36 @@ impl Item for FileHandle { } } -#[derive(Debug, Clone)] -struct MergeItems { - items: SmallVec<[T; 4]>, - start: T::BoundType, - end: T::BoundType, - size: usize, -} - -impl Ranged for MergeItems { - type BoundType = T::BoundType; - - fn range(&self) -> (Self::BoundType, Self::BoundType) { - (self.start, self.end) - } -} - -impl MergeItems { - /// Creates unmerged item from given value. - pub fn new_unmerged(val: T) -> Self { - let (start, end) = val.range(); - let size = val.size(); - Self { - items: smallvec![val], - start, - end, - size, - } - } - - /// The range of current merge item - pub(crate) fn range(&self) -> (T::BoundType, T::BoundType) { - (self.start, self.end) - } - - /// Merges current item with other item. - pub(crate) fn merge(self, other: Self) -> Self { - let start = self.start.min(other.start); - let end = self.end.max(other.end); - let size = self.size + other.size; - - let mut items = SmallVec::with_capacity(self.items.len() + other.items.len()); - items.extend(self.items); - items.extend(other.items); - Self { - start, - end, - size, - items, - } - } - - /// Returns true if current item is merged from two items. - pub fn merged(&self) -> bool { - self.items.len() > 1 - } -} - /// A set of files with non-overlapping time ranges. 
#[derive(Debug, Clone)] -pub(crate) struct SortedRun { +pub struct SortedRun { /// Items to merge - items: Vec>, - /// penalty is defined as the total size of merged items. - penalty: usize, + items: Vec, + /// The total size of all items. + size: usize, /// The lower bound of all items. start: Option, - // The upper bound of all items. + /// The upper bound of all items. end: Option, + /// Whether items are sorted. + sorted: bool, +} + +impl From> for SortedRun { + fn from(items: Vec) -> Self { + let mut r = Self { + items: Vec::with_capacity(items.len()), + size: 0, + start: None, + end: None, + sorted: false, + }; + for item in items { + r.push_item(item); + } + + r + } } impl Default for SortedRun @@ -151,9 +178,10 @@ where fn default() -> Self { Self { items: vec![], - penalty: 0, + size: 0, start: None, end: None, + sorted: false, } } } @@ -162,11 +190,13 @@ impl SortedRun where T: Item, { - fn push_item(&mut self, t: MergeItems) { + pub fn items(&self) -> &[T] { + &self.items + } + + fn push_item(&mut self, t: T) { let (file_start, file_end) = t.range(); - if t.merged() { - self.penalty += t.size; - } + self.size += t.size(); self.items.push(t); self.start = Some(self.start.map_or(file_start, |v| v.min(file_start))); self.end = Some(self.end.map_or(file_end, |v| v.max(file_end))); @@ -174,7 +204,7 @@ where } /// Finds sorted runs in given items. -pub(crate) fn find_sorted_runs(items: &mut [T]) -> Vec> +pub fn find_sorted_runs(items: &mut [T]) -> Vec> where T: Item, { @@ -195,20 +225,19 @@ where // item is already assigned. continue; } - let current_item = MergeItems::new_unmerged(item.clone()); match current_run.items.last() { None => { // current run is empty, just add current_item selected.set(true); - current_run.push_item(current_item); + current_run.push_item(item.clone()); } Some(last) => { // the current item does not overlap with the last item in current run, // then it belongs to current run. - if !last.overlap(¤t_item) { + if !last.overlap(item) { // does not overlap, push to current run selected.set(true); - current_run.push_item(current_item); + current_run.push_item(item.clone()); } } } @@ -219,52 +248,118 @@ where runs } -fn merge_all_runs(runs: Vec>) -> SortedRun { - assert!(!runs.is_empty()); - let mut all_items = runs - .into_iter() - .flat_map(|r| r.items.into_iter()) - .collect::>(); - - sort_ranged_items(&mut all_items); - - let mut res = SortedRun::default(); - let mut iter = all_items.into_iter(); - // safety: all_items is not empty - let mut current_item = iter.next().unwrap(); - - for item in iter { - if current_item.overlap(&item) { - current_item = current_item.merge(item); - } else { - res.push_item(current_item); - current_item = item; +/// Finds a set of files with minimum penalty to merge that can reduce the total num of runs. +/// The penalty of merging is defined as the size of all overlapping files between two runs. 
+pub fn reduce_runs(mut runs: Vec>) -> Vec { + assert!(runs.len() > 1); + // sort runs by size + runs.sort_unstable_by(|a, b| a.size.cmp(&b.size)); + // limit max probe runs to 100 + let probe_end = runs.len().min(100); + let mut min_penalty = usize::MAX; + let mut files = vec![]; + let mut temp_files = vec![]; + for i in 0..probe_end { + for j in i + 1..probe_end { + let (a, b) = runs.split_at_mut(j); + find_overlapping_items(&mut a[i], &mut b[0], &mut temp_files); + let penalty = temp_files.iter().map(|e| e.size()).sum(); + if penalty < min_penalty { + min_penalty = penalty; + files.clear(); + files.extend_from_slice(&temp_files); + } } } - res.push_item(current_item); - res + files } -/// Reduces the num of runs to given target and returns items to merge. -/// The time complexity of this function is `C_{k}_{runs.len()}` where k=`runs.len()`-target+1. -pub(crate) fn reduce_runs(runs: Vec>, target: usize) -> Vec> { - assert_ne!(target, 0); - if target >= runs.len() { - // already satisfied. +/// Finds the optimal set of adjacent files to merge based on a scoring system. +/// +/// This function evaluates all possible contiguous subsets of files to find the best +/// candidates for merging, considering: +/// +/// 1. File reduction - prioritizes merging more files to reduce the total count +/// 2. Write amplification - minimizes the ratio of largest file to total size +/// 3. Size efficiency - prefers merges that utilize available space effectively +/// +/// When multiple merge candidates have the same score, older files (those with lower indices) +/// are preferred. +/// +/// # Arguments +/// * `input_files` - Slice of files to consider for merging +/// * `max_file_size` - Optional maximum size constraint for the merged file. +/// If None, uses 1.5 times the average file size. +/// +/// # Returns +/// A vector containing the best set of adjacent files to merge. +/// Returns an empty vector if input is empty or contains only one file. +pub fn merge_seq_files(input_files: &[T], max_file_size: Option) -> Vec { + if input_files.is_empty() || input_files.len() == 1 { return vec![]; } - let k = runs.len() + 1 - target; - runs.into_iter() - .combinations(k) // find all possible solutions - .map(|runs_to_merge| merge_all_runs(runs_to_merge)) // calculate merge penalty - .min_by(|p, r| p.penalty.cmp(&r.penalty)) // find solution with the min penalty - .unwrap() // safety: their must be at least one solution. 
- .items - .into_iter() - .filter(|m| m.merged()) // find all files to merge in that solution - .map(|m| m.items.to_vec()) - .collect() + // Limit the number of files to process to 100 to control time complexity + let files_to_process = if input_files.len() > 100 { + &input_files[0..100] + } else { + input_files + }; + + // Calculate target size based on max_file_size or average file size + let target_size = match max_file_size { + Some(size) => size as usize, + None => { + // Calculate 1.5*average_file_size if max_file_size is not provided and clamp to 2GB + let total_size: usize = files_to_process.iter().map(|f| f.size()).sum(); + ((((total_size as f64) / (files_to_process.len() as f64)) * 1.5) as usize) + .min(DEFAULT_MAX_OUTPUT_SIZE as usize) + } + }; + + // Find the best group of adjacent files to merge + let mut best_group = Vec::new(); + let mut best_score = f64::NEG_INFINITY; + + // Try different starting positions - iterate from end to start to prefer older files + for start_idx in (0..files_to_process.len()).rev() { + // Try different ending positions - also iterate from end to start + for end_idx in (start_idx + 1..files_to_process.len()).rev() { + let group = &files_to_process[start_idx..=end_idx]; + let total_size: usize = group.iter().map(|f| f.size()).sum(); + + // Skip if total size exceeds target size + if total_size > target_size { + continue; // Use continue instead of break to check smaller ranges + } + + // Calculate amplification factor (largest file size / total size) + let largest_file_size = group.iter().map(|f| f.size()).max().unwrap_or(0); + let amplification_factor = largest_file_size as f64 / total_size as f64; + + // Calculate file reduction (number of files that will be reduced) + let file_reduction = group.len() - 1; + + // Calculate score based on multiple factors: + // 1. File reduction (higher is better) + // 2. Amplification factor (lower is better) + // 3. 
Size efficiency (how close to target size) + let file_reduction_score = file_reduction as f64 / files_to_process.len() as f64; + let amp_factor_score = (1.0 - amplification_factor) * 1.5; // Lower amplification is better + let size_efficiency = (total_size as f64 / target_size as f64).min(1.0); // Reward using available space + + let score = file_reduction_score + amp_factor_score + size_efficiency; + + // Check if this group is better than our current best + // Use >= instead of > to prefer older files (which we encounter first due to reverse iteration) + if score >= best_score { + best_score = score; + best_group = group.to_vec(); + } + } + } + + best_group } #[cfg(test)] @@ -273,7 +368,7 @@ mod tests { use super::*; - #[derive(Clone, Debug)] + #[derive(Clone, Debug, PartialEq)] struct MockFile { start: i64, end: i64, @@ -305,6 +400,17 @@ mod tests { .collect() } + fn build_items_with_size(items: &[(i64, i64, usize)]) -> Vec { + items + .iter() + .map(|(start, end, size)| MockFile { + start: *start, + end: *end, + size: *size, + }) + .collect() + } + fn check_sorted_runs( ranges: &[(i64, i64)], expected_runs: &[Vec<(i64, i64)>], @@ -325,7 +431,7 @@ mod tests { check_sorted_runs(&[], &[]); check_sorted_runs(&[(1, 1), (2, 2)], &[vec![(1, 1), (2, 2)]]); check_sorted_runs(&[(1, 2)], &[vec![(1, 2)]]); - check_sorted_runs(&[(1, 2), (2, 3)], &[vec![(1, 2)], vec![(2, 3)]]); + check_sorted_runs(&[(1, 2), (2, 3)], &[vec![(1, 2), (2, 3)]]); check_sorted_runs(&[(1, 2), (3, 4)], &[vec![(1, 2), (3, 4)]]); check_sorted_runs(&[(2, 4), (1, 3)], &[vec![(1, 3)], vec![(2, 4)]]); check_sorted_runs( @@ -350,11 +456,11 @@ mod tests { check_sorted_runs( &[(1, 2), (3, 4), (4, 6), (7, 8)], - &[vec![(1, 2), (3, 4), (7, 8)], vec![(4, 6)]], + &[vec![(1, 2), (3, 4), (4, 6), (7, 8)]], ); check_sorted_runs( &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)], - &[vec![(1, 2), (3, 6), (7, 8)], vec![(3, 4), (5, 6), (8, 9)]], + &[vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]], ); check_sorted_runs( @@ -366,205 +472,29 @@ mod tests { &[(10, 19), (20, 29), (21, 22), (30, 39), (31, 32), (32, 42)], &[ vec![(10, 19), (20, 29), (30, 39)], - vec![(21, 22), (31, 32)], - vec![(32, 42)], + vec![(21, 22), (31, 32), (32, 42)], ], ); } - fn check_merge_sorted_runs( - items: &[(i64, i64)], - expected_penalty: usize, - expected: &[Vec<(i64, i64)>], - ) { - let mut items = build_items(items); - let runs = find_sorted_runs(&mut items); - assert_eq!(2, runs.len()); - let res = merge_all_runs(runs); - let penalty = res.penalty; - let ranges = res - .items - .into_iter() - .map(|i| { - i.items - .into_iter() - .map(|f| (f.start, f.end)) - .sorted_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1))) - .collect::>() - }) - .collect::>(); - assert_eq!(expected, &ranges); - assert_eq!(expected_penalty, penalty); - } - - #[test] - fn test_merge_sorted_runs() { - // [1..2] - // [1...3] - check_merge_sorted_runs(&[(1, 2), (1, 3)], 3, &[vec![(1, 2), (1, 3)]]); - - // [1..2][3..4] - // [2..3] - check_merge_sorted_runs( - &[(1, 2), (2, 3), (3, 4)], - 3, - &[vec![(1, 2), (2, 3), (3, 4)]], - ); - - // [1..10][11..20][21...30] - // [18] - check_merge_sorted_runs( - &[(1, 10), (11, 20), (21, 30), (18, 18)], - 9, - &[vec![(1, 10)], vec![(11, 20), (18, 18)], vec![(21, 30)]], - ); - - // [1..3][4..5] - // [2...4] - check_merge_sorted_runs( - &[(1, 3), (2, 4), (4, 5)], - 5, - &[vec![(1, 3), (2, 4), (4, 5)]], - ); - - // [1..2][3..4] [7..8] - // [4..6] - check_merge_sorted_runs( - &[(1, 2), (3, 4), (4, 6), (7, 8)], - 3, - &[vec![(1, 2)], vec![(3, 4), (4, 
6)], vec![(7, 8)]], - ); - - // [1..2][3..4][5..6][7..8] - // [3........6] [8..9] - // - check_merge_sorted_runs( - &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)], - 7, - &[ - vec![(1, 2)], - vec![(3, 4), (3, 6), (5, 6)], - vec![(7, 8), (8, 9)], - ], - ); - - // [10.....19][20........29][30........39] - // [21..22] [31..32] - check_merge_sorted_runs( - &[(10, 19), (20, 29), (21, 22), (30, 39), (31, 32)], - 20, - &[ - vec![(10, 19)], - vec![(20, 29), (21, 22)], - vec![(30, 39), (31, 32)], - ], - ); - - // [1..10][11..20][21..30] - // [1..10] [21..30] - check_merge_sorted_runs( - &[(1, 10), (1, 10), (11, 20), (21, 30), (21, 30)], - 36, - &[ - vec![(1, 10), (1, 10)], - vec![(11, 20)], - vec![(21, 30), (21, 30)], - ], - ); - - // [1..10][11..20][21...30] - // [22..30] - check_merge_sorted_runs( - &[(1, 10), (11, 20), (21, 30), (22, 30)], - 17, - &[vec![(1, 10)], vec![(11, 20)], vec![(21, 30), (22, 30)]], - ); - } - - /// files: file arrangement with two sorted runs. - fn check_merge_all_sorted_runs( - files: &[(i64, i64)], - expected_penalty: usize, - expected: &[Vec<(i64, i64)>], - ) { - let mut files = build_items(files); - let runs = find_sorted_runs(&mut files); - let result = merge_all_runs(runs); - assert_eq!(expected_penalty, result.penalty); - assert_eq!(expected.len(), result.items.len()); - let res = result - .items - .iter() - .map(|i| { - let mut res = i.items.iter().map(|f| (f.start, f.end)).collect::>(); - res.sort_unstable_by(|l, r| l.0.cmp(&r.0)); - res - }) - .collect::>(); - assert_eq!(expected, &res); - } - - #[test] - fn test_merge_all_sorted_runs() { - // [1..2][3..4] - // [4..10] - check_merge_all_sorted_runs( - &[(1, 2), (3, 4), (4, 10)], - 7, // 1+6 - &[vec![(1, 2)], vec![(3, 4), (4, 10)]], - ); - - // [1..2] [3..4] [5..6] - // [4..........10] - check_merge_all_sorted_runs( - &[(1, 2), (3, 4), (5, 6), (4, 10)], - 8, // 1+1+6 - &[vec![(1, 2)], vec![(3, 4), (4, 10), (5, 6)]], - ); - - // [10..20] [30..40] [50....60] - // [35........55] - // [51..61] - check_merge_all_sorted_runs( - &[(10, 20), (30, 40), (50, 60), (35, 55), (51, 61)], - 50, - &[vec![(10, 20)], vec![(30, 40), (35, 55), (50, 60), (51, 61)]], - ); - } - - #[test] - fn test_sorted_runs_time_range() { - let mut files = build_items(&[(1, 2), (3, 4), (4, 10)]); - let runs = find_sorted_runs(&mut files); - assert_eq!(2, runs.len()); - let SortedRun { start, end, .. } = &runs[0]; - assert_eq!(Some(1), *start); - assert_eq!(Some(4), *end); - - let SortedRun { start, end, .. 
} = &runs[1]; - assert_eq!(Some(4), *start); - assert_eq!(Some(10), *end); - } - fn check_reduce_runs( files: &[(i64, i64)], expected_runs: &[Vec<(i64, i64)>], - target: usize, - expected: &[Vec<(i64, i64)>], + expected: &[(i64, i64)], ) { let runs = check_sorted_runs(files, expected_runs); - let files_to_merge = reduce_runs(runs, target); - let file_timestamps = files_to_merge + if runs.len() <= 1 { + assert!(expected.is_empty()); + return; + } + let files_to_merge = reduce_runs(runs); + let file_to_merge_timestamps = files_to_merge .into_iter() - .map(|f| { - let mut overlapping = f.into_iter().map(|f| (f.start, f.end)).collect::>(); - overlapping.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1))); - overlapping - }) + .map(|f| (f.start, f.end)) .collect::>(); let expected = expected.iter().cloned().collect::>(); - assert_eq!(&expected, &file_timestamps); + assert_eq!(&expected, &file_to_merge_timestamps); } #[test] @@ -574,8 +504,7 @@ mod tests { check_reduce_runs( &[(1, 3), (2, 4), (5, 6)], &[vec![(1, 3), (5, 6)], vec![(2, 4)]], - 1, - &[vec![(1, 3), (2, 4)]], + &[(1, 3), (2, 4)], ); // [1..2][3..5] @@ -583,56 +512,36 @@ mod tests { check_reduce_runs( &[(1, 2), (3, 5), (4, 6)], &[vec![(1, 2), (3, 5)], vec![(4, 6)]], - 1, - &[vec![(3, 5), (4, 6)]], - ); - - // [1..4] - // [2..5] - // [3..6] - check_reduce_runs( - &[(1, 4), (2, 5), (3, 6)], - &[vec![(1, 4)], vec![(2, 5)], vec![(3, 6)]], - 1, - &[vec![(1, 4), (2, 5), (3, 6)]], - ); - check_reduce_runs( - &[(1, 4), (2, 5), (3, 6)], - &[vec![(1, 4)], vec![(2, 5)], vec![(3, 6)]], - 2, - &[vec![(1, 4), (2, 5)]], + &[(3, 5), (4, 6)], ); // [1..2][3..4] [7..8] // [4..6] check_reduce_runs( &[(1, 2), (3, 4), (4, 6), (7, 8)], - &[vec![(1, 2), (3, 4), (7, 8)], vec![(4, 6)]], - 1, - &[vec![(3, 4), (4, 6)]], + &[vec![(1, 2), (3, 4), (4, 6), (7, 8)]], + &[], ); - // [1..2][3........6][7..8] - // [3..4][5..6] [8..9] + // [1..2][3........6][7..8][8..9] + // [3..4][5..6] check_reduce_runs( &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)], - &[vec![(1, 2), (3, 6), (7, 8)], vec![(3, 4), (5, 6), (8, 9)]], - 2, - &[], // already satisfied + &[vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]], + &[(5, 6), (3, 4), (3, 6)], // already satisfied ); - // [1..2][3........6][7..8] - // [3..4][5..6] [8..9] + // [1..2][3........6][7..8][8..9] + // [3..4][5..6] check_reduce_runs( &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)], - &[vec![(1, 2), (3, 6), (7, 8)], vec![(3, 4), (5, 6), (8, 9)]], - 1, - &[vec![(3, 4), (3, 6), (5, 6)], vec![(7, 8), (8, 9)]], + &[vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]], + &[(3, 4), (3, 6), (5, 6)], ); - // [10..20] [30..40] [50........80] [100..110] - // [50..60] [80...100] - // [80..90] + // [10..20] [30..40] [50........80][80...100][100..110] + // [50..60] [80..90] + // check_reduce_runs( &[ (10, 20), @@ -644,34 +553,10 @@ mod tests { (100, 110), ], &[ - vec![(10, 20), (30, 40), (50, 80), (100, 110)], - vec![(50, 60), (80, 100)], - vec![(80, 90)], + vec![(10, 20), (30, 40), (50, 80), (80, 100), (100, 110)], + vec![(50, 60), (80, 90)], ], - 2, - &[vec![(80, 90), (80, 100)]], - ); - - // [10..20] [30..40] [50........80] [100..110] - // [50..60] [80.......100] - // [80..90] - check_reduce_runs( - &[ - (10, 20), - (30, 40), - (50, 60), - (50, 80), - (80, 90), - (80, 100), - (100, 110), - ], - &[ - vec![(10, 20), (30, 40), (50, 80), (100, 110)], - vec![(50, 60), (80, 100)], - vec![(80, 90)], - ], - 1, - &[vec![(50, 60), (50, 80), (80, 90), (80, 100), (100, 110)]], + &[(50, 80), (80, 100), (50, 60), (80, 
90)], ); // [0..10] @@ -681,29 +566,181 @@ mod tests { check_reduce_runs( &[(0, 10), (0, 11), (0, 12), (0, 13)], &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]], - 4, - &[], - ); - // enforce 3 runs - check_reduce_runs( - &[(0, 10), (0, 11), (0, 12), (0, 13)], - &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]], - 3, - &[vec![(0, 10), (0, 11)]], - ); - // enforce 2 runs - check_reduce_runs( - &[(0, 10), (0, 11), (0, 12), (0, 13)], - &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]], - 2, - &[vec![(0, 10), (0, 11), (0, 12)]], - ); - // enforce 1 run - check_reduce_runs( - &[(0, 10), (0, 11), (0, 12), (0, 13)], - &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]], - 1, - &[vec![(0, 10), (0, 11), (0, 12), (0, 13)]], + &[(0, 10), (0, 11)], ); } + + #[test] + fn test_find_overlapping_items() { + let mut result = Vec::new(); + + // Test empty inputs + find_overlapping_items( + &mut SortedRun::from(Vec::::new()), + &mut SortedRun::from(Vec::::new()), + &mut result, + ); + assert_eq!(result, Vec::::new()); + + let files1 = build_items(&[(1, 3)]); + find_overlapping_items( + &mut SortedRun::from(files1.clone()), + &mut SortedRun::from(Vec::::new()), + &mut result, + ); + assert_eq!(result, Vec::::new()); + + find_overlapping_items( + &mut SortedRun::from(Vec::::new()), + &mut SortedRun::from(files1.clone()), + &mut result, + ); + assert_eq!(result, Vec::::new()); + + // Test non-overlapping ranges + let files1 = build_items(&[(1, 3), (5, 7)]); + let files2 = build_items(&[(10, 12), (15, 20)]); + find_overlapping_items( + &mut SortedRun::from(files1), + &mut SortedRun::from(files2), + &mut result, + ); + assert_eq!(result, Vec::::new()); + + // Test simple overlap + let files1 = build_items(&[(1, 5)]); + let files2 = build_items(&[(3, 7)]); + find_overlapping_items( + &mut SortedRun::from(files1), + &mut SortedRun::from(files2), + &mut result, + ); + assert_eq!(result.len(), 2); + assert_eq!(result[0].range(), (1, 5)); + assert_eq!(result[1].range(), (3, 7)); + + // Test multiple overlaps + let files1 = build_items(&[(1, 5), (8, 12), (15, 20)]); + let files2 = build_items(&[(3, 6), (7, 10), (18, 25)]); + find_overlapping_items( + &mut SortedRun::from(files1), + &mut SortedRun::from(files2), + &mut result, + ); + assert_eq!(result.len(), 6); + + // Test boundary cases (touching but not overlapping) + let files1 = build_items(&[(1, 5)]); + let files2 = build_items(&[(5, 10)]); // Touching at 5 + find_overlapping_items( + &mut SortedRun::from(files1), + &mut SortedRun::from(files2), + &mut result, + ); + assert_eq!(result.len(), 2); // Should overlap since ranges are inclusive + + // Test completely contained ranges + let files1 = build_items(&[(1, 10)]); + let files2 = build_items(&[(3, 7)]); + find_overlapping_items( + &mut SortedRun::from(files1), + &mut SortedRun::from(files2), + &mut result, + ); + assert_eq!(result.len(), 2); + + // Test identical ranges + let files1 = build_items(&[(1, 5)]); + let files2 = build_items(&[(1, 5)]); + find_overlapping_items( + &mut SortedRun::from(files1), + &mut SortedRun::from(files2), + &mut result, + ); + assert_eq!(result.len(), 2); + + // Test unsorted input handling + let files1 = build_items(&[(5, 10), (1, 3)]); // Unsorted + let files2 = build_items(&[(2, 7), (8, 12)]); // Unsorted + find_overlapping_items( + &mut SortedRun::from(files1), + &mut SortedRun::from(files2), + &mut result, + ); + assert_eq!(result.len(), 4); // Should find both overlaps + } + + #[test] + fn test_merge_seq_files() { + // Test 
empty input + let files = Vec::::new(); + assert_eq!(merge_seq_files(&files, None), Vec::::new()); + + // Test single file input (should return empty vec as no merge needed) + let files = build_items(&[(1, 5)]); + assert_eq!(merge_seq_files(&files, None), Vec::::new()); + + // Test the example case: [10, 1, 1, 1] - should merge the last three files + let files = build_items_with_size(&[(1, 2, 10), (3, 4, 1), (5, 6, 1), (7, 8, 1)]); + let result = merge_seq_files(&files, None); + assert_eq!(result.len(), 3); + assert_eq!(result[0].size, 1); + assert_eq!(result[1].size, 1); + assert_eq!(result[2].size, 1); + + // Test with files of equal size - should merge as many as possible + let files = build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 5), (7, 8, 5)]); + let result = merge_seq_files(&files, Some(20)); + assert_eq!(result.len(), 4); // Should merge all 4 files as total size is 20 + + // Test with max_file_size constraint + let files = build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 5), (7, 8, 5)]); + let result = merge_seq_files(&files, Some(10)); + assert_eq!(result.len(), 2); // Should merge only 2 files as max size is 10 + + // Test with uneven file sizes - should prioritize reducing file count + let files = build_items_with_size(&[(1, 2, 2), (3, 4, 3), (5, 6, 4), (7, 8, 10)]); + let result = merge_seq_files(&files, Some(10)); + assert_eq!(result.len(), 3); // Should merge the first 3 files (total size 9) + + // Test amplification factor prioritization + // Two possible merges: [5, 5] (amp factor 0.5) vs [10, 1, 1] (amp factor 0.83) + let files = + build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 10), (7, 8, 1), (9, 10, 1)]); + let result = merge_seq_files(&files, Some(12)); + assert_eq!(result.len(), 2); + assert_eq!(result[0].size, 5); + assert_eq!(result[1].size, 5); + + // Test with large file preventing merges + let files = build_items_with_size(&[(1, 2, 100), (3, 4, 1), (5, 6, 1), (7, 8, 1)]); + let result = merge_seq_files(&files, Some(10)); + assert_eq!(result.len(), 3); // Should merge the last 3 small files + assert_eq!(result[0].size, 1); + assert_eq!(result[1].size, 1); + assert_eq!(result[2].size, 1); + + let files = build_items_with_size(&[(1, 2, 100), (3, 4, 20), (5, 6, 20), (7, 8, 20)]); + let result = merge_seq_files(&files, Some(200)); + assert_eq!(result.len(), 4); + + let files = build_items_with_size(&[(1, 2, 160), (3, 4, 20), (5, 6, 20), (7, 8, 20)]); + let result = merge_seq_files(&files, None); + assert_eq!(result.len(), 3); + assert_eq!(result[0].size, 20); + assert_eq!(result[1].size, 20); + assert_eq!(result[2].size, 20); + + let files = build_items_with_size(&[(1, 2, 100), (3, 4, 1)]); + let result = merge_seq_files(&files, Some(200)); + assert_eq!(result.len(), 2); + assert_eq!(result[0].size, 100); + assert_eq!(result[1].size, 1); + + let files = build_items_with_size(&[(1, 2, 20), (3, 4, 20), (5, 6, 20), (7, 8, 20)]); + let result = merge_seq_files(&files, Some(40)); + assert_eq!(result.len(), 2); + assert_eq!(result[0].start, 1); + assert_eq!(result[1].start, 3); + } } diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index f083e09587..2508c3f6ae 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -22,7 +22,6 @@ use tokio::sync::mpsc; use crate::compaction::compactor::{CompactionRegion, Compactor}; use crate::compaction::picker::{CompactionTask, PickerOutput}; -use crate::error; use crate::error::CompactRegionSnafu; use crate::manifest::action::RegionEdit; use 
crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED}; @@ -30,6 +29,7 @@ use crate::request::{ BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest, }; use crate::worker::WorkerListener; +use crate::{error, metrics}; /// Maximum number of compaction tasks in parallel. pub const MAX_PARALLEL_COMPACTION: usize = 1; @@ -98,6 +98,8 @@ impl CompactionTaskImpl { }; let merge_time = merge_timer.stop_and_record(); + metrics::COMPACTION_INPUT_BYTES.inc_by(compaction_result.input_file_size() as f64); + metrics::COMPACTION_OUTPUT_BYTES.inc_by(compaction_result.output_file_size() as f64); info!( "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s", self.compaction_region.region_id, diff --git a/src/mito2/src/compaction/test_util.rs b/src/mito2/src/compaction/test_util.rs index 4baa6a9db5..26d4d1c3a8 100644 --- a/src/mito2/src/compaction/test_util.rs +++ b/src/mito2/src/compaction/test_util.rs @@ -44,30 +44,3 @@ pub fn new_file_handle( file_purger, ) } - -pub(crate) fn new_file_handles(file_specs: &[(i64, i64, u64)]) -> Vec { - let file_purger = new_noop_file_purger(); - file_specs - .iter() - .map(|(start, end, size)| { - FileHandle::new( - FileMeta { - region_id: 0.into(), - file_id: FileId::random(), - time_range: ( - Timestamp::new_millisecond(*start), - Timestamp::new_millisecond(*end), - ), - level: 0, - file_size: *size, - available_indexes: Default::default(), - index_file_size: 0, - num_rows: 0, - num_row_groups: 0, - sequence: None, - }, - file_purger.clone(), - ) - }) - .collect() -} diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index a4e8913eef..fc8f8e2bad 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -16,15 +16,17 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; -use common_telemetry::{info, trace}; +use common_base::readable_size::ReadableSize; +use common_telemetry::info; use common_time::timestamp::TimeUnit; use common_time::timestamp_millis::BucketAligned; use common_time::Timestamp; +use store_api::storage::RegionId; use crate::compaction::buckets::infer_time_bucket; use crate::compaction::compactor::CompactionRegion; use crate::compaction::picker::{Picker, PickerOutput}; -use crate::compaction::run::{find_sorted_runs, reduce_runs, Item}; +use crate::compaction::run::{find_sorted_runs, merge_seq_files, reduce_runs}; use crate::compaction::{get_expired_ssts, CompactionOutput}; use crate::sst::file::{overlaps, FileHandle, Level}; use crate::sst::version::LevelMeta; @@ -35,14 +37,8 @@ const LEVEL_COMPACTED: Level = 1; /// candidates. #[derive(Debug)] pub struct TwcsPicker { - /// Max allowed sorted runs in active window. - pub max_active_window_runs: usize, - /// Max allowed files in active window. - pub max_active_window_files: usize, - /// Max allowed sorted runs in inactive windows. - pub max_inactive_window_runs: usize, - /// Max allowed files in inactive windows. - pub max_inactive_window_files: usize, + /// Minimum file num to trigger a compaction. + pub trigger_file_num: usize, /// Compaction time window in seconds. pub time_window_seconds: Option, /// Max allowed compaction output file size. @@ -53,89 +49,48 @@ pub struct TwcsPicker { impl TwcsPicker { /// Builds compaction output from files. - /// For active writing window, we allow for at most `max_active_window_runs` files to alleviate - /// fragmentation. 
For other windows, we allow at most 1 file at each window. fn build_output( &self, + region_id: RegionId, time_windows: &mut BTreeMap, active_window: Option, ) -> Vec { let mut output = vec![]; for (window, files) in time_windows { + if files.files.is_empty() { + continue; + } let sorted_runs = find_sorted_runs(&mut files.files); + let found_runs = sorted_runs.len(); + // We only remove deletion markers if we found less than 2 runs and not in append mode. + // because after compaction there will be no overlapping files. + let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode; - let (max_runs, max_files) = if let Some(active_window) = active_window - && *window == active_window - { - (self.max_active_window_runs, self.max_active_window_files) + let inputs = if found_runs > 1 { + reduce_runs(sorted_runs) } else { - ( - self.max_inactive_window_runs, - self.max_inactive_window_files, - ) + let run = sorted_runs.last().unwrap(); + if run.items().len() < self.trigger_file_num { + continue; + } + // no overlapping files, try merge small files + merge_seq_files(run.items(), self.max_output_file_size) }; - let found_runs = sorted_runs.len(); - // We only remove deletion markers once no file in current window overlaps with any other window - // and region is not in append mode. - let filter_deleted = - !files.overlapping && (found_runs == 1 || max_runs == 1) && !self.append_mode; - - let inputs = if found_runs > max_runs { - let files_to_compact = reduce_runs(sorted_runs, max_runs); - let files_to_compact_len = files_to_compact.len(); - info!( - "Building compaction output, active window: {:?}, \ - current window: {}, \ - max runs: {}, \ - found runs: {}, \ - output size: {}, \ - max output size: {:?}, \ - remove deletion markers: {}", - active_window, + if !inputs.is_empty() { + log_pick_result( + region_id, *window, - max_runs, + active_window, found_runs, - files_to_compact_len, - self.max_output_file_size, - filter_deleted - ); - files_to_compact - } else if files.files.len() > max_files { - info!( - "Enforcing max file num in window: {}, active: {:?}, max: {}, current: {}, max output size: {:?}, filter delete: {}", - *window, - active_window, - max_files, files.files.len(), self.max_output_file_size, filter_deleted, + &inputs, ); - // Files in window exceeds file num limit - vec![enforce_file_num(&files.files, max_files)] - } else { - trace!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs); - continue; - }; - - let split_inputs = if !filter_deleted - && let Some(max_output_file_size) = self.max_output_file_size - { - let len_before_split = inputs.len(); - let maybe_split = enforce_max_output_size(inputs, max_output_file_size); - if maybe_split.len() != len_before_split { - info!("Compaction output file size exceeds threshold {}, split compaction inputs to: {:?}", max_output_file_size, maybe_split); - } - maybe_split - } else { - inputs - }; - - for input in split_inputs { - debug_assert!(input.len() > 1); output.push(CompactionOutput { output_level: LEVEL_COMPACTED, // always compact to l1 - inputs: input, + inputs, filter_deleted, output_time_range: None, // we do not enforce output time range in twcs compactions. }); @@ -145,66 +100,50 @@ impl TwcsPicker { } } -/// Limits the size of compaction output in a naive manner. 
-/// todo(hl): we can find the output file size more precisely by checking the time range -/// of each row group and adding the sizes of those non-overlapping row groups. But now -/// we'd better not to expose the SST details in this level. -fn enforce_max_output_size( - inputs: Vec>, - max_output_file_size: u64, -) -> Vec> { - inputs - .into_iter() - .flat_map(|input| { - debug_assert!(input.len() > 1); - let estimated_output_size = input.iter().map(|f| f.size()).sum::(); - if estimated_output_size < max_output_file_size { - // total file size does not exceed the threshold, just return the original input. - return vec![input]; - } - let mut splits = vec![]; - let mut new_input = vec![]; - let mut new_input_size = 0; - for f in input { - if new_input_size + f.size() > max_output_file_size { - splits.push(std::mem::take(&mut new_input)); - new_input_size = 0; - } - new_input_size += f.size(); - new_input.push(f); - } - if !new_input.is_empty() { - splits.push(new_input); - } - splits +#[allow(clippy::too_many_arguments)] +fn log_pick_result( + region_id: RegionId, + window: i64, + active_window: Option, + found_runs: usize, + file_num: usize, + max_output_file_size: Option, + filter_deleted: bool, + inputs: &[FileHandle], +) { + let input_file_str: Vec = inputs + .iter() + .map(|f| { + let range = f.time_range(); + let start = range.0.to_iso8601_string(); + let end = range.1.to_iso8601_string(); + let num_rows = f.num_rows(); + format!( + "SST{{id: {}, range: ({}, {}), size: {}, num rows: {} }}", + f.file_id(), + start, + end, + ReadableSize(f.size()), + num_rows + ) }) - .filter(|p| p.len() > 1) - .collect() -} - -/// Merges consecutive files so that file num does not exceed `max_file_num`, and chooses -/// the solution with minimum overhead according to files sizes to be merged. -/// `enforce_file_num` only merges consecutive files so that it won't create overlapping outputs. -/// `runs` must be sorted according to time ranges. 
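The rewritten `build_output` earlier in this hunk replaces the per-window run/file limits with a single branch: multiple sorted runs are reduced via `reduce_runs`, while a lone run is merged through `merge_seq_files` only once it accumulates `trigger_file_num` files. A minimal, self-contained sketch of that control flow; the `File` type, the greedy `find_runs`, and the merge-everything stand-in for `reduce_runs` are simplified assumptions, not mito2 code:

```rust
// Simplified stand-ins for mito2's FileHandle / find_sorted_runs / reduce_runs.
#[derive(Debug, Clone)]
struct File {
    start: i64,
    end: i64,
}

fn overlaps(a: &File, b: &File) -> bool {
    a.start <= b.end && b.start <= a.end
}

/// Greedily place files (sorted by start time) into non-overlapping runs.
fn find_runs(mut files: Vec<File>) -> Vec<Vec<File>> {
    files.sort_by_key(|f| f.start);
    let mut runs: Vec<Vec<File>> = Vec::new();
    'next_file: for file in files {
        for run in &mut runs {
            if !overlaps(run.last().unwrap(), &file) {
                run.push(file);
                continue 'next_file;
            }
        }
        runs.push(vec![file]);
    }
    runs
}

/// Mirrors the branch in `build_output`: several runs get reduced, a single
/// run is only compacted once it reaches `trigger_file_num` files.
fn pick(files: Vec<File>, trigger_file_num: usize) -> Option<Vec<File>> {
    let runs = find_runs(files);
    if runs.len() > 1 {
        // stand-in for reduce_runs: merge everything across runs
        return Some(runs.into_iter().flatten().collect());
    }
    let run = runs.into_iter().next()?;
    (run.len() >= trigger_file_num).then_some(run)
}

fn main() {
    let files = vec![File { start: 0, end: 9 }, File { start: 5, end: 14 }];
    // Two overlapping files form two runs, so both are picked for merging.
    assert_eq!(2, pick(files, 4).map(|v| v.len()).unwrap_or(0));
}
```

The design point the sketch illustrates is that a single knob (`trigger_file_num`) now gates compaction of non-overlapping files, instead of four separate active/inactive window limits.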
-fn enforce_file_num(files: &[T], max_file_num: usize) -> Vec { - debug_assert!(files.len() > max_file_num); - let to_merge = files.len() - max_file_num + 1; - let mut min_penalty = usize::MAX; - let mut min_idx = 0; - - for idx in 0..=(files.len() - to_merge) { - let current_penalty: usize = files - .iter() - .skip(idx) - .take(to_merge) - .map(|f| f.size()) - .sum(); - if current_penalty < min_penalty { - min_penalty = current_penalty; - min_idx = idx; - } - } - files.iter().skip(min_idx).take(to_merge).cloned().collect() + .collect(); + let window_str = Timestamp::new_second(window).to_iso8601_string(); + let active_window_str = active_window.map(|s| Timestamp::new_second(s).to_iso8601_string()); + let max_output_file_size = max_output_file_size.map(|size| ReadableSize(size).to_string()); + info!( + "Region ({:?}) compaction pick result: current window: {}, active window: {:?}, \ + found runs: {}, file num: {}, max output file size: {:?}, filter deleted: {}, \ + input files: {:?}", + region_id, + window_str, + active_window_str, + found_runs, + file_num, + max_output_file_size, + filter_deleted, + input_file_str + ); } impl Picker for TwcsPicker { @@ -240,16 +179,18 @@ impl Picker for TwcsPicker { // Assign files to windows let mut windows = assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size); - let outputs = self.build_output(&mut windows, active_window); + let outputs = self.build_output(region_id, &mut windows, active_window); if outputs.is_empty() && expired_ssts.is_empty() { return None; } + let max_file_size = self.max_output_file_size.map(|v| v as usize); Some(PickerOutput { outputs, expired_ssts, time_window_size, + max_file_size, }) } } @@ -368,12 +309,10 @@ fn find_latest_window_in_seconds<'a>( #[cfg(test)] mod tests { use std::collections::HashSet; - use std::sync::Arc; use super::*; - use crate::compaction::test_util::{new_file_handle, new_file_handles}; - use crate::sst::file::{FileId, FileMeta, Level}; - use crate::test_util::NoopFilePurger; + use crate::compaction::test_util::new_file_handle; + use crate::sst::file::{FileId, Level}; #[test] fn test_get_latest_window_in_seconds() { @@ -614,25 +553,31 @@ mod tests { impl CompactionPickerTestCase { fn check(&self) { + let file_id_to_idx = self + .input_files + .iter() + .enumerate() + .map(|(idx, file)| (file.file_id(), idx)) + .collect::>(); let mut windows = assign_to_windows(self.input_files.iter(), self.window_size); let active_window = find_latest_window_in_seconds(self.input_files.iter(), self.window_size); let output = TwcsPicker { - max_active_window_runs: 4, - max_active_window_files: usize::MAX, - max_inactive_window_runs: 1, - max_inactive_window_files: usize::MAX, + trigger_file_num: 4, time_window_seconds: None, max_output_file_size: None, append_mode: false, } - .build_output(&mut windows, active_window); + .build_output(RegionId::from_u64(0), &mut windows, active_window); let output = output .iter() .map(|o| { - let input_file_ids = - o.inputs.iter().map(|f| f.file_id()).collect::>(); + let input_file_ids = o + .inputs + .iter() + .map(|f| file_id_to_idx.get(&f.file_id()).copied().unwrap()) + .collect::>(); (input_file_ids, o.output_level) }) .collect::>(); @@ -641,11 +586,7 @@ mod tests { .expected_outputs .iter() .map(|o| { - let input_file_ids = o - .input_files - .iter() - .map(|idx| self.input_files[*idx].file_id()) - .collect::>(); + let input_file_ids = o.input_files.iter().copied().collect::>(); (input_file_ids, o.output_level) }) .collect::>(); @@ -658,47 +599,11 @@ mod tests 
{ output_level: Level, } - fn check_enforce_file_num( - input_files: &[(i64, i64, u64)], - max_file_num: usize, - files_to_merge: &[(i64, i64)], - ) { - let mut files = new_file_handles(input_files); - // ensure sorted - find_sorted_runs(&mut files); - let mut to_merge = enforce_file_num(&files, max_file_num); - to_merge.sort_unstable_by_key(|f| f.time_range().0); - assert_eq!( - files_to_merge.to_vec(), - to_merge - .iter() - .map(|f| { - let (start, end) = f.time_range(); - (start.value(), end.value()) - }) - .collect::>() - ); - } - - #[test] - fn test_enforce_file_num() { - check_enforce_file_num( - &[(0, 300, 2), (100, 200, 1), (200, 400, 1)], - 2, - &[(100, 200), (200, 400)], - ); - - check_enforce_file_num( - &[(0, 300, 200), (100, 200, 100), (200, 400, 100)], - 1, - &[(0, 300), (100, 200), (200, 400)], - ); - } - #[test] fn test_build_twcs_output() { let file_ids = (0..4).map(|_| FileId::random()).collect::>(); + // Case 1: 2 runs found in each time window. CompactionPickerTestCase { window_size: 3, input_files: [ @@ -708,13 +613,25 @@ mod tests { new_file_handle(file_ids[3], 50, 2998, 0), //active windows ] .to_vec(), - expected_outputs: vec![ExpectedOutput { - input_files: vec![0, 1], - output_level: 1, - }], + expected_outputs: vec![ + ExpectedOutput { + input_files: vec![0, 1], + output_level: 1, + }, + ExpectedOutput { + input_files: vec![2, 3], + output_level: 1, + }, + ], } .check(); + // Case 2: + // -2000........-3 + // -3000.....-100 + // 0..............2999 + // 50..........2998 + // 11.........2990 let file_ids = (0..6).map(|_| FileId::random()).collect::>(); CompactionPickerTestCase { window_size: 3, @@ -724,7 +641,6 @@ mod tests { new_file_handle(file_ids[2], 0, 2999, 0), new_file_handle(file_ids[3], 50, 2998, 0), new_file_handle(file_ids[4], 11, 2990, 0), - new_file_handle(file_ids[5], 50, 4998, 0), ] .to_vec(), expected_outputs: vec![ @@ -733,7 +649,7 @@ mod tests { output_level: 1, }, ExpectedOutput { - input_files: vec![2, 3, 4], + input_files: vec![2, 4], output_level: 1, }, ], @@ -741,44 +657,5 @@ mod tests { .check(); } - fn make_file_handles(inputs: &[(i64, i64, u64)]) -> Vec { - inputs - .iter() - .map(|(start, end, size)| { - FileHandle::new( - FileMeta { - region_id: Default::default(), - file_id: Default::default(), - time_range: ( - Timestamp::new_millisecond(*start), - Timestamp::new_millisecond(*end), - ), - level: 0, - file_size: *size, - available_indexes: Default::default(), - index_file_size: 0, - num_rows: 0, - num_row_groups: 0, - sequence: None, - }, - Arc::new(NoopFilePurger), - ) - }) - .collect() - } - - #[test] - fn test_limit_output_size() { - let mut files = make_file_handles(&[(1, 1, 1)].repeat(6)); - let runs = find_sorted_runs(&mut files); - assert_eq!(6, runs.len()); - let files_to_merge = reduce_runs(runs, 2); - - let enforced = enforce_max_output_size(files_to_merge, 2); - assert_eq!(2, enforced.len()); - assert_eq!(2, enforced[0].len()); - assert_eq!(2, enforced[1].len()); - } - // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected. } diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs index f7ad4af893..06212cb6d5 100644 --- a/src/mito2/src/compaction/window.rs +++ b/src/mito2/src/compaction/window.rs @@ -115,6 +115,7 @@ impl Picker for WindowedCompactionPicker { outputs, expired_ssts, time_window_size: time_window, + max_file_size: None, // todo (hl): we may need to support `max_file_size` parameter in manual compaction. 
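`PickerOutput` now carries the size cap chosen by the picker (TWCS casts its `max_output_file_size` to `usize`; the windowed picker above leaves it `None` for now, per the todo). A hedged sketch of that hand-off, with illustrative stand-in structs rather than mito2's real types:

```rust
// Illustrative stand-ins only; field names mirror the diff.
struct TwcsPicker {
    max_output_file_size: Option<u64>,
}

struct PickerOutput {
    max_file_size: Option<usize>,
}

impl TwcsPicker {
    fn picker_output(&self) -> PickerOutput {
        PickerOutput {
            // Same conversion as in the diff: the TWCS byte cap becomes the
            // writer-facing per-file size hint; None means "no cap".
            max_file_size: self.max_output_file_size.map(|v| v as usize),
        }
    }
}

fn main() {
    let picker = TwcsPicker { max_output_file_size: Some(512 << 20) };
    assert_eq!(Some(512usize << 20), picker.picker_output().max_file_size);
}
```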
}) } } diff --git a/src/mito2/src/engine/append_mode_test.rs b/src/mito2/src/engine/append_mode_test.rs index 3f56b10b6d..7d5c355c53 100644 --- a/src/mito2/src/engine/append_mode_test.rs +++ b/src/mito2/src/engine/append_mode_test.rs @@ -110,8 +110,6 @@ async fn test_append_mode_compaction() { let request = CreateRequestBuilder::new() .insert_option("compaction.type", "twcs") - .insert_option("compaction.twcs.max_active_window_runs", "2") - .insert_option("compaction.twcs.max_inactive_window_runs", "2") .insert_option("append_mode", "true") .build(); let region_dir = request.region_dir.clone(); @@ -177,7 +175,7 @@ async fn test_append_mode_compaction() { +-------+---------+---------------------+"; // Scans in parallel. let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); - assert_eq!(2, scanner.num_files()); + assert_eq!(1, scanner.num_files()); assert_eq!(1, scanner.num_memtables()); scanner.set_target_partitions(2); let stream = scanner.scan().await.unwrap(); diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index 1e740b746b..75d932c0f1 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -129,8 +129,6 @@ async fn test_compaction_region() { let request = CreateRequestBuilder::new() .insert_option("compaction.type", "twcs") - .insert_option("compaction.twcs.max_active_window_runs", "1") - .insert_option("compaction.twcs.max_inactive_window_runs", "1") .build(); let column_schemas = request @@ -163,12 +161,12 @@ async fn test_compaction_region() { // [0..9] // [10...19] // [20....29] - // -[15.........29]- + // -[15.........29]- (delete) // [15.....24] // Output: // [0..9] - // [10..14] - // [15..24] + // [10............29] (contains delete) + // [15....24] assert_eq!( 3, scanner.num_files(), @@ -181,6 +179,71 @@ async fn test_compaction_region() { assert_eq!((0..25).map(|v| v * 1000).collect::>(), vec); } +#[tokio::test] +async fn test_compaction_overlapping_files() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let request = CreateRequestBuilder::new() + .insert_option("compaction.type", "twcs") + .build(); + + let column_schemas = request + .column_metadatas + .iter() + .map(column_metadata_to_column_schema) + .collect::>(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + // Flush 5 SSTs for compaction. 
+ put_and_flush(&engine, region_id, &column_schemas, 0..10).await; + delete_and_flush(&engine, region_id, &column_schemas, 10..20).await; + put_and_flush(&engine, region_id, &column_schemas, 20..30).await; + delete_and_flush(&engine, region_id, &column_schemas, 30..40).await; + + let result = engine + .handle_request( + region_id, + RegionRequest::Compact(RegionCompactRequest::default()), + ) + .await + .unwrap(); + assert_eq!(result.affected_rows, 0); + + let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + assert_eq!( + 1, + scanner.num_files(), + "unexpected files: {:?}", + scanner.file_ids() + ); + let stream = scanner.scan().await.unwrap(); + + let vec = collect_stream_ts(stream).await; + assert_eq!( + vec, + (0..=9) + .map(|v| v * 1000) + .chain((20..=29).map(|v| v * 1000)) + .collect::>() + ); +} + #[tokio::test] async fn test_compaction_region_with_overlapping() { common_telemetry::init_default_ut_logging(); @@ -201,8 +264,6 @@ async fn test_compaction_region_with_overlapping() { let request = CreateRequestBuilder::new() .insert_option("compaction.type", "twcs") - .insert_option("compaction.twcs.max_active_window_runs", "2") - .insert_option("compaction.twcs.max_inactive_window_runs", "2") .insert_option("compaction.twcs.time_window", "1h") .build(); @@ -257,10 +318,6 @@ async fn test_compaction_region_with_overlapping_delete_all() { let request = CreateRequestBuilder::new() .insert_option("compaction.type", "twcs") - .insert_option("compaction.twcs.max_active_window_runs", "2") - .insert_option("compaction.twcs.max_active_window_files", "2") - .insert_option("compaction.twcs.max_inactive_window_runs", "2") - .insert_option("compaction.twcs.max_inactive_window_files", "2") .insert_option("compaction.twcs.time_window", "1h") .build(); @@ -290,7 +347,7 @@ async fn test_compaction_region_with_overlapping_delete_all() { let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); assert_eq!( - 4, + 2, scanner.num_files(), "unexpected files: {:?}", scanner.file_ids() @@ -332,7 +389,6 @@ async fn test_readonly_during_compaction() { let request = CreateRequestBuilder::new() .insert_option("compaction.type", "twcs") - .insert_option("compaction.twcs.max_active_window_runs", "1") .build(); let column_schemas = request @@ -404,10 +460,6 @@ async fn test_compaction_update_time_window() { let request = CreateRequestBuilder::new() .insert_option("compaction.type", "twcs") - .insert_option("compaction.twcs.max_active_window_runs", "2") - .insert_option("compaction.twcs.max_active_window_files", "2") - .insert_option("compaction.twcs.max_inactive_window_runs", "2") - .insert_option("compaction.twcs.max_inactive_window_files", "2") .build(); let column_schemas = request @@ -420,9 +472,10 @@ async fn test_compaction_update_time_window() { .await .unwrap(); // Flush 3 SSTs for compaction. 
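The new `test_compaction_overlapping_files` above writes puts for `0..10` and `20..30` (seconds) and deletes for `10..20` and `30..40`, so after compacting everything into one file only the first and third ranges survive. A quick stand-alone sanity check of that expectation:

```rust
// The surviving timestamps are the put ranges minus the deleted ranges,
// in milliseconds, matching the assertion in the test above.
fn expected_timestamps() -> Vec<i64> {
    (0..=9).chain(20..=29).map(|v| v * 1000).collect()
}

fn main() {
    let ts = expected_timestamps();
    assert_eq!(20, ts.len());
    assert_eq!(Some(&0), ts.first());
    assert_eq!(Some(&29_000), ts.last());
}
```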
-    put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
-    put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600
-    put_and_flush(&engine, region_id, &column_schemas, 2400..3600).await; // window 3600
+    put_and_flush(&engine, region_id, &column_schemas, 0..900).await; // window 3600
+    put_and_flush(&engine, region_id, &column_schemas, 900..1800).await; // window 3600
+    put_and_flush(&engine, region_id, &column_schemas, 1800..2700).await; // window 3600
+    put_and_flush(&engine, region_id, &column_schemas, 2700..3600).await; // window 3600
 
     let result = engine
         .handle_request(
@@ -433,11 +486,21 @@
         .unwrap();
     assert_eq!(result.affected_rows, 0);
+    assert_eq!(
+        engine
+            .get_region(region_id)
+            .unwrap()
+            .version_control
+            .current()
+            .version
+            .compaction_time_window,
+        Some(Duration::from_secs(3600))
+    );
 
     let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
     assert_eq!(0, scanner.num_memtables());
-    // We keep at most two files.
+    // All 4 files are merged into one output file after compaction.
     assert_eq!(
-        2,
+        1,
         scanner.num_files(),
         "unexpected files: {:?}",
         scanner.file_ids()
@@ -492,10 +555,6 @@ async fn test_change_region_compaction_window() {
     let request = CreateRequestBuilder::new()
         .insert_option("compaction.type", "twcs")
-        .insert_option("compaction.twcs.max_active_window_runs", "1")
-        .insert_option("compaction.twcs.max_active_window_files", "1")
-        .insert_option("compaction.twcs.max_inactive_window_runs", "1")
-        .insert_option("compaction.twcs.max_inactive_window_files", "1")
         .build();
     let region_dir = request.region_dir.clone();
     let column_schemas = request
@@ -508,8 +567,10 @@
         .await
         .unwrap();
-    // Flush 2 SSTs for compaction.
-    put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
-    put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600
+    // Flush 4 SSTs for compaction.
+    put_and_flush(&engine, region_id, &column_schemas, 0..600).await; // window 3600
+    put_and_flush(&engine, region_id, &column_schemas, 600..1200).await; // window 3600
+    put_and_flush(&engine, region_id, &column_schemas, 1200..1800).await; // window 3600
+    put_and_flush(&engine, region_id, &column_schemas, 1800..2400).await; // window 3600
 
     engine
         .handle_request(
@@ -520,7 +581,7 @@
         .unwrap();
 
     // Put window 7200
-    put_and_flush(&engine, region_id, &column_schemas, 4000..5000).await; // window 3600
+    put_and_flush(&engine, region_id, &column_schemas, 4000..5000).await;
     // Check compaction window.
     let region = engine.get_region(region_id).unwrap();
 
@@ -543,6 +604,22 @@
         },
     });
     engine.handle_request(region_id, request).await.unwrap();
+    assert_eq!(
+        engine
+            .get_region(region_id)
+            .unwrap()
+            .version_control
+            .current()
+            .version
+            .options
+            .compaction
+            .time_window(),
+        Some(Duration::from_secs(7200))
+    );
+
+    put_and_flush(&engine, region_id, &column_schemas, 5000..5100).await;
+    put_and_flush(&engine, region_id, &column_schemas, 5100..5200).await;
+    put_and_flush(&engine, region_id, &column_schemas, 5200..5300).await;
 
-    // Compaction again. It should compacts window 3600 and 7200
+    // Compaction again. It should compact window 3600 and 7200
     // into 7200.
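The window tests above assume every timestamp in `0..3600` lands in one 3600 s window while `4000..5000` lands in the next. The exact assignment lives in `assign_to_windows`/`BucketAligned`; the sketch below is an assumed simplification that matches the behavior the tests rely on, not the engine's actual alignment code:

```rust
// Assumed illustration: a timestamp in seconds maps to the window identified
// by its aligned lower bound (div_euclid also handles negative timestamps).
fn window_of(ts_sec: i64, window_sec: i64) -> i64 {
    ts_sec.div_euclid(window_sec) * window_sec
}

fn main() {
    // 0..3600 all share one 3600 s window...
    assert_eq!(window_of(900, 3600), window_of(2700, 3600));
    // ...while 4000..5000 falls into the next one.
    assert_ne!(window_of(900, 3600), window_of(4000, 3600));
}
```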
@@ -585,12 +662,12 @@ async fn test_change_region_compaction_window() { { let region = engine.get_region(region_id).unwrap(); let version = region.version(); + // We open the region without options, so the time window should be None. + assert!(version.options.compaction.time_window().is_none()); assert_eq!( Some(Duration::from_secs(7200)), version.compaction_time_window, ); - // We open the region without options, so the time window should be None. - assert!(version.options.compaction.time_window().is_none()); } } @@ -615,10 +692,6 @@ async fn test_open_overwrite_compaction_window() { let request = CreateRequestBuilder::new() .insert_option("compaction.type", "twcs") - .insert_option("compaction.twcs.max_active_window_runs", "1") - .insert_option("compaction.twcs.max_active_window_files", "1") - .insert_option("compaction.twcs.max_inactive_window_runs", "1") - .insert_option("compaction.twcs.max_inactive_window_files", "1") .build(); let region_dir = request.region_dir.clone(); let column_schemas = request @@ -631,8 +704,10 @@ async fn test_open_overwrite_compaction_window() { .await .unwrap(); // Flush 2 SSTs for compaction. - put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600 - put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600 + put_and_flush(&engine, region_id, &column_schemas, 0..600).await; // window 3600 + put_and_flush(&engine, region_id, &column_schemas, 600..1200).await; // window 3600 + put_and_flush(&engine, region_id, &column_schemas, 1200..1800).await; // window 3600 + put_and_flush(&engine, region_id, &column_schemas, 1800..2400).await; // window 3600 engine .handle_request( diff --git a/src/mito2/src/engine/filter_deleted_test.rs b/src/mito2/src/engine/filter_deleted_test.rs index 7849ab0e1a..70a842cc06 100644 --- a/src/mito2/src/engine/filter_deleted_test.rs +++ b/src/mito2/src/engine/filter_deleted_test.rs @@ -45,7 +45,6 @@ async fn test_scan_without_filtering_deleted() { .await; let request = CreateRequestBuilder::new() .insert_option("compaction.type", "twcs") - .insert_option("compaction.twcs.max_active_window_runs", "10") .build(); let column_schemas = rows_schema(&request); diff --git a/src/mito2/src/engine/merge_mode_test.rs b/src/mito2/src/engine/merge_mode_test.rs index 76988f2ac0..34e2d6425f 100644 --- a/src/mito2/src/engine/merge_mode_test.rs +++ b/src/mito2/src/engine/merge_mode_test.rs @@ -111,8 +111,6 @@ async fn test_merge_mode_compaction() { let request = CreateRequestBuilder::new() .field_num(2) .insert_option("compaction.type", "twcs") - .insert_option("compaction.twcs.max_active_window_runs", "1") - .insert_option("compaction.twcs.max_inactive_window_runs", "1") .insert_option("merge_mode", "last_non_null") .build(); let region_dir = request.region_dir.clone(); @@ -191,7 +189,7 @@ async fn test_merge_mode_compaction() { +-------+---------+---------+---------------------+"; // Scans in parallel. 
let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); - assert_eq!(1, scanner.num_files()); + assert_eq!(2, scanner.num_files()); assert_eq!(1, scanner.num_memtables()); scanner.set_target_partitions(2); let stream = scanner.scan().await.unwrap(); diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index 32cde9a468..e47310ce0e 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -20,6 +20,7 @@ #![feature(assert_matches)] #![feature(result_flattening)] #![feature(int_roundings)] +#![feature(debug_closure_helpers)] #[cfg(any(test, feature = "test"))] #[cfg_attr(feature = "test", allow(unused))] diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 2675413c4e..107f94b515 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -84,7 +84,7 @@ lazy_static! { /// Histogram of flushed bytes. pub static ref FLUSH_BYTES_TOTAL: IntCounter = register_int_counter!("greptime_mito_flush_bytes_total", "mito flush bytes total").unwrap(); - /// Gauge for inflight compaction tasks. + /// Gauge for inflight flush tasks. pub static ref INFLIGHT_FLUSH_COUNT: IntGauge = register_int_gauge!( "greptime_mito_inflight_flush_count", @@ -153,7 +153,6 @@ lazy_static! { "greptime_mito_inflight_compaction_count", "inflight compaction count", ).unwrap(); - // ------- End of compaction metrics. // Query metrics. /// Timer of different stages in query. @@ -403,6 +402,20 @@ lazy_static! { } +lazy_static! { + /// Counter for compaction input file size. + pub static ref COMPACTION_INPUT_BYTES: Counter = register_counter!( + "greptime_mito_compaction_input_bytes", + "mito compaction input file size", + ).unwrap(); + + /// Counter for compaction output file size. + pub static ref COMPACTION_OUTPUT_BYTES: Counter = register_counter!( + "greptime_mito_compaction_output_bytes", + "mito compaction output file size", + ).unwrap(); +} + /// Stager notifier to collect metrics. pub struct StagerMetrics { cache_hit: IntCounter, diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index 66ecd83358..c79706c104 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -199,18 +199,9 @@ impl Default for CompactionOptions { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(default)] pub struct TwcsOptions { - /// Max num of sorted runs that can be kept in active writing time window. + /// Minimum file num in every time window to trigger a compaction. #[serde_as(as = "DisplayFromStr")] - pub max_active_window_runs: usize, - /// Max num of files in the active window. - #[serde_as(as = "DisplayFromStr")] - pub max_active_window_files: usize, - /// Max num of sorted runs that can be kept in inactive time windows. - #[serde_as(as = "DisplayFromStr")] - pub max_inactive_window_runs: usize, - /// Max num of files in inactive time windows. - #[serde_as(as = "DisplayFromStr")] - pub max_inactive_window_files: usize, + pub trigger_file_num: usize, /// Compaction time window defined when creating tables. 
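The two counters added to `metrics.rs` follow the crate's existing `lazy_static` + `prometheus` pattern and are bumped once per finished compaction via the `inc_by` calls earlier in this patch. A minimal, hedged example of the same pattern; the metric name here is illustrative, not GreptimeDB's:

```rust
use lazy_static::lazy_static;
use prometheus::{register_counter, Counter};

lazy_static! {
    // Analogous to COMPACTION_INPUT_BYTES in the diff; name is made up.
    static ref EXAMPLE_INPUT_BYTES: Counter = register_counter!(
        "example_compaction_input_bytes",
        "compaction input file size",
    )
    .unwrap();
}

fn record_compaction(input_file_size: u64) {
    // Counters are monotonic, so byte totals are added, never set.
    EXAMPLE_INPUT_BYTES.inc_by(input_file_size as f64);
}

fn main() {
    record_compaction(4096);
    assert_eq!(4096.0, EXAMPLE_INPUT_BYTES.get());
}
```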
#[serde(with = "humantime_serde")] pub time_window: Option, @@ -243,12 +234,9 @@ impl TwcsOptions { impl Default for TwcsOptions { fn default() -> Self { Self { - max_active_window_runs: 4, - max_active_window_files: 4, - max_inactive_window_runs: 1, - max_inactive_window_files: 1, + trigger_file_num: 4, time_window: None, - max_output_file_size: Some(ReadableSize::gb(2)), + max_output_file_size: Some(ReadableSize::mb(512)), remote_compaction: false, fallback_to_local: true, } @@ -500,7 +488,7 @@ mod tests { #[test] fn test_without_compaction_type() { let map = make_map(&[ - ("compaction.twcs.max_active_window_runs", "8"), + ("compaction.twcs.trigger_file_num", "8"), ("compaction.twcs.time_window", "2h"), ]); let err = RegionOptions::try_from(&map).unwrap_err(); @@ -510,14 +498,14 @@ mod tests { #[test] fn test_with_compaction_type() { let map = make_map(&[ - ("compaction.twcs.max_active_window_runs", "8"), + ("compaction.twcs.trigger_file_num", "8"), ("compaction.twcs.time_window", "2h"), ("compaction.type", "twcs"), ]); let options = RegionOptions::try_from(&map).unwrap(); let expect = RegionOptions { compaction: CompactionOptions::Twcs(TwcsOptions { - max_active_window_runs: 8, + trigger_file_num: 8, time_window: Some(Duration::from_secs(3600 * 2)), ..Default::default() }), @@ -618,10 +606,7 @@ mod tests { }); let map = make_map(&[ ("ttl", "7d"), - ("compaction.twcs.max_active_window_runs", "8"), - ("compaction.twcs.max_active_window_files", "11"), - ("compaction.twcs.max_inactive_window_runs", "2"), - ("compaction.twcs.max_inactive_window_files", "3"), + ("compaction.twcs.trigger_file_num", "8"), ("compaction.twcs.max_output_file_size", "1GB"), ("compaction.twcs.time_window", "2h"), ("compaction.type", "twcs"), @@ -645,10 +630,7 @@ mod tests { let expect = RegionOptions { ttl: Some(Duration::from_secs(3600 * 24 * 7).into()), compaction: CompactionOptions::Twcs(TwcsOptions { - max_active_window_runs: 8, - max_active_window_files: 11, - max_inactive_window_runs: 2, - max_inactive_window_files: 3, + trigger_file_num: 8, time_window: Some(Duration::from_secs(3600 * 2)), max_output_file_size: Some(ReadableSize::gb(1)), remote_compaction: false, @@ -679,10 +661,7 @@ mod tests { let options = RegionOptions { ttl: Some(Duration::from_secs(3600 * 24 * 7).into()), compaction: CompactionOptions::Twcs(TwcsOptions { - max_active_window_runs: 8, - max_active_window_files: usize::MAX, - max_inactive_window_runs: 2, - max_inactive_window_files: usize::MAX, + trigger_file_num: 8, time_window: Some(Duration::from_secs(3600 * 2)), max_output_file_size: None, remote_compaction: false, @@ -719,10 +698,7 @@ mod tests { "ttl": "7days", "compaction": { "compaction.type": "twcs", - "compaction.twcs.max_active_window_runs": "8", - "compaction.twcs.max_active_window_files": "11", - "compaction.twcs.max_inactive_window_runs": "2", - "compaction.twcs.max_inactive_window_files": "7", + "compaction.twcs.trigger_file_num": "8", "compaction.twcs.max_output_file_size": "7MB", "compaction.twcs.time_window": "2h" }, @@ -748,10 +724,7 @@ mod tests { let options = RegionOptions { ttl: Some(Duration::from_secs(3600 * 24 * 7).into()), compaction: CompactionOptions::Twcs(TwcsOptions { - max_active_window_runs: 8, - max_active_window_files: 11, - max_inactive_window_runs: 2, - max_inactive_window_files: 7, + trigger_file_num: 8, time_window: Some(Duration::from_secs(3600 * 2)), max_output_file_size: Some(ReadableSize::mb(7)), remote_compaction: false, diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 
a432b53bdf..8396f79ca3 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -15,11 +15,13 @@ //! Structures to describe metadata of files. use std::fmt; +use std::fmt::{Debug, Formatter}; use std::num::NonZeroU64; use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use common_base::readable_size::ReadableSize; use common_time::Timestamp; use serde::{Deserialize, Serialize}; use smallvec::SmallVec; @@ -105,7 +107,7 @@ pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool { } /// Metadata of a SST file. -#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)] +#[derive(Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)] #[serde(default)] pub struct FileMeta { /// Region of file. @@ -142,6 +144,42 @@ pub struct FileMeta { pub sequence: Option, } +impl Debug for FileMeta { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let mut debug_struct = f.debug_struct("FileMeta"); + debug_struct + .field("region_id", &self.region_id) + .field_with("file_id", |f| write!(f, "{} ", self.file_id)) + .field_with("time_range", |f| { + write!( + f, + "({}, {}) ", + self.time_range.0.to_iso8601_string(), + self.time_range.1.to_iso8601_string() + ) + }) + .field("level", &self.level) + .field("file_size", &ReadableSize(self.file_size)); + if !self.available_indexes.is_empty() { + debug_struct + .field("available_indexes", &self.available_indexes) + .field("index_file_size", &ReadableSize(self.index_file_size)); + } + debug_struct + .field("num_rows", &self.num_rows) + .field("num_row_groups", &self.num_row_groups) + .field_with("sequence", |f| match self.sequence { + None => { + write!(f, "None") + } + Some(seq) => { + write!(f, "{}", seq) + } + }) + .finish() + } +} + /// Type of index. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum IndexType { @@ -188,13 +226,9 @@ pub struct FileHandle { impl fmt::Debug for FileHandle { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FileHandle") - .field("region_id", &self.inner.meta.region_id) - .field("file_id", &self.inner.meta.file_id) - .field("time_range", &self.inner.meta.time_range) - .field("size", &self.inner.meta.file_size) - .field("level", &self.inner.meta.level) - .field("compacting", &self.inner.compacting) - .field("deleted", &self.inner.deleted) + .field("meta", self.meta_ref()) + .field("compacting", &self.compacting()) + .field("deleted", &self.inner.deleted.load(Ordering::Relaxed)) .finish() } } diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index e2faa9a75d..93cf797dae 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -50,6 +50,10 @@ pub struct WriteOptions { pub write_buffer_size: ReadableSize, /// Row group size. pub row_group_size: usize, + /// Max single output file size. + /// Note: This is not a hard limit as we can only observe the file size when + /// ArrowWrite writes to underlying writers. 
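The custom `Debug` for `FileMeta` above relies on the nightly `debug_closure_helpers` feature (enabled in `lib.rs` earlier in this patch), whose `field_with` formats a field through a closure instead of requiring a `Debug` value. A tiny nightly-only sketch of the same API, with toy types:

```rust
// Nightly-only; requires the same feature gate the diff adds to lib.rs.
#![feature(debug_closure_helpers)]
use std::fmt;

struct Meta {
    file_size: u64,
}

impl fmt::Debug for Meta {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Meta")
            // Render the raw byte count in a human-friendly form.
            .field_with("file_size", |f| write!(f, "{} B", self.file_size))
            .finish()
    }
}

fn main() {
    println!("{:?}", Meta { file_size: 4096 }); // Meta { file_size: 4096 B }
}
```

This is what lets the diff print ISO8601 time ranges and `ReadableSize` values without allocating intermediate strings for each field.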
+    pub max_file_size: Option<usize>,
 }
 
 impl Default for WriteOptions {
@@ -57,6 +61,7 @@
         WriteOptions {
             write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
             row_group_size: DEFAULT_ROW_GROUP_SIZE,
+            max_file_size: None,
         }
     }
 }
@@ -99,8 +104,9 @@
 mod tests {
     use tokio_util::compat::FuturesAsyncWriteCompatExt;
     use super::*;
-    use crate::access_layer::FilePathProvider;
+    use crate::access_layer::{FilePathProvider, RegionFilePathFactory};
     use crate::cache::{CacheManager, CacheStrategy, PageKey};
+    use crate::read::BatchReader;
     use crate::sst::index::{Indexer, IndexerBuilder};
     use crate::sst::parquet::format::WriteFormat;
     use crate::sst::parquet::reader::ParquetReaderBuilder;
@@ -108,7 +114,8 @@
     use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
     use crate::test_util::sst_util::{
         assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
-        new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
+        new_batch_with_binary, new_source, sst_file_handle, sst_file_handle_with_file_id,
+        sst_region_metadata,
     };
     use crate::test_util::{check_reader_result, TestEnv};
 
@@ -532,4 +539,58 @@
         )
         .await;
     }
+
+    #[tokio::test]
+    async fn test_write_multiple_files() {
+        common_telemetry::init_default_ut_logging();
+        // create test env
+        let mut env = TestEnv::new();
+        let object_store = env.init_object_store_manager();
+        let metadata = Arc::new(sst_region_metadata());
+        let batches = &[
+            new_batch_by_range(&["a", "d"], 0, 1000),
+            new_batch_by_range(&["b", "f"], 0, 1000),
+            new_batch_by_range(&["b", "h"], 100, 200),
+            new_batch_by_range(&["b", "h"], 200, 300),
+            new_batch_by_range(&["b", "h"], 300, 1000),
+        ];
+        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
+
+        let source = new_source(batches);
+        let write_opts = WriteOptions {
+            row_group_size: 50,
+            max_file_size: Some(1024 * 16),
+            ..Default::default()
+        };
+
+        let path_provider = RegionFilePathFactory {
+            region_dir: "test".to_string(),
+        };
+        let mut writer = ParquetWriter::new_with_object_store(
+            object_store.clone(),
+            metadata.clone(),
+            NoopIndexBuilder,
+            path_provider,
+        )
+        .await;
+
+        let files = writer.write_all(source, None, &write_opts).await.unwrap();
+        assert_eq!(2, files.len());
+
+        let mut rows_read = 0;
+        for f in &files {
+            let file_handle = sst_file_handle_with_file_id(
+                f.file_id,
+                f.time_range.0.value(),
+                f.time_range.1.value(),
+            );
+            let builder =
+                ParquetReaderBuilder::new("test".to_string(), file_handle, object_store.clone());
+            let mut reader = builder.build().await.unwrap();
+            while let Some(batch) = reader.next_batch().await.unwrap() {
+                rows_read += batch.num_rows();
+            }
+        }
+        assert_eq!(total_rows, rows_read);
+    }
 }
diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index 9f5f4af05c..b1dc28476e 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -15,11 +15,13 @@
 //! Parquet writer.
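As the new doc comment notes, `max_file_size` is a soft cap: the writer only observes `bytes_written` after the arrow writer flushes, so a file can overshoot the limit before it is rolled. A rough, self-contained sketch of that policy; the function and its inputs are hypothetical, not mito2's writer:

```rust
// Rough sketch of the soft size cap: batches keep going to the current file,
// and we roll to a new file once the observed byte count exceeds the cap.
fn split_by_size(batch_sizes: &[u64], max_file_size: u64) -> Vec<Vec<u64>> {
    let mut files = Vec::new();
    let mut current = Vec::new();
    let mut written = 0u64;
    for &size in batch_sizes {
        // The batch is written first and the limit checked afterwards,
        // which is why a single file can exceed the cap.
        current.push(size);
        written += size;
        if written > max_file_size {
            files.push(std::mem::take(&mut current));
            written = 0;
        }
    }
    if !current.is_empty() {
        files.push(current);
    }
    files
}

fn main() {
    let files = split_by_size(&[10, 10, 10, 10], 25);
    assert_eq!(2, files.len()); // roughly 30 bytes, then the remaining 10
}
```

This is the behavior `test_write_multiple_files` above exercises end to end: five batches under a 16 KiB cap come back as two SSTs whose row counts sum to the input.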
use std::future::Future; +use std::mem; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; +use common_telemetry::debug; use common_time::Timestamp; use datatypes::arrow::datatypes::SchemaRef; use object_store::{FuturesAsyncWriter, ObjectStore}; @@ -143,17 +145,52 @@ where } } - async fn get_or_create_indexer(&mut self) -> &mut Indexer { - match self.current_indexer { - None => { - self.current_file = FileId::random(); - let indexer = self.indexer_builder.build(self.current_file).await; - self.current_indexer = Some(indexer); - // safety: self.current_indexer already set above. - self.current_indexer.as_mut().unwrap() - } - Some(ref mut indexer) => indexer, - } + /// Finishes current SST file and index file. + async fn finish_current_file( + &mut self, + ssts: &mut SstInfoArray, + stats: &mut SourceStats, + ) -> Result<()> { + // maybe_init_writer will re-create a new file. + if let Some(mut current_writer) = mem::take(&mut self.writer) { + let stats = mem::take(stats); + // At least one row has been written. + assert!(stats.num_rows > 0); + + debug!( + "Finishing current file {}, file size: {}, num rows: {}", + self.current_file, + self.bytes_written.load(Ordering::Relaxed), + stats.num_rows + ); + + // Finish indexer and writer. + // safety: writer and index can only be both present or not. + let index_output = self.current_indexer.as_mut().unwrap().finish().await; + current_writer.flush().await.context(WriteParquetSnafu)?; + + let file_meta = current_writer.close().await.context(WriteParquetSnafu)?; + let file_size = self.bytes_written.load(Ordering::Relaxed) as u64; + + // Safety: num rows > 0 so we must have min/max. + let time_range = stats.time_range.unwrap(); + + // convert FileMetaData to ParquetMetaData + let parquet_metadata = parse_parquet_metadata(file_meta)?; + ssts.push(SstInfo { + file_id: self.current_file, + time_range, + file_size, + num_rows: stats.num_rows, + num_row_groups: parquet_metadata.num_row_groups() as u64, + file_metadata: Some(Arc::new(parquet_metadata)), + index_metadata: index_output, + }); + self.current_file = FileId::random(); + self.bytes_written.store(0, Ordering::Relaxed) + }; + + Ok(()) } /// Iterates source and writes all rows to Parquet file. @@ -184,6 +221,7 @@ where override_sequence: Option, // override the `sequence` field from `Source` opts: &WriteOptions, ) -> Result { + let mut results = smallvec![]; let write_format = WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence); let mut stats = SourceStats::default(); @@ -196,49 +234,31 @@ where match res { Ok(mut batch) => { stats.update(&batch); - self.get_or_create_indexer().await.update(&mut batch).await; + // safety: self.current_indexer must be set when first batch has been written. + self.current_indexer + .as_mut() + .unwrap() + .update(&mut batch) + .await; + if let Some(max_file_size) = opts.max_file_size + && self.bytes_written.load(Ordering::Relaxed) > max_file_size + { + self.finish_current_file(&mut results, &mut stats).await?; + } } Err(e) => { - self.get_or_create_indexer().await.abort().await; + if let Some(indexer) = &mut self.current_indexer { + indexer.abort().await; + } return Err(e); } } } - let index_output = self.get_or_create_indexer().await.finish().await; - - if stats.num_rows == 0 { - return Ok(smallvec![]); - } - - let Some(mut arrow_writer) = self.writer.take() else { - // No batch actually written. 
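`finish_current_file` above hinges on `mem::take`: the active writer is moved out of its `Option`, flushed and closed, and the next batch lazily re-creates a writer under a fresh file id. A toy sketch of that ownership pattern; the types are stand-ins, not mito2's:

```rust
use std::mem;

#[derive(Default)]
struct Roller {
    writer: Option<Vec<u8>>,
    finished: Vec<Vec<u8>>,
}

impl Roller {
    fn finish_current(&mut self) {
        // Move the writer out so it can be consumed; the Option is left None,
        // which is what lets the next write start a new "file". A real impl
        // would also finish the indexer and collect file metadata here.
        if let Some(buf) = mem::take(&mut self.writer) {
            self.finished.push(buf);
        }
    }

    fn write(&mut self, bytes: &[u8]) {
        // get-or-create, mirroring the writer's lazy initialization
        self.writer.get_or_insert_with(Vec::new).extend_from_slice(bytes);
    }
}

fn main() {
    let mut roller = Roller::default();
    roller.write(b"abc");
    roller.finish_current();
    roller.write(b"def"); // starts a second "file"
    roller.finish_current();
    assert_eq!(2, roller.finished.len());
}
```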
- return Ok(smallvec![]); - }; - - arrow_writer.flush().await.context(WriteParquetSnafu)?; - - let file_meta = arrow_writer.close().await.context(WriteParquetSnafu)?; - let file_size = self.bytes_written.load(Ordering::Relaxed) as u64; - - // Safety: num rows > 0 so we must have min/max. - let time_range = stats.time_range.unwrap(); - - // convert FileMetaData to ParquetMetaData - let parquet_metadata = parse_parquet_metadata(file_meta)?; - - let file_id = self.current_file; + self.finish_current_file(&mut results, &mut stats).await?; // object_store.write will make sure all bytes are written or an error is raised. - Ok(smallvec![SstInfo { - file_id, - time_range, - file_size, - num_rows: stats.num_rows, - num_row_groups: parquet_metadata.num_row_groups() as u64, - file_metadata: Some(Arc::new(parquet_metadata)), - index_metadata: index_output, - }]) + Ok(results) } /// Customizes per-column config according to schema and maybe column cardinality. @@ -309,6 +329,10 @@ where AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props)) .context(WriteParquetSnafu)?; self.writer = Some(arrow_writer); + + let indexer = self.indexer_builder.build(self.current_file).await; + self.current_indexer = Some(indexer); + // safety: self.writer is assigned above Ok(self.writer.as_mut().unwrap()) } diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index 85e1c9144a..cb44e7025c 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -214,28 +214,10 @@ fn set_twcs_options( region_id: RegionId, ) -> std::result::Result<(), MetadataError> { match key { - mito_engine_options::TWCS_MAX_ACTIVE_WINDOW_RUNS => { - let runs = parse_usize_with_default(key, value, default_option.max_active_window_runs)?; - log_option_update(region_id, key, options.max_active_window_runs, runs); - options.max_active_window_runs = runs; - } - mito_engine_options::TWCS_MAX_ACTIVE_WINDOW_FILES => { - let files = - parse_usize_with_default(key, value, default_option.max_active_window_files)?; - log_option_update(region_id, key, options.max_active_window_files, files); - options.max_active_window_files = files; - } - mito_engine_options::TWCS_MAX_INACTIVE_WINDOW_RUNS => { - let runs = - parse_usize_with_default(key, value, default_option.max_inactive_window_runs)?; - log_option_update(region_id, key, options.max_inactive_window_runs, runs); - options.max_inactive_window_runs = runs; - } - mito_engine_options::TWCS_MAX_INACTIVE_WINDOW_FILES => { - let files = - parse_usize_with_default(key, value, default_option.max_inactive_window_files)?; - log_option_update(region_id, key, options.max_inactive_window_files, files); - options.max_inactive_window_files = files; + mito_engine_options::TWCS_TRIGGER_FILE_NUM => { + let files = parse_usize_with_default(key, value, default_option.trigger_file_num)?; + log_option_update(region_id, key, options.trigger_file_num, files); + options.trigger_file_num = files; } mito_engine_options::TWCS_MAX_OUTPUT_FILE_SIZE => { let size = if value.is_empty() { diff --git a/src/store-api/src/mito_engine_options.rs b/src/store-api/src/mito_engine_options.rs index aa6fd8984d..e7bf832cf6 100644 --- a/src/store-api/src/mito_engine_options.rs +++ b/src/store-api/src/mito_engine_options.rs @@ -29,14 +29,8 @@ pub const SNAPSHOT_READ: &str = "snapshot_read"; pub const COMPACTION_TYPE: &str = "compaction.type"; /// TWCS compaction strategy. pub const COMPACTION_TYPE_TWCS: &str = "twcs"; -/// Option key for twcs max active window runs. 
-pub const TWCS_MAX_ACTIVE_WINDOW_RUNS: &str = "compaction.twcs.max_active_window_runs"; -/// Option key for twcs max active window files. -pub const TWCS_MAX_ACTIVE_WINDOW_FILES: &str = "compaction.twcs.max_active_window_files"; -/// Option key for twcs max inactive window runs. -pub const TWCS_MAX_INACTIVE_WINDOW_RUNS: &str = "compaction.twcs.max_inactive_window_runs"; -/// Option key for twcs max inactive window files. -pub const TWCS_MAX_INACTIVE_WINDOW_FILES: &str = "compaction.twcs.max_inactive_window_files"; +/// Option key for twcs min file num to trigger a compaction. +pub const TWCS_TRIGGER_FILE_NUM: &str = "compaction.twcs.trigger_file_num"; /// Option key for twcs max output file size. pub const TWCS_MAX_OUTPUT_FILE_SIZE: &str = "compaction.twcs.max_output_file_size"; /// Option key for twcs time window. @@ -68,10 +62,7 @@ pub fn is_mito_engine_option_key(key: &str) -> bool { [ "ttl", COMPACTION_TYPE, - TWCS_MAX_ACTIVE_WINDOW_RUNS, - TWCS_MAX_ACTIVE_WINDOW_FILES, - TWCS_MAX_INACTIVE_WINDOW_RUNS, - TWCS_MAX_INACTIVE_WINDOW_FILES, + TWCS_TRIGGER_FILE_NUM, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, REMOTE_COMPACTION, @@ -100,10 +91,7 @@ mod tests { assert!(is_mito_engine_option_key("ttl")); assert!(is_mito_engine_option_key("compaction.type")); assert!(is_mito_engine_option_key( - "compaction.twcs.max_active_window_runs" - )); - assert!(is_mito_engine_option_key( - "compaction.twcs.max_inactive_window_runs" + "compaction.twcs.trigger_file_num" )); assert!(is_mito_engine_option_key("compaction.twcs.time_window")); assert!(is_mito_engine_option_key("storage")); diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index bd4c080d15..6d7f2af57e 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -52,9 +52,7 @@ use crate::metadata::{ use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY; use crate::metrics; use crate::mito_engine_options::{ - TTL_KEY, TWCS_MAX_ACTIVE_WINDOW_FILES, TWCS_MAX_ACTIVE_WINDOW_RUNS, - TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS, TWCS_MAX_OUTPUT_FILE_SIZE, - TWCS_TIME_WINDOW, + TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM, }; use crate::path_utils::region_dir; use crate::storage::{ColumnId, RegionId, ScanRequest}; @@ -1027,12 +1025,9 @@ impl TryFrom<&PbOption> for SetRegionOption { Ok(Self::Ttl(Some(ttl))) } - TWCS_MAX_ACTIVE_WINDOW_RUNS - | TWCS_MAX_ACTIVE_WINDOW_FILES - | TWCS_MAX_INACTIVE_WINDOW_FILES - | TWCS_MAX_INACTIVE_WINDOW_RUNS - | TWCS_MAX_OUTPUT_FILE_SIZE - | TWCS_TIME_WINDOW => Ok(Self::Twsc(key.to_string(), value.to_string())), + TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => { + Ok(Self::Twsc(key.to_string(), value.to_string())) + } _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(), } } @@ -1041,16 +1036,7 @@ impl TryFrom<&PbOption> for SetRegionOption { impl From<&UnsetRegionOption> for SetRegionOption { fn from(unset_option: &UnsetRegionOption) -> Self { match unset_option { - UnsetRegionOption::TwcsMaxActiveWindowFiles => { - SetRegionOption::Twsc(unset_option.to_string(), String::new()) - } - UnsetRegionOption::TwcsMaxInactiveWindowFiles => { - SetRegionOption::Twsc(unset_option.to_string(), String::new()) - } - UnsetRegionOption::TwcsMaxActiveWindowRuns => { - SetRegionOption::Twsc(unset_option.to_string(), String::new()) - } - UnsetRegionOption::TwcsMaxInactiveWindowRuns => { + UnsetRegionOption::TwcsTriggerFileNum => { SetRegionOption::Twsc(unset_option.to_string(), 
String::new()) } UnsetRegionOption::TwcsMaxOutputFileSize => { @@ -1070,10 +1056,7 @@ impl TryFrom<&str> for UnsetRegionOption { fn try_from(key: &str) -> Result { match key.to_ascii_lowercase().as_str() { TTL_KEY => Ok(Self::Ttl), - TWCS_MAX_ACTIVE_WINDOW_FILES => Ok(Self::TwcsMaxActiveWindowFiles), - TWCS_MAX_INACTIVE_WINDOW_FILES => Ok(Self::TwcsMaxInactiveWindowFiles), - TWCS_MAX_ACTIVE_WINDOW_RUNS => Ok(Self::TwcsMaxActiveWindowRuns), - TWCS_MAX_INACTIVE_WINDOW_RUNS => Ok(Self::TwcsMaxInactiveWindowRuns), + TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum), TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize), TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow), _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(), @@ -1083,10 +1066,7 @@ impl TryFrom<&str> for UnsetRegionOption { #[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] pub enum UnsetRegionOption { - TwcsMaxActiveWindowFiles, - TwcsMaxInactiveWindowFiles, - TwcsMaxActiveWindowRuns, - TwcsMaxInactiveWindowRuns, + TwcsTriggerFileNum, TwcsMaxOutputFileSize, TwcsTimeWindow, Ttl, @@ -1096,10 +1076,7 @@ impl UnsetRegionOption { pub fn as_str(&self) -> &str { match self { Self::Ttl => TTL_KEY, - Self::TwcsMaxActiveWindowFiles => TWCS_MAX_ACTIVE_WINDOW_FILES, - Self::TwcsMaxInactiveWindowFiles => TWCS_MAX_INACTIVE_WINDOW_FILES, - Self::TwcsMaxActiveWindowRuns => TWCS_MAX_ACTIVE_WINDOW_RUNS, - Self::TwcsMaxInactiveWindowRuns => TWCS_MAX_INACTIVE_WINDOW_RUNS, + Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM, Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE, Self::TwcsTimeWindow => TWCS_TIME_WINDOW, } diff --git a/tests-fuzz/src/generator/alter_expr.rs b/tests-fuzz/src/generator/alter_expr.rs index e6d694c46e..a2df3124ee 100644 --- a/tests-fuzz/src/generator/alter_expr.rs +++ b/tests-fuzz/src/generator/alter_expr.rs @@ -238,21 +238,9 @@ impl Generator for AlterExprSetTableOptionsGenerator< let max_output_file_size: u64 = rng.random(); AlterTableOption::TwcsMaxOutputFileSize(ReadableSize(max_output_file_size)) } - AlterTableOption::TwcsMaxInactiveWindowRuns(_) => { - let max_inactive_window_runs: u64 = rng.random(); - AlterTableOption::TwcsMaxInactiveWindowRuns(max_inactive_window_runs) - } - AlterTableOption::TwcsMaxActiveWindowFiles(_) => { - let max_active_window_files: u64 = rng.random(); - AlterTableOption::TwcsMaxActiveWindowFiles(max_active_window_files) - } - AlterTableOption::TwcsMaxActiveWindowRuns(_) => { - let max_active_window_runs: u64 = rng.random(); - AlterTableOption::TwcsMaxActiveWindowRuns(max_active_window_runs) - } - AlterTableOption::TwcsMaxInactiveWindowFiles(_) => { - let max_inactive_window_files: u64 = rng.random(); - AlterTableOption::TwcsMaxInactiveWindowFiles(max_inactive_window_files) + AlterTableOption::TwcsTriggerFileNum(_) => { + let trigger_file_num: u64 = rng.random(); + AlterTableOption::TwcsTriggerFileNum(trigger_file_num) } }) .collect(); @@ -365,7 +353,7 @@ mod tests { .generate(&mut rng) .unwrap(); let serialized = serde_json::to_string(&expr).unwrap(); - let expected = r#"{"table_name":{"value":"quasi","quote_style":null},"alter_kinds":{"SetTableOptions":{"options":[{"TwcsMaxOutputFileSize":16770910638250818741}]}}}"#; + let expected = r#"{"table_name":{"value":"quasi","quote_style":null},"alter_kinds":{"SetTableOptions":{"options":[{"TwcsTimeWindow":{"value":2428665013,"unit":"Millisecond"}}]}}}"#; assert_eq!(expected, serialized); let expr = AlterExprUnsetTableOptionsGeneratorBuilder::default() @@ -375,7 +363,7 @@ mod tests { .generate(&mut rng) .unwrap(); let 
serialized = serde_json::to_string(&expr).unwrap(); - let expected = r#"{"table_name":{"value":"quasi","quote_style":null},"alter_kinds":{"UnsetTableOptions":{"keys":["compaction.twcs.max_active_window_runs","compaction.twcs.max_output_file_size","compaction.twcs.time_window","compaction.twcs.max_inactive_window_files","compaction.twcs.max_active_window_files"]}}}"#; + let expected = r#"{"table_name":{"value":"quasi","quote_style":null},"alter_kinds":{"UnsetTableOptions":{"keys":["compaction.twcs.trigger_file_num","compaction.twcs.time_window"]}}}"#; assert_eq!(expected, serialized); } } diff --git a/tests-fuzz/src/ir/alter_expr.rs b/tests-fuzz/src/ir/alter_expr.rs index 1d637ff660..1790467ac2 100644 --- a/tests-fuzz/src/ir/alter_expr.rs +++ b/tests-fuzz/src/ir/alter_expr.rs @@ -21,9 +21,8 @@ use common_time::{Duration, FOREVER, INSTANT}; use derive_builder::Builder; use serde::{Deserialize, Serialize}; use store_api::mito_engine_options::{ - APPEND_MODE_KEY, COMPACTION_TYPE, TTL_KEY, TWCS_MAX_ACTIVE_WINDOW_FILES, - TWCS_MAX_ACTIVE_WINDOW_RUNS, TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS, - TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, + APPEND_MODE_KEY, COMPACTION_TYPE, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, + TWCS_TRIGGER_FILE_NUM, }; use strum::EnumIter; @@ -78,10 +77,7 @@ pub enum AlterTableOption { Ttl(Ttl), TwcsTimeWindow(Duration), TwcsMaxOutputFileSize(ReadableSize), - TwcsMaxInactiveWindowFiles(u64), - TwcsMaxActiveWindowFiles(u64), - TwcsMaxInactiveWindowRuns(u64), - TwcsMaxActiveWindowRuns(u64), + TwcsTriggerFileNum(u64), } impl AlterTableOption { @@ -90,10 +86,7 @@ impl AlterTableOption { AlterTableOption::Ttl(_) => TTL_KEY, AlterTableOption::TwcsTimeWindow(_) => TWCS_TIME_WINDOW, AlterTableOption::TwcsMaxOutputFileSize(_) => TWCS_MAX_OUTPUT_FILE_SIZE, - AlterTableOption::TwcsMaxInactiveWindowFiles(_) => TWCS_MAX_INACTIVE_WINDOW_FILES, - AlterTableOption::TwcsMaxActiveWindowFiles(_) => TWCS_MAX_ACTIVE_WINDOW_FILES, - AlterTableOption::TwcsMaxInactiveWindowRuns(_) => TWCS_MAX_INACTIVE_WINDOW_RUNS, - AlterTableOption::TwcsMaxActiveWindowRuns(_) => TWCS_MAX_ACTIVE_WINDOW_RUNS, + AlterTableOption::TwcsTriggerFileNum(_) => TWCS_TRIGGER_FILE_NUM, } } @@ -111,21 +104,9 @@ impl AlterTableOption { }; Ok(AlterTableOption::Ttl(ttl)) } - TWCS_MAX_ACTIVE_WINDOW_RUNS => { - let runs = value.parse().unwrap(); - Ok(AlterTableOption::TwcsMaxActiveWindowRuns(runs)) - } - TWCS_MAX_ACTIVE_WINDOW_FILES => { + TWCS_TRIGGER_FILE_NUM => { let files = value.parse().unwrap(); - Ok(AlterTableOption::TwcsMaxActiveWindowFiles(files)) - } - TWCS_MAX_INACTIVE_WINDOW_RUNS => { - let runs = value.parse().unwrap(); - Ok(AlterTableOption::TwcsMaxInactiveWindowRuns(runs)) - } - TWCS_MAX_INACTIVE_WINDOW_FILES => { - let files = value.parse().unwrap(); - Ok(AlterTableOption::TwcsMaxInactiveWindowFiles(files)) + Ok(AlterTableOption::TwcsTriggerFileNum(files)) } TWCS_MAX_OUTPUT_FILE_SIZE => { // may be "1M" instead of "1 MiB" @@ -178,17 +159,8 @@ impl Display for AlterTableOption { // Caution: to_string loses precision for ReadableSize write!(f, "'{}' = '{}'", TWCS_MAX_OUTPUT_FILE_SIZE, s) } - AlterTableOption::TwcsMaxInactiveWindowFiles(u) => { - write!(f, "'{}' = '{}'", TWCS_MAX_INACTIVE_WINDOW_FILES, u) - } - AlterTableOption::TwcsMaxActiveWindowFiles(u) => { - write!(f, "'{}' = '{}'", TWCS_MAX_ACTIVE_WINDOW_FILES, u) - } - AlterTableOption::TwcsMaxInactiveWindowRuns(u) => { - write!(f, "'{}' = '{}'", TWCS_MAX_INACTIVE_WINDOW_RUNS, u) - } - 
AlterTableOption::TwcsMaxActiveWindowRuns(u) => { - write!(f, "'{}' = '{}'", TWCS_MAX_ACTIVE_WINDOW_RUNS, u) + AlterTableOption::TwcsTriggerFileNum(u) => { + write!(f, "'{}' = '{}'", TWCS_TRIGGER_FILE_NUM, u) } } } @@ -212,21 +184,15 @@ mod tests { ] ); - let option_string = "compaction.twcs.max_active_window_files = '5030469694939972912', - compaction.twcs.max_active_window_runs = '8361168990283879099', - compaction.twcs.max_inactive_window_files = '6028716566907830876', - compaction.twcs.max_inactive_window_runs = '10622283085591494074', + let option_string = "compaction.twcs.trigger_file_num = '5030469694939972912', compaction.twcs.max_output_file_size = '15686.4PiB', compaction.twcs.time_window = '2061999256ms', compaction.type = 'twcs', ttl = '1month 3days 15h 49m 8s 279ms'"; let options = AlterTableOption::parse_kv_pairs(option_string).unwrap(); - assert_eq!(options.len(), 7); + assert_eq!(options.len(), 4); let expected = vec![ - AlterTableOption::TwcsMaxActiveWindowFiles(5030469694939972912), - AlterTableOption::TwcsMaxActiveWindowRuns(8361168990283879099), - AlterTableOption::TwcsMaxInactiveWindowFiles(6028716566907830876), - AlterTableOption::TwcsMaxInactiveWindowRuns(10622283085591494074), + AlterTableOption::TwcsTriggerFileNum(5030469694939972912), AlterTableOption::TwcsMaxOutputFileSize(ReadableSize::from_str("15686.4PiB").unwrap()), AlterTableOption::TwcsTimeWindow(Duration::new_nanosecond(2_061_999_256_000_000)), AlterTableOption::Ttl(Ttl::Duration(Duration::new_millisecond( diff --git a/tests-fuzz/src/translator/mysql/alter_expr.rs b/tests-fuzz/src/translator/mysql/alter_expr.rs index 3bf30b09a3..942f41eefd 100644 --- a/tests-fuzz/src/translator/mysql/alter_expr.rs +++ b/tests-fuzz/src/translator/mysql/alter_expr.rs @@ -191,10 +191,7 @@ mod tests { AlterTableOption::Ttl(Ttl::Duration(Duration::new_second(60))), AlterTableOption::TwcsTimeWindow(Duration::new_second(60)), AlterTableOption::TwcsMaxOutputFileSize(ReadableSize::from_str("1GB").unwrap()), - AlterTableOption::TwcsMaxActiveWindowFiles(10), - AlterTableOption::TwcsMaxActiveWindowRuns(10), - AlterTableOption::TwcsMaxInactiveWindowFiles(5), - AlterTableOption::TwcsMaxInactiveWindowRuns(5), + AlterTableOption::TwcsTriggerFileNum(5), ], }, }; @@ -204,10 +201,7 @@ mod tests { "ALTER TABLE test SET 'ttl' = '60s', ", "'compaction.twcs.time_window' = '60s', ", "'compaction.twcs.max_output_file_size' = '1.0GiB', ", - "'compaction.twcs.max_active_window_files' = '10', ", - "'compaction.twcs.max_active_window_runs' = '10', ", - "'compaction.twcs.max_inactive_window_files' = '5', ", - "'compaction.twcs.max_inactive_window_runs' = '5';" + "'compaction.twcs.trigger_file_num' = '5';" ); assert_eq!(expected, output); } diff --git a/tests-fuzz/src/translator/postgres/alter_expr.rs b/tests-fuzz/src/translator/postgres/alter_expr.rs index f66ce0db92..8a0681daa6 100644 --- a/tests-fuzz/src/translator/postgres/alter_expr.rs +++ b/tests-fuzz/src/translator/postgres/alter_expr.rs @@ -187,10 +187,7 @@ mod tests { AlterTableOption::Ttl(Ttl::Duration(Duration::new_second(60))), AlterTableOption::TwcsTimeWindow(Duration::new_second(60)), AlterTableOption::TwcsMaxOutputFileSize(ReadableSize::from_str("1GB").unwrap()), - AlterTableOption::TwcsMaxActiveWindowFiles(10), - AlterTableOption::TwcsMaxActiveWindowRuns(10), - AlterTableOption::TwcsMaxInactiveWindowFiles(5), - AlterTableOption::TwcsMaxInactiveWindowRuns(5), + AlterTableOption::TwcsTriggerFileNum(10), ], }, }; @@ -200,10 +197,7 @@ mod tests { "ALTER TABLE test SET 'ttl' = '60s', ", 
"'compaction.twcs.time_window' = '60s', ", "'compaction.twcs.max_output_file_size' = '1.0GiB', ", - "'compaction.twcs.max_active_window_files' = '10', ", - "'compaction.twcs.max_active_window_runs' = '10', ", - "'compaction.twcs.max_inactive_window_files' = '5', ", - "'compaction.twcs.max_inactive_window_runs' = '5';" + "'compaction.twcs.trigger_file_num' = '10';", ); assert_eq!(expected, output); } diff --git a/tests/cases/distributed/explain/order_by.result b/tests/cases/distributed/explain/order_by.result index 26e566a776..9b96d27829 100644 --- a/tests/cases/distributed/explain/order_by.result +++ b/tests/cases/distributed/explain/order_by.result @@ -59,7 +59,7 @@ DROP TABLE test; Affected Rows: 0 -CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4'); +CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.trigger_file_num'='4'); Affected Rows: 0 diff --git a/tests/cases/distributed/explain/order_by.sql b/tests/cases/distributed/explain/order_by.sql index 690afe8da4..3819e8061f 100644 --- a/tests/cases/distributed/explain/order_by.sql +++ b/tests/cases/distributed/explain/order_by.sql @@ -28,7 +28,7 @@ EXPLAIN SELECT DISTINCT a, b FROM test ORDER BY a, b; DROP TABLE test; -CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4'); +CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.trigger_file_num'='4'); INSERT INTO test_pk VALUES (1, 1, 1), (2, NULL, 2), (3, 1, 3), (4, 2, 4), (5, 2, 5), (6, NULL, 6); diff --git a/tests/cases/standalone/common/alter/alter_table_options.result b/tests/cases/standalone/common/alter/alter_table_options.result index 14849a70d4..f91fd9da88 100644 --- a/tests/cases/standalone/common/alter/alter_table_options.result +++ b/tests/cases/standalone/common/alter/alter_table_options.result @@ -155,46 +155,31 @@ ALTER TABLE ato SET 'compaction.twcs.max_output_file_size'='500MB'; Affected Rows: 0 -ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_files'='2'; - -Affected Rows: 0 - -ALTER TABLE ato SET 'compaction.twcs.max_active_window_files'='2'; - -Affected Rows: 0 - -ALTER TABLE ato SET 'compaction.twcs.max_active_window_runs'='6'; - -Affected Rows: 0 - -ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_runs'='6'; +ALTER TABLE ato SET 'compaction.twcs.trigger_file_num'='2'; Affected Rows: 0 SHOW CREATE TABLE ato; -+-------+------------------------------------------------------+ -| Table | Create Table | -+-------+------------------------------------------------------+ -| ato | CREATE TABLE IF NOT EXISTS "ato" ( | -| | "i" INT NULL, | -| | "j" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("j"), | -| | PRIMARY KEY ("i") | -| | ) | -| | | -| | ENGINE=mito | -| | WITH( | -| | 'compaction.twcs.max_active_window_files' = '2', | -| | 'compaction.twcs.max_active_window_runs' = '6', | -| | 'compaction.twcs.max_inactive_window_files' = '2', | -| | 'compaction.twcs.max_inactive_window_runs' = '6', | -| | 'compaction.twcs.max_output_file_size' = '500MB', | -| | 'compaction.twcs.time_window' = '2h', | -| | 'compaction.type' = 'twcs', | -| | ttl = '1s' | -| | ) | -+-------+------------------------------------------------------+ ++-------+-----------------------------------------------------+ +| Table | Create Table | 
++-------+-----------------------------------------------------+ +| ato | CREATE TABLE IF NOT EXISTS "ato" ( | +| | "i" INT NULL, | +| | "j" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("j"), | +| | PRIMARY KEY ("i") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'compaction.twcs.max_output_file_size' = '500MB', | +| | 'compaction.twcs.time_window' = '2h', | +| | 'compaction.twcs.trigger_file_num' = '2', | +| | 'compaction.type' = 'twcs', | +| | ttl = '1s' | +| | ) | ++-------+-----------------------------------------------------+ ALTER TABLE ato UNSET 'compaction.twcs.time_window'; @@ -206,78 +191,69 @@ Error: 1004(InvalidArguments), Invalid unset table option request: Invalid set r SHOW CREATE TABLE ato; -+-------+------------------------------------------------------+ -| Table | Create Table | -+-------+------------------------------------------------------+ -| ato | CREATE TABLE IF NOT EXISTS "ato" ( | -| | "i" INT NULL, | -| | "j" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("j"), | -| | PRIMARY KEY ("i") | -| | ) | -| | | -| | ENGINE=mito | -| | WITH( | -| | 'compaction.twcs.max_active_window_files' = '2', | -| | 'compaction.twcs.max_active_window_runs' = '6', | -| | 'compaction.twcs.max_inactive_window_files' = '2', | -| | 'compaction.twcs.max_inactive_window_runs' = '6', | -| | 'compaction.twcs.max_output_file_size' = '500MB', | -| | 'compaction.type' = 'twcs', | -| | ttl = '1s' | -| | ) | -+-------+------------------------------------------------------+ ++-------+-----------------------------------------------------+ +| Table | Create Table | ++-------+-----------------------------------------------------+ +| ato | CREATE TABLE IF NOT EXISTS "ato" ( | +| | "i" INT NULL, | +| | "j" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("j"), | +| | PRIMARY KEY ("i") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'compaction.twcs.max_output_file_size' = '500MB', | +| | 'compaction.twcs.trigger_file_num' = '2', | +| | 'compaction.type' = 'twcs', | +| | ttl = '1s' | +| | ) | ++-------+-----------------------------------------------------+ -ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_runs'=''; +ALTER TABLE ato SET 'compaction.twcs.trigger_file_num'=''; Affected Rows: 0 SHOW CREATE TABLE ato; -+-------+------------------------------------------------------+ -| Table | Create Table | -+-------+------------------------------------------------------+ -| ato | CREATE TABLE IF NOT EXISTS "ato" ( | -| | "i" INT NULL, | -| | "j" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("j"), | -| | PRIMARY KEY ("i") | -| | ) | -| | | -| | ENGINE=mito | -| | WITH( | -| | 'compaction.twcs.max_active_window_files' = '2', | -| | 'compaction.twcs.max_active_window_runs' = '6', | -| | 'compaction.twcs.max_inactive_window_files' = '2', | -| | 'compaction.twcs.max_output_file_size' = '500MB', | -| | 'compaction.type' = 'twcs', | -| | ttl = '1s' | -| | ) | -+-------+------------------------------------------------------+ ++-------+-----------------------------------------------------+ +| Table | Create Table | ++-------+-----------------------------------------------------+ +| ato | CREATE TABLE IF NOT EXISTS "ato" ( | +| | "i" INT NULL, | +| | "j" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("j"), | +| | PRIMARY KEY ("i") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'compaction.twcs.max_output_file_size' = '500MB', | +| | 'compaction.type' = 'twcs', | +| | ttl = '1s' | +| | ) | ++-------+-----------------------------------------------------+ -- SQLNESS ARG restart=true SHOW CREATE TABLE ato; 
-+-------+------------------------------------------------------+ -| Table | Create Table | -+-------+------------------------------------------------------+ -| ato | CREATE TABLE IF NOT EXISTS "ato" ( | -| | "i" INT NULL, | -| | "j" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("j"), | -| | PRIMARY KEY ("i") | -| | ) | -| | | -| | ENGINE=mito | -| | WITH( | -| | 'compaction.twcs.max_active_window_files' = '2', | -| | 'compaction.twcs.max_active_window_runs' = '6', | -| | 'compaction.twcs.max_inactive_window_files' = '2', | -| | 'compaction.twcs.max_output_file_size' = '500MB', | -| | 'compaction.type' = 'twcs', | -| | ttl = '1s' | -| | ) | -+-------+------------------------------------------------------+ ++-------+-----------------------------------------------------+ +| Table | Create Table | ++-------+-----------------------------------------------------+ +| ato | CREATE TABLE IF NOT EXISTS "ato" ( | +| | "i" INT NULL, | +| | "j" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("j"), | +| | PRIMARY KEY ("i") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'compaction.twcs.max_output_file_size' = '500MB', | +| | 'compaction.type' = 'twcs', | +| | ttl = '1s' | +| | ) | ++-------+-----------------------------------------------------+ DROP TABLE ato; diff --git a/tests/cases/standalone/common/alter/alter_table_options.sql b/tests/cases/standalone/common/alter/alter_table_options.sql index 63d794c661..badf1275cd 100644 --- a/tests/cases/standalone/common/alter/alter_table_options.sql +++ b/tests/cases/standalone/common/alter/alter_table_options.sql @@ -36,13 +36,7 @@ ALTER TABLE ato SET 'compaction.twcs.time_window'='2h'; ALTER TABLE ato SET 'compaction.twcs.max_output_file_size'='500MB'; -ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_files'='2'; - -ALTER TABLE ato SET 'compaction.twcs.max_active_window_files'='2'; - -ALTER TABLE ato SET 'compaction.twcs.max_active_window_runs'='6'; - -ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_runs'='6'; +ALTER TABLE ato SET 'compaction.twcs.trigger_file_num'='2'; SHOW CREATE TABLE ato; @@ -52,7 +46,7 @@ ALTER TABLE ato UNSET '🕶️'; SHOW CREATE TABLE ato; -ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_runs'=''; +ALTER TABLE ato SET 'compaction.twcs.trigger_file_num'=''; SHOW CREATE TABLE ato; diff --git a/tests/cases/standalone/common/create/create_with_options.result b/tests/cases/standalone/common/create/create_with_options.result index 8379451e11..bb350a78aa 100644 --- a/tests/cases/standalone/common/create/create_with_options.result +++ b/tests/cases/standalone/common/create/create_with_options.result @@ -67,8 +67,7 @@ engine=mito with( 'ttl'='7d', 'compaction.type'='twcs', - 'compaction.twcs.max_active_window_runs'='2', - 'compaction.twcs.max_inactive_window_runs'='2', + 'compaction.twcs.trigger_file_num'='2', 'compaction.twcs.time_window'='1d', 'index.inverted_index.ignore_column_ids'='1,2,3', 'index.inverted_index.segment_row_count'='512', @@ -90,7 +89,7 @@ create table if not exists invalid_compaction( PRIMARY KEY(host) ) engine=mito -with('compaction.type'='twcs', 'compaction.twcs.max_active_window_runs'='8d'); +with('compaction.type'='twcs', 'compaction.twcs.trigger_file_num'='8d'); Error: 1004(InvalidArguments), Invalid options: invalid digit found in string diff --git a/tests/cases/standalone/common/create/create_with_options.sql b/tests/cases/standalone/common/create/create_with_options.sql index 3c848cd27e..13824f76aa 100644 --- a/tests/cases/standalone/common/create/create_with_options.sql +++ 
b/tests/cases/standalone/common/create/create_with_options.sql @@ -57,8 +57,7 @@ engine=mito with( 'ttl'='7d', 'compaction.type'='twcs', - 'compaction.twcs.max_active_window_runs'='2', - 'compaction.twcs.max_inactive_window_runs'='2', + 'compaction.twcs.trigger_file_num'='2', 'compaction.twcs.time_window'='1d', 'index.inverted_index.ignore_column_ids'='1,2,3', 'index.inverted_index.segment_row_count'='512', @@ -76,4 +75,4 @@ create table if not exists invalid_compaction( PRIMARY KEY(host) ) engine=mito -with('compaction.type'='twcs', 'compaction.twcs.max_active_window_runs'='8d'); +with('compaction.type'='twcs', 'compaction.twcs.trigger_file_num'='8d'); diff --git a/tests/cases/standalone/common/order/windowed_sort.result b/tests/cases/standalone/common/order/windowed_sort.result index 1509e25bea..f41a3345fe 100644 --- a/tests/cases/standalone/common/order/windowed_sort.result +++ b/tests/cases/standalone/common/order/windowed_sort.result @@ -1,5 +1,5 @@ -- Test without PK, with a windowed sort query. -CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4'); +CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs'); Affected Rows: 0 @@ -213,7 +213,7 @@ DROP TABLE test; Affected Rows: 0 -- Test with PK, with a windowed sort query. -CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4'); +CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.trigger_file_num'='4'); Affected Rows: 0 diff --git a/tests/cases/standalone/common/order/windowed_sort.sql b/tests/cases/standalone/common/order/windowed_sort.sql index 9e0e24c38d..1349e4d96c 100644 --- a/tests/cases/standalone/common/order/windowed_sort.sql +++ b/tests/cases/standalone/common/order/windowed_sort.sql @@ -1,5 +1,5 @@ -- Test without PK, with a windowed sort query. -CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4'); +CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs'); INSERT INTO test VALUES (1, 1), (NULL, 2), (1, 3); @@ -71,7 +71,7 @@ EXPLAIN ANALYZE SELECT * FROM test where t > 8 ORDER BY t DESC LIMIT 4; DROP TABLE test; -- Test with PK, with a windowed sort query. 
-CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4'); +CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.trigger_file_num'='4'); INSERT INTO test_pk VALUES (1, 1, 1), (2, NULL, 2), (3, 1, 3); diff --git a/tests/cases/standalone/common/select/flush_append_only.result b/tests/cases/standalone/common/select/flush_append_only.result index 2413e8e0d4..38fadb35bf 100644 --- a/tests/cases/standalone/common/select/flush_append_only.result +++ b/tests/cases/standalone/common/select/flush_append_only.result @@ -9,8 +9,7 @@ with ( append_mode = 'true', 'compaction.type' = 'twcs', - 'compaction.twcs.max_active_window_files' = '8', - 'compaction.twcs.max_inactive_window_files' = '8' + 'compaction.twcs.trigger_file_num' = '8', ); Affected Rows: 0 diff --git a/tests/cases/standalone/common/select/flush_append_only.sql b/tests/cases/standalone/common/select/flush_append_only.sql index e8d6defea2..8167776db2 100644 --- a/tests/cases/standalone/common/select/flush_append_only.sql +++ b/tests/cases/standalone/common/select/flush_append_only.sql @@ -9,8 +9,7 @@ with ( append_mode = 'true', 'compaction.type' = 'twcs', - 'compaction.twcs.max_active_window_files' = '8', - 'compaction.twcs.max_inactive_window_files' = '8' + 'compaction.twcs.trigger_file_num' = '8', ); insert into diff --git a/tests/cases/standalone/optimizer/order_by.result b/tests/cases/standalone/optimizer/order_by.result index 6bfd107bb6..d46ca5ab1b 100644 --- a/tests/cases/standalone/optimizer/order_by.result +++ b/tests/cases/standalone/optimizer/order_by.result @@ -52,7 +52,7 @@ explain select * from numbers order by number asc limit 10; | | | +---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4'); +CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs'); Affected Rows: 0 diff --git a/tests/cases/standalone/optimizer/order_by.sql b/tests/cases/standalone/optimizer/order_by.sql index f8cb7f6c9d..211f09ebbf 100644 --- a/tests/cases/standalone/optimizer/order_by.sql +++ b/tests/cases/standalone/optimizer/order_by.sql @@ -8,7 +8,7 @@ explain select * from numbers order by number desc limit 10; explain select * from numbers order by number asc limit 10; -CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4'); +CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs'); INSERT INTO test_pk VALUES (1, 1, 1), (2, NULL, 2), (3, 1, 3), (4, 2, 4), (5, 2, 5), (6, NULL, 6);
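As a quick reference for the consolidated option exercised by the test diffs above: the four per-window knobs are replaced by the single 'compaction.twcs.trigger_file_num', which, as the name and the replaced options suggest, sets how many files in a time window trigger a compaction. A minimal usage sketch follows, in the same style as the sqlness tests; the `metrics` table and its columns are illustrative only and are not part of this patch.

-- Create a table whose compaction triggers once a time window
-- accumulates 8 SST files; window size and output file size
-- remain separately tunable, as in the tests above.
CREATE TABLE metrics (
    host STRING,
    val DOUBLE,
    ts TIMESTAMP TIME INDEX,
    PRIMARY KEY (host)
)
ENGINE = mito
WITH (
    'compaction.type' = 'twcs',
    'compaction.twcs.trigger_file_num' = '8',
    'compaction.twcs.time_window' = '1h',
    'compaction.twcs.max_output_file_size' = '500MB'
);

-- The option can be changed at runtime; per the ALTER tests above,
-- setting it to an empty string removes it from the table options.
ALTER TABLE metrics SET 'compaction.twcs.trigger_file_num' = '4';
ALTER TABLE metrics SET 'compaction.twcs.trigger_file_num' = '';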