feat!: revise compaction picker (#6121)

* - **Refactor `RegionFilePathFactory` to `RegionFilePathProvider`:** Updated references and implementations in `access_layer.rs`, `write_cache.rs`, and related test files to use the new struct name.
 - **Add `max_file_size` support in compaction:** Introduced `max_file_size` option in `PickerOutput`, `SerializedPickerOutput`, and `WriteOptions` in `compactor.rs`, `picker.rs`, `twcs.rs`, and `window.rs`.
 - **Enhance Parquet writing logic:** Modified `parquet.rs` and `parquet/writer.rs` to support optional `max_file_size` and added a test case `test_write_multiple_files` to verify writing multiple files based on size constraints.

 **Refactor Parquet Writer Initialization and File Handling**
 - Updated `ParquetWriter` in `writer.rs` to handle `current_indexer` as an `Option`, allowing for more flexible initialization and management.
 - Introduced `finish_current_file` method to encapsulate logic for completing and transitioning between SST files, improving code clarity and maintainability.
 - Enhanced error handling and logging with `debug` statements for better traceability during file operations.

 - **Removed Output Size Enforcement in `twcs.rs`:**
   - Deleted the `enforce_max_output_size` function and related logic to simplify compaction input handling.

 - **Added Max File Size Option in `parquet.rs`:**
   - Introduced `max_file_size` in `WriteOptions` to control the maximum size of output files.

 - **Refactored Indexer Management in `parquet/writer.rs`:**
   - Changed `current_indexer` from an `Option` to a direct `Indexer` type.
   - Implemented `roll_to_next_file` to handle file transitions when exceeding `max_file_size`.
   - Simplified indexer initialization and management logic.

 - **Refactored SST File Handling**:
   - Introduced `FilePathProvider` trait and its implementations (`WriteCachePathProvider`, `RegionFilePathFactory`) to manage SST and index file paths.
   - Updated `AccessLayer`, `WriteCache`, and `ParquetWriter` to use `FilePathProvider` for path management.
   - Modified `SstWriteRequest` and `SstUploadRequest` to use path providers instead of direct paths.
   - Files affected: `access_layer.rs`, `write_cache.rs`, `parquet.rs`, `writer.rs`.

 - **Enhanced Indexer Management**:
   - Replaced `IndexerBuilder` with `IndexerBuilderImpl` and made it async to support dynamic indexer creation.
   - Updated `ParquetWriter` to handle multiple indexers and file IDs.
   - Files affected: `index.rs`, `parquet.rs`, `writer.rs`.

 - **Removed Redundant File ID Handling**:
   - Removed `file_id` from `SstWriteRequest` and `CompactionOutput`.
   - Updated related logic to dynamically generate file IDs where necessary.
   - Files affected: `compaction.rs`, `flush.rs`, `picker.rs`, `twcs.rs`, `window.rs`.

 - **Test Adjustments**:
   - Updated tests to align with new path and indexer management.
   - Introduced `FixedPathProvider` and `NoopIndexBuilder` for testing purposes.
   - Files affected: `sst_util.rs`, `version_util.rs`, `parquet.rs`.

* chore: rebase main

* feat/multiple-compaction-output:
 ### Add Benchmarking and Refactor Compaction Logic

 - **Benchmarking**: Added a new benchmark `run_bench` in `Cargo.toml` and implemented benchmarks in `benches/run_bench.rs` using Criterion for `find_sorted_runs` and `reduce_runs` functions.
 - **Compaction Module Enhancements**:
   - Made `run.rs` public and refactored the `Ranged` and `Item` traits to be public.
   - Simplified the logic in `find_sorted_runs` and `reduce_runs` by removing `MergeItems` and related functions.
   - Introduced `find_overlapping_items` for identifying overlapping items.
 - **Code Cleanup**: Removed redundant code and tests related to `MergeItems` in `run.rs`.

* feat/multiple-compaction-output:
 ### Enhance Compaction Logic and Add Benchmarks

 - **Compaction Logic Improvements**:
   - Updated `reduce_runs` function in `src/mito2/src/compaction/run.rs` to remove the target parameter and improve the logic for selecting files to merge based on minimum penalty.
   - Enhanced `find_overlapping_items` to handle unsorted inputs and improve overlap detection efficiency.

 - **Benchmark Enhancements**:
   - Added `bench_find_overlapping_items` in `src/mito2/benches/run_bench.rs` to benchmark the new `find_overlapping_items` function.
   - Extended existing benchmarks to include larger data sizes.

 - **Testing Enhancements**:
   - Updated tests in `src/mito2/src/compaction/run.rs` to reflect changes in `reduce_runs` and added new tests for `find_overlapping_items`.

 - **Logging and Debugging**:
   - Improved logging in `src/mito2/src/compaction/twcs.rs` to provide more detailed information about compaction decisions.

* feat/multiple-compaction-output:
 ### Refactor and Enhance Compaction Logic

 - **Refactor `find_overlapping_items` Function**: Changed the function signature to accept slices instead of mutable vectors in `run.rs`.
 - **Rename and Update Struct Fields**: Renamed `penalty` to `size` in `SortedRun` struct and updated related logic in `run.rs`.
 - **Enhance `reduce_runs` Function**: Improved logic to sort runs by size and limit probe runs to 100 in `run.rs`.
 - **Add `merge_seq_files` Function**: Introduced a new function `merge_seq_files` in `run.rs` for merging sequential files.
 - **Modify `TwcsPicker` Logic**: Updated the compaction logic to use `merge_seq_files` when only one run is found in `twcs.rs`.
 - **Remove `enforce_file_num` Function**: Deleted the `enforce_file_num` function and its related test cases in `twcs.rs`.

* feat/multiple-compaction-output:
 ### Enhance Compaction Logic and Testing

 - **Add `merge_seq_files` Functionality**: Implemented the `merge_seq_files` function in `run.rs` to optimize file merging based on scoring systems. Updated
 benchmarks in `run_bench.rs` to include `bench_merge_seq_files`.
 - **Improve Compaction Strategy in `twcs.rs`**: Modified the compaction logic to handle file merging more effectively, considering file size and overlap.
 - **Update Tests**: Enhanced test coverage in `compaction_test.rs` and `append_mode_test.rs` to validate new compaction logic and file merging strategies.
 - **Remove Unused Function**: Deleted `new_file_handles` from `test_util.rs` as it was no longer needed.

* feat/multiple-compaction-output:
 ### Refactor TWCS Compaction Options

 - **Refactor Compaction Logic**: Simplified the TWCS compaction logic by replacing multiple parameters (`max_active_window_runs`, `max_active_window_files`, `max_inactive_window_runs`, `max_inactive_window_files`) with a single `trigger_file_num` parameter in `picker.rs`, `twcs.rs`, and `options.rs`.
 - **Update Tests**: Adjusted test cases to reflect the new compaction logic in `append_mode_test.rs`, `compaction_test.rs`, `filter_deleted_test.rs`, `merge_mode_test.rs`, and various test files under `tests/cases`.
 - **Modify Engine Options**: Updated engine option keys to use `trigger_file_num` in `mito_engine_options.rs` and `region_request.rs`.
 - **Fuzz Testing**: Updated fuzz test generators and translators to accommodate the new compaction parameter in `alter_expr.rs` and related files.

 This refactor aims to streamline the compaction configuration by reducing the number of parameters and simplifying the codebase.

* chore: add trailing space

* fix license header

* feat/revise-compaction-picker:
 **Limit File Processing and Optimize Merge Logic in `run.rs`**

 - Introduced a limit to process a maximum of 100 files in `merge_seq_files` to control time complexity.
 - Adjusted logic to calculate `target_size` and iterate over files using the limited set of files.
 - Updated scoring calculations to use the limited file set, ensuring efficient file merging.

* feat/revise-compaction-picker:
 ### Add Compaction Metrics and Remove Debug Logging

 - **Compaction Metrics**: Introduced new histograms `COMPACTION_INPUT_BYTES` and `COMPACTION_OUTPUT_BYTES` to track compaction input and output file sizes in `metrics.rs`. Updated `compactor.rs` to observe these metrics during the compaction process.
 - **Logging Cleanup**: Removed debug logging of file ranges during the merge process in `twcs.rs`.

* feat/revise-compaction-picker:
 ## Enhance Compaction Logic and Metrics

 - **Compaction Logic Improvements**:
   - Added methods `input_file_size` and `output_file_size` to `MergeOutput` in `compactor.rs` to streamline file size calculations.
   - Updated `Compactor` implementation to use these methods for metrics tracking.
   - Modified `Ranged` trait logic in `run.rs` to improve range comparison.
   - Enhanced test cases in `run.rs` to reflect changes in compaction logic.

 - **Metrics Enhancements**:
   - Changed `COMPACTION_INPUT_BYTES` and `COMPACTION_OUTPUT_BYTES` from histograms to counters in `metrics.rs` for better performance tracking.

 - **Debugging and Logging**:
   - Added detailed logging for compaction pick results in `twcs.rs`.
   - Implemented custom `Debug` trait for `FileMeta` in `file.rs` to improve debugging output.

 - **Testing Enhancements**:
   - Added new test `test_compaction_overlapping_files` in `compaction_test.rs` to verify compaction behavior with overlapping files.
   - Updated `merge_mode_test.rs` to reflect changes in file handling during scans.

* feat/revise-compaction-picker:
 ### Update `FileHandle` Debug Implementation

 - **Refactor Debug Output**: Simplified the `fmt::Debug` implementation for `FileHandle` in `src/mito2/src/sst/file.rs` by consolidating multiple fields into a single `meta` field using `meta_ref()`.
 - **Atomic Operations**: Updated the `deleted` field to use atomic loading with `Ordering::Relaxed`.

* Trigger CI

* feat/revise-compaction-picker:
 **Update compaction logic and default options**

 - **`twcs.rs`**: Enhanced logging for compaction pick results by improving the formatting for better readability.
 - **`options.rs`**: Modified the default `max_output_file_size` in `TwcsOptions` from 2GB to 512MB to optimize file handling and performance.

* feat/revise-compaction-picker:
 Refactor `find_overlapping_items` to use an external result vector

 - Updated `find_overlapping_items` in `src/mito2/src/compaction/run.rs` to accept a mutable result vector instead of returning a new vector, improving memory efficiency.
 - Modified benchmarks in `src/mito2/benches/bench_compaction_picker.rs` to accommodate the new function signature.
 - Adjusted tests in `src/mito2/src/compaction/run.rs` to use the updated function signature, ensuring correct functionality with the new approach.

* feat/revise-compaction-picker:
 Improve file merging logic in `run.rs`

 - Refactor the loop logic in `merge_seq_files` to simplify the iteration over file groups.
 - Adjust the range for `end_idx` to include the endpoint, allowing for more flexible group selection.
 - Remove the condition that skips groups with only one file, enabling more comprehensive processing of file sequences.

* feat/revise-compaction-picker:
 Enhance `find_overlapping_items` with `SortedRun` and Update Tests

 - Refactor `find_overlapping_items` in `src/mito2/src/compaction/run.rs` to utilize the `SortedRun` struct for improved efficiency and clarity.
 - Introduce a `sorted` flag in `SortedRun` to optimize sorting operations.
 - Update test cases in `src/mito2/benches/bench_compaction_picker.rs` to accommodate changes in `find_overlapping_items` by using `SortedRun`.
 - Add `From<Vec<T>>` implementation for `SortedRun` to facilitate easy conversion from vectors.

* feat/revise-compaction-picker:
 **Enhancements in `compaction/run.rs`:**

 - Added `ReadableSize` import to handle size calculations.
 - Modified the logic in `merge_seq_files` to clamp the calculated target size to a maximum of 2GB when `max_file_size` is not provided.

* feat/revise-compaction-picker: Add Default Max Output Size Constant for Compaction

Introduce DEFAULT_MAX_OUTPUT_SIZE constant to define the default maximum compaction output file size as 2GB. Refactor the merge_seq_files function to utilize this constant, ensuring consistent and maintainable code for handling file size limits during compaction.
This commit is contained in:
Lei, HUANG
2025-05-23 11:29:08 +08:00
committed by GitHub
parent bf496e05cc
commit 4b71e493f7
40 changed files with 1172 additions and 1073 deletions

View File

@@ -97,3 +97,8 @@ required-features = ["test"]
name = "bench_filter_time_partition"
harness = false
required-features = ["test"]
[[bench]]
name = "bench_compaction_picker"
harness = false
required-features = ["test"]

View File

@@ -0,0 +1,157 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use mito2::compaction::run::{
find_overlapping_items, find_sorted_runs, merge_seq_files, reduce_runs, Item, Ranged, SortedRun,
};
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct MockFile {
start: i64,
end: i64,
size: usize,
}
impl Ranged for MockFile {
type BoundType = i64;
fn range(&self) -> (Self::BoundType, Self::BoundType) {
(self.start, self.end)
}
}
impl Item for MockFile {
fn size(&self) -> usize {
self.size
}
}
fn generate_test_files(n: usize) -> Vec<MockFile> {
let mut files = Vec::with_capacity(n);
for _ in 0..n {
// Create slightly overlapping ranges to force multiple sorted runs
files.push(MockFile {
start: 0,
end: 10,
size: 10,
});
}
files
}
fn bench_find_sorted_runs(c: &mut Criterion) {
let mut group = c.benchmark_group("find_sorted_runs");
for size in [10, 100, 1000].iter() {
group.bench_function(format!("size_{}", size), |b| {
let mut files = generate_test_files(*size);
b.iter(|| {
find_sorted_runs(black_box(&mut files));
});
});
}
group.finish();
}
fn bench_reduce_runs(c: &mut Criterion) {
let mut group = c.benchmark_group("reduce_runs");
for size in [10, 100, 1000].iter() {
group.bench_function(format!("size_{}", size), |b| {
let mut files = generate_test_files(*size);
let runs = find_sorted_runs(&mut files);
b.iter(|| {
reduce_runs(black_box(runs.clone()));
});
});
}
group.finish();
}
fn bench_find_overlapping_items(c: &mut Criterion) {
let mut group = c.benchmark_group("find_overlapping_items");
for size in [10, 100, 1000].iter() {
group.bench_function(format!("size_{}", size), |b| {
// Create two sets of files with some overlapping ranges
let mut files1 = Vec::with_capacity(*size);
let mut files2 = Vec::with_capacity(*size);
for i in 0..*size {
files1.push(MockFile {
start: i as i64,
end: (i + 5) as i64,
size: 10,
});
files2.push(MockFile {
start: (i + 3) as i64,
end: (i + 8) as i64,
size: 10,
});
}
let mut r1 = SortedRun::from(files1);
let mut r2 = SortedRun::from(files2);
b.iter(|| {
let mut result = vec![];
find_overlapping_items(black_box(&mut r1), black_box(&mut r2), &mut result);
});
});
}
group.finish();
}
fn bench_merge_seq_files(c: &mut Criterion) {
let mut group = c.benchmark_group("merge_seq_files");
for size in [10, 100, 1000].iter() {
group.bench_function(format!("size_{}", size), |b| {
// Create a set of files with varying sizes
let mut files = Vec::with_capacity(*size);
for i in 0..*size {
// Create files with different sizes to test the scoring algorithm
let file_size = if i % 3 == 0 {
5
} else if i % 3 == 1 {
10
} else {
15
};
files.push(MockFile {
start: i as i64,
end: (i + 1) as i64,
size: file_size,
});
}
b.iter(|| {
merge_seq_files(black_box(&files), black_box(Some(50)));
});
});
}
group.finish();
}
criterion_group!(
benches,
bench_find_sorted_runs,
bench_reduce_runs,
bench_find_overlapping_items,
bench_merge_seq_files
);
criterion_main!(benches);

View File

@@ -362,7 +362,7 @@ impl FilePathProvider for WriteCachePathProvider {
/// Path provider that builds paths in region storage path.
#[derive(Clone, Debug)]
pub(crate) struct RegionFilePathFactory {
region_dir: String,
pub(crate) region_dir: String,
}
impl RegionFilePathFactory {

View File

@@ -15,7 +15,7 @@
mod buckets;
pub mod compactor;
pub mod picker;
mod run;
pub mod run;
mod task;
#[cfg(test)]
mod test_util;

View File

@@ -36,6 +36,7 @@ use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Res
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::metrics;
use crate::read::Source;
use crate::region::opener::new_manifest_dir;
use crate::region::options::RegionOptions;
@@ -240,6 +241,14 @@ impl MergeOutput {
pub fn is_empty(&self) -> bool {
self.files_to_add.is_empty() && self.files_to_remove.is_empty()
}
pub fn input_file_size(&self) -> u64 {
self.files_to_remove.iter().map(|f| f.file_size).sum()
}
pub fn output_file_size(&self) -> u64 {
self.files_to_add.iter().map(|f| f.file_size).sum()
}
}
/// Compactor is the trait that defines the compaction logic.
@@ -286,6 +295,7 @@ impl Compactor for DefaultCompactor {
compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
let write_opts = WriteOptions {
write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
max_file_size: picker_output.max_file_size,
..Default::default()
};
@@ -460,6 +470,9 @@ impl Compactor for DefaultCompactor {
);
return Ok(());
}
metrics::COMPACTION_INPUT_BYTES.inc_by(merge_output.input_file_size() as f64);
metrics::COMPACTION_OUTPUT_BYTES.inc_by(merge_output.output_file_size() as f64);
self.update_manifest(compaction_region, merge_output)
.await?;

View File

@@ -45,6 +45,8 @@ pub struct PickerOutput {
pub outputs: Vec<CompactionOutput>,
pub expired_ssts: Vec<FileHandle>,
pub time_window_size: i64,
/// Max single output file size in bytes.
pub max_file_size: Option<usize>,
}
/// SerializedPickerOutput is a serialized version of PickerOutput by replacing [CompactionOutput] and [FileHandle] with [SerializedCompactionOutput] and [FileMeta].
@@ -53,6 +55,7 @@ pub struct SerializedPickerOutput {
pub outputs: Vec<SerializedCompactionOutput>,
pub expired_ssts: Vec<FileMeta>,
pub time_window_size: i64,
pub max_file_size: Option<usize>,
}
impl From<&PickerOutput> for SerializedPickerOutput {
@@ -76,6 +79,7 @@ impl From<&PickerOutput> for SerializedPickerOutput {
outputs,
expired_ssts,
time_window_size: input.time_window_size,
max_file_size: input.max_file_size,
}
}
}
@@ -111,6 +115,7 @@ impl PickerOutput {
outputs,
expired_ssts,
time_window_size: input.time_window_size,
max_file_size: input.max_file_size,
}
}
}
@@ -131,10 +136,7 @@ pub fn new_picker(
} else {
match compaction_options {
CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker {
max_active_window_runs: twcs_opts.max_active_window_runs,
max_active_window_files: twcs_opts.max_active_window_files,
max_inactive_window_runs: twcs_opts.max_inactive_window_runs,
max_inactive_window_files: twcs_opts.max_inactive_window_files,
trigger_file_num: twcs_opts.trigger_file_num,
time_window_seconds: twcs_opts.time_window_seconds(),
max_output_file_size: twcs_opts.max_output_file_size.map(|r| r.as_bytes()),
append_mode,
@@ -179,6 +181,7 @@ mod tests {
],
expired_ssts: expired_ssts_file_handle.clone(),
time_window_size: 1000,
max_file_size: None,
};
let picker_output_str =

File diff suppressed because it is too large Load Diff

View File

@@ -22,7 +22,6 @@ use tokio::sync::mpsc;
use crate::compaction::compactor::{CompactionRegion, Compactor};
use crate::compaction::picker::{CompactionTask, PickerOutput};
use crate::error;
use crate::error::CompactRegionSnafu;
use crate::manifest::action::RegionEdit;
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
@@ -30,6 +29,7 @@ use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
};
use crate::worker::WorkerListener;
use crate::{error, metrics};
/// Maximum number of compaction tasks in parallel.
pub const MAX_PARALLEL_COMPACTION: usize = 1;
@@ -98,6 +98,8 @@ impl CompactionTaskImpl {
};
let merge_time = merge_timer.stop_and_record();
metrics::COMPACTION_INPUT_BYTES.inc_by(compaction_result.input_file_size() as f64);
metrics::COMPACTION_OUTPUT_BYTES.inc_by(compaction_result.output_file_size() as f64);
info!(
"Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
self.compaction_region.region_id,

View File

@@ -44,30 +44,3 @@ pub fn new_file_handle(
file_purger,
)
}
pub(crate) fn new_file_handles(file_specs: &[(i64, i64, u64)]) -> Vec<FileHandle> {
let file_purger = new_noop_file_purger();
file_specs
.iter()
.map(|(start, end, size)| {
FileHandle::new(
FileMeta {
region_id: 0.into(),
file_id: FileId::random(),
time_range: (
Timestamp::new_millisecond(*start),
Timestamp::new_millisecond(*end),
),
level: 0,
file_size: *size,
available_indexes: Default::default(),
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
},
file_purger.clone(),
)
})
.collect()
}

View File

@@ -16,15 +16,17 @@ use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use common_telemetry::{info, trace};
use common_base::readable_size::ReadableSize;
use common_telemetry::info;
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use store_api::storage::RegionId;
use crate::compaction::buckets::infer_time_bucket;
use crate::compaction::compactor::CompactionRegion;
use crate::compaction::picker::{Picker, PickerOutput};
use crate::compaction::run::{find_sorted_runs, reduce_runs, Item};
use crate::compaction::run::{find_sorted_runs, merge_seq_files, reduce_runs};
use crate::compaction::{get_expired_ssts, CompactionOutput};
use crate::sst::file::{overlaps, FileHandle, Level};
use crate::sst::version::LevelMeta;
@@ -35,14 +37,8 @@ const LEVEL_COMPACTED: Level = 1;
/// candidates.
#[derive(Debug)]
pub struct TwcsPicker {
/// Max allowed sorted runs in active window.
pub max_active_window_runs: usize,
/// Max allowed files in active window.
pub max_active_window_files: usize,
/// Max allowed sorted runs in inactive windows.
pub max_inactive_window_runs: usize,
/// Max allowed files in inactive windows.
pub max_inactive_window_files: usize,
/// Minimum file num to trigger a compaction.
pub trigger_file_num: usize,
/// Compaction time window in seconds.
pub time_window_seconds: Option<i64>,
/// Max allowed compaction output file size.
@@ -53,89 +49,48 @@ pub struct TwcsPicker {
impl TwcsPicker {
/// Builds compaction output from files.
/// For active writing window, we allow for at most `max_active_window_runs` files to alleviate
/// fragmentation. For other windows, we allow at most 1 file at each window.
fn build_output(
&self,
region_id: RegionId,
time_windows: &mut BTreeMap<i64, Window>,
active_window: Option<i64>,
) -> Vec<CompactionOutput> {
let mut output = vec![];
for (window, files) in time_windows {
if files.files.is_empty() {
continue;
}
let sorted_runs = find_sorted_runs(&mut files.files);
let found_runs = sorted_runs.len();
// We only remove deletion markers if we found less than 2 runs and not in append mode.
// because after compaction there will be no overlapping files.
let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
let (max_runs, max_files) = if let Some(active_window) = active_window
&& *window == active_window
{
(self.max_active_window_runs, self.max_active_window_files)
let inputs = if found_runs > 1 {
reduce_runs(sorted_runs)
} else {
(
self.max_inactive_window_runs,
self.max_inactive_window_files,
)
let run = sorted_runs.last().unwrap();
if run.items().len() < self.trigger_file_num {
continue;
}
// no overlapping files, try merge small files
merge_seq_files(run.items(), self.max_output_file_size)
};
let found_runs = sorted_runs.len();
// We only remove deletion markers once no file in current window overlaps with any other window
// and region is not in append mode.
let filter_deleted =
!files.overlapping && (found_runs == 1 || max_runs == 1) && !self.append_mode;
let inputs = if found_runs > max_runs {
let files_to_compact = reduce_runs(sorted_runs, max_runs);
let files_to_compact_len = files_to_compact.len();
info!(
"Building compaction output, active window: {:?}, \
current window: {}, \
max runs: {}, \
found runs: {}, \
output size: {}, \
max output size: {:?}, \
remove deletion markers: {}",
active_window,
if !inputs.is_empty() {
log_pick_result(
region_id,
*window,
max_runs,
active_window,
found_runs,
files_to_compact_len,
self.max_output_file_size,
filter_deleted
);
files_to_compact
} else if files.files.len() > max_files {
info!(
"Enforcing max file num in window: {}, active: {:?}, max: {}, current: {}, max output size: {:?}, filter delete: {}",
*window,
active_window,
max_files,
files.files.len(),
self.max_output_file_size,
filter_deleted,
&inputs,
);
// Files in window exceeds file num limit
vec![enforce_file_num(&files.files, max_files)]
} else {
trace!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
continue;
};
let split_inputs = if !filter_deleted
&& let Some(max_output_file_size) = self.max_output_file_size
{
let len_before_split = inputs.len();
let maybe_split = enforce_max_output_size(inputs, max_output_file_size);
if maybe_split.len() != len_before_split {
info!("Compaction output file size exceeds threshold {}, split compaction inputs to: {:?}", max_output_file_size, maybe_split);
}
maybe_split
} else {
inputs
};
for input in split_inputs {
debug_assert!(input.len() > 1);
output.push(CompactionOutput {
output_level: LEVEL_COMPACTED, // always compact to l1
inputs: input,
inputs,
filter_deleted,
output_time_range: None, // we do not enforce output time range in twcs compactions.
});
@@ -145,66 +100,50 @@ impl TwcsPicker {
}
}
/// Limits the size of compaction output in a naive manner.
/// todo(hl): we can find the output file size more precisely by checking the time range
/// of each row group and adding the sizes of those non-overlapping row groups. But now
/// we'd better not to expose the SST details in this level.
fn enforce_max_output_size(
inputs: Vec<Vec<FileHandle>>,
max_output_file_size: u64,
) -> Vec<Vec<FileHandle>> {
inputs
.into_iter()
.flat_map(|input| {
debug_assert!(input.len() > 1);
let estimated_output_size = input.iter().map(|f| f.size()).sum::<u64>();
if estimated_output_size < max_output_file_size {
// total file size does not exceed the threshold, just return the original input.
return vec![input];
}
let mut splits = vec![];
let mut new_input = vec![];
let mut new_input_size = 0;
for f in input {
if new_input_size + f.size() > max_output_file_size {
splits.push(std::mem::take(&mut new_input));
new_input_size = 0;
}
new_input_size += f.size();
new_input.push(f);
}
if !new_input.is_empty() {
splits.push(new_input);
}
splits
#[allow(clippy::too_many_arguments)]
fn log_pick_result(
region_id: RegionId,
window: i64,
active_window: Option<i64>,
found_runs: usize,
file_num: usize,
max_output_file_size: Option<u64>,
filter_deleted: bool,
inputs: &[FileHandle],
) {
let input_file_str: Vec<String> = inputs
.iter()
.map(|f| {
let range = f.time_range();
let start = range.0.to_iso8601_string();
let end = range.1.to_iso8601_string();
let num_rows = f.num_rows();
format!(
"SST{{id: {}, range: ({}, {}), size: {}, num rows: {} }}",
f.file_id(),
start,
end,
ReadableSize(f.size()),
num_rows
)
})
.filter(|p| p.len() > 1)
.collect()
}
/// Merges consecutive files so that file num does not exceed `max_file_num`, and chooses
/// the solution with minimum overhead according to files sizes to be merged.
/// `enforce_file_num` only merges consecutive files so that it won't create overlapping outputs.
/// `runs` must be sorted according to time ranges.
fn enforce_file_num<T: Item>(files: &[T], max_file_num: usize) -> Vec<T> {
debug_assert!(files.len() > max_file_num);
let to_merge = files.len() - max_file_num + 1;
let mut min_penalty = usize::MAX;
let mut min_idx = 0;
for idx in 0..=(files.len() - to_merge) {
let current_penalty: usize = files
.iter()
.skip(idx)
.take(to_merge)
.map(|f| f.size())
.sum();
if current_penalty < min_penalty {
min_penalty = current_penalty;
min_idx = idx;
}
}
files.iter().skip(min_idx).take(to_merge).cloned().collect()
.collect();
let window_str = Timestamp::new_second(window).to_iso8601_string();
let active_window_str = active_window.map(|s| Timestamp::new_second(s).to_iso8601_string());
let max_output_file_size = max_output_file_size.map(|size| ReadableSize(size).to_string());
info!(
"Region ({:?}) compaction pick result: current window: {}, active window: {:?}, \
found runs: {}, file num: {}, max output file size: {:?}, filter deleted: {}, \
input files: {:?}",
region_id,
window_str,
active_window_str,
found_runs,
file_num,
max_output_file_size,
filter_deleted,
input_file_str
);
}
impl Picker for TwcsPicker {
@@ -240,16 +179,18 @@ impl Picker for TwcsPicker {
// Assign files to windows
let mut windows =
assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
let outputs = self.build_output(&mut windows, active_window);
let outputs = self.build_output(region_id, &mut windows, active_window);
if outputs.is_empty() && expired_ssts.is_empty() {
return None;
}
let max_file_size = self.max_output_file_size.map(|v| v as usize);
Some(PickerOutput {
outputs,
expired_ssts,
time_window_size,
max_file_size,
})
}
}
@@ -368,12 +309,10 @@ fn find_latest_window_in_seconds<'a>(
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::Arc;
use super::*;
use crate::compaction::test_util::{new_file_handle, new_file_handles};
use crate::sst::file::{FileId, FileMeta, Level};
use crate::test_util::NoopFilePurger;
use crate::compaction::test_util::new_file_handle;
use crate::sst::file::{FileId, Level};
#[test]
fn test_get_latest_window_in_seconds() {
@@ -614,25 +553,31 @@ mod tests {
impl CompactionPickerTestCase {
fn check(&self) {
let file_id_to_idx = self
.input_files
.iter()
.enumerate()
.map(|(idx, file)| (file.file_id(), idx))
.collect::<HashMap<_, _>>();
let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
let active_window =
find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
let output = TwcsPicker {
max_active_window_runs: 4,
max_active_window_files: usize::MAX,
max_inactive_window_runs: 1,
max_inactive_window_files: usize::MAX,
trigger_file_num: 4,
time_window_seconds: None,
max_output_file_size: None,
append_mode: false,
}
.build_output(&mut windows, active_window);
.build_output(RegionId::from_u64(0), &mut windows, active_window);
let output = output
.iter()
.map(|o| {
let input_file_ids =
o.inputs.iter().map(|f| f.file_id()).collect::<HashSet<_>>();
let input_file_ids = o
.inputs
.iter()
.map(|f| file_id_to_idx.get(&f.file_id()).copied().unwrap())
.collect::<HashSet<_>>();
(input_file_ids, o.output_level)
})
.collect::<Vec<_>>();
@@ -641,11 +586,7 @@ mod tests {
.expected_outputs
.iter()
.map(|o| {
let input_file_ids = o
.input_files
.iter()
.map(|idx| self.input_files[*idx].file_id())
.collect::<HashSet<_>>();
let input_file_ids = o.input_files.iter().copied().collect::<HashSet<_>>();
(input_file_ids, o.output_level)
})
.collect::<Vec<_>>();
@@ -658,47 +599,11 @@ mod tests {
output_level: Level,
}
fn check_enforce_file_num(
input_files: &[(i64, i64, u64)],
max_file_num: usize,
files_to_merge: &[(i64, i64)],
) {
let mut files = new_file_handles(input_files);
// ensure sorted
find_sorted_runs(&mut files);
let mut to_merge = enforce_file_num(&files, max_file_num);
to_merge.sort_unstable_by_key(|f| f.time_range().0);
assert_eq!(
files_to_merge.to_vec(),
to_merge
.iter()
.map(|f| {
let (start, end) = f.time_range();
(start.value(), end.value())
})
.collect::<Vec<_>>()
);
}
#[test]
fn test_enforce_file_num() {
check_enforce_file_num(
&[(0, 300, 2), (100, 200, 1), (200, 400, 1)],
2,
&[(100, 200), (200, 400)],
);
check_enforce_file_num(
&[(0, 300, 200), (100, 200, 100), (200, 400, 100)],
1,
&[(0, 300), (100, 200), (200, 400)],
);
}
#[test]
fn test_build_twcs_output() {
let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
// Case 1: 2 runs found in each time window.
CompactionPickerTestCase {
window_size: 3,
input_files: [
@@ -708,13 +613,25 @@ mod tests {
new_file_handle(file_ids[3], 50, 2998, 0), //active windows
]
.to_vec(),
expected_outputs: vec![ExpectedOutput {
input_files: vec![0, 1],
output_level: 1,
}],
expected_outputs: vec![
ExpectedOutput {
input_files: vec![0, 1],
output_level: 1,
},
ExpectedOutput {
input_files: vec![2, 3],
output_level: 1,
},
],
}
.check();
// Case 2:
// -2000........-3
// -3000.....-100
// 0..............2999
// 50..........2998
// 11.........2990
let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
CompactionPickerTestCase {
window_size: 3,
@@ -724,7 +641,6 @@ mod tests {
new_file_handle(file_ids[2], 0, 2999, 0),
new_file_handle(file_ids[3], 50, 2998, 0),
new_file_handle(file_ids[4], 11, 2990, 0),
new_file_handle(file_ids[5], 50, 4998, 0),
]
.to_vec(),
expected_outputs: vec![
@@ -733,7 +649,7 @@ mod tests {
output_level: 1,
},
ExpectedOutput {
input_files: vec![2, 3, 4],
input_files: vec![2, 4],
output_level: 1,
},
],
@@ -741,44 +657,5 @@ mod tests {
.check();
}
fn make_file_handles(inputs: &[(i64, i64, u64)]) -> Vec<FileHandle> {
inputs
.iter()
.map(|(start, end, size)| {
FileHandle::new(
FileMeta {
region_id: Default::default(),
file_id: Default::default(),
time_range: (
Timestamp::new_millisecond(*start),
Timestamp::new_millisecond(*end),
),
level: 0,
file_size: *size,
available_indexes: Default::default(),
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
},
Arc::new(NoopFilePurger),
)
})
.collect()
}
#[test]
fn test_limit_output_size() {
let mut files = make_file_handles(&[(1, 1, 1)].repeat(6));
let runs = find_sorted_runs(&mut files);
assert_eq!(6, runs.len());
let files_to_merge = reduce_runs(runs, 2);
let enforced = enforce_max_output_size(files_to_merge, 2);
assert_eq!(2, enforced.len());
assert_eq!(2, enforced[0].len());
assert_eq!(2, enforced[1].len());
}
// TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
}

View File

@@ -115,6 +115,7 @@ impl Picker for WindowedCompactionPicker {
outputs,
expired_ssts,
time_window_size: time_window,
max_file_size: None, // todo (hl): we may need to support `max_file_size` parameter in manual compaction.
})
}
}

View File

@@ -110,8 +110,6 @@ async fn test_append_mode_compaction() {
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "2")
.insert_option("compaction.twcs.max_inactive_window_runs", "2")
.insert_option("append_mode", "true")
.build();
let region_dir = request.region_dir.clone();
@@ -177,7 +175,7 @@ async fn test_append_mode_compaction() {
+-------+---------+---------------------+";
// Scans in parallel.
let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(2, scanner.num_files());
assert_eq!(1, scanner.num_files());
assert_eq!(1, scanner.num_memtables());
scanner.set_target_partitions(2);
let stream = scanner.scan().await.unwrap();

View File

@@ -129,8 +129,6 @@ async fn test_compaction_region() {
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "1")
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
.build();
let column_schemas = request
@@ -163,12 +161,12 @@ async fn test_compaction_region() {
// [0..9]
// [10...19]
// [20....29]
// -[15.........29]-
// -[15.........29]- (delete)
// [15.....24]
// Output:
// [0..9]
// [10..14]
// [15..24]
// [10............29] (contains delete)
// [15....24]
assert_eq!(
3,
scanner.num_files(),
@@ -181,6 +179,71 @@ async fn test_compaction_region() {
assert_eq!((0..25).map(|v| v * 1000).collect::<Vec<_>>(), vec);
}
#[tokio::test]
async fn test_compaction_overlapping_files() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.build();
let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush 5 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..10).await;
delete_and_flush(&engine, region_id, &column_schemas, 10..20).await;
put_and_flush(&engine, region_id, &column_schemas, 20..30).await;
delete_and_flush(&engine, region_id, &column_schemas, 30..40).await;
let result = engine
.handle_request(
region_id,
RegionRequest::Compact(RegionCompactRequest::default()),
)
.await
.unwrap();
assert_eq!(result.affected_rows, 0);
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(
1,
scanner.num_files(),
"unexpected files: {:?}",
scanner.file_ids()
);
let stream = scanner.scan().await.unwrap();
let vec = collect_stream_ts(stream).await;
assert_eq!(
vec,
(0..=9)
.map(|v| v * 1000)
.chain((20..=29).map(|v| v * 1000))
.collect::<Vec<_>>()
);
}
#[tokio::test]
async fn test_compaction_region_with_overlapping() {
common_telemetry::init_default_ut_logging();
@@ -201,8 +264,6 @@ async fn test_compaction_region_with_overlapping() {
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "2")
.insert_option("compaction.twcs.max_inactive_window_runs", "2")
.insert_option("compaction.twcs.time_window", "1h")
.build();
@@ -257,10 +318,6 @@ async fn test_compaction_region_with_overlapping_delete_all() {
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "2")
.insert_option("compaction.twcs.max_active_window_files", "2")
.insert_option("compaction.twcs.max_inactive_window_runs", "2")
.insert_option("compaction.twcs.max_inactive_window_files", "2")
.insert_option("compaction.twcs.time_window", "1h")
.build();
@@ -290,7 +347,7 @@ async fn test_compaction_region_with_overlapping_delete_all() {
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(
4,
2,
scanner.num_files(),
"unexpected files: {:?}",
scanner.file_ids()
@@ -332,7 +389,6 @@ async fn test_readonly_during_compaction() {
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "1")
.build();
let column_schemas = request
@@ -404,10 +460,6 @@ async fn test_compaction_update_time_window() {
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "2")
.insert_option("compaction.twcs.max_active_window_files", "2")
.insert_option("compaction.twcs.max_inactive_window_runs", "2")
.insert_option("compaction.twcs.max_inactive_window_files", "2")
.build();
let column_schemas = request
@@ -420,9 +472,10 @@ async fn test_compaction_update_time_window() {
.await
.unwrap();
// Flush 3 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 2400..3600).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 0..900).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 900..1800).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1800..2700).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 2700..3600).await; // window 3600
let result = engine
.handle_request(
@@ -433,11 +486,21 @@ async fn test_compaction_update_time_window() {
.unwrap();
assert_eq!(result.affected_rows, 0);
assert_eq!(
engine
.get_region(region_id)
.unwrap()
.version_control
.current()
.version
.compaction_time_window,
Some(Duration::from_secs(3600))
);
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(0, scanner.num_memtables());
// We keep at most two files.
// We keep all 3 files because no enough file to merge
assert_eq!(
2,
1,
scanner.num_files(),
"unexpected files: {:?}",
scanner.file_ids()
@@ -492,10 +555,6 @@ async fn test_change_region_compaction_window() {
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "1")
.insert_option("compaction.twcs.max_active_window_files", "1")
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
.insert_option("compaction.twcs.max_inactive_window_files", "1")
.build();
let region_dir = request.region_dir.clone();
let column_schemas = request
@@ -508,8 +567,10 @@ async fn test_change_region_compaction_window() {
.await
.unwrap();
// Flush 2 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 0..600).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 600..1200).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1200..1800).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1800..2400).await; // window 3600
engine
.handle_request(
@@ -520,7 +581,7 @@ async fn test_change_region_compaction_window() {
.unwrap();
// Put window 7200
put_and_flush(&engine, region_id, &column_schemas, 4000..5000).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 4000..5000).await;
// Check compaction window.
let region = engine.get_region(region_id).unwrap();
@@ -543,6 +604,22 @@ async fn test_change_region_compaction_window() {
},
});
engine.handle_request(region_id, request).await.unwrap();
assert_eq!(
engine
.get_region(region_id)
.unwrap()
.version_control
.current()
.version
.options
.compaction
.time_window(),
Some(Duration::from_secs(7200))
);
put_and_flush(&engine, region_id, &column_schemas, 5000..5100).await;
put_and_flush(&engine, region_id, &column_schemas, 5100..5200).await;
put_and_flush(&engine, region_id, &column_schemas, 5200..5300).await;
// Compaction again. It should compacts window 3600 and 7200
// into 7200.
@@ -585,12 +662,12 @@ async fn test_change_region_compaction_window() {
{
let region = engine.get_region(region_id).unwrap();
let version = region.version();
// We open the region without options, so the time window should be None.
assert!(version.options.compaction.time_window().is_none());
assert_eq!(
Some(Duration::from_secs(7200)),
version.compaction_time_window,
);
// We open the region without options, so the time window should be None.
assert!(version.options.compaction.time_window().is_none());
}
}
@@ -615,10 +692,6 @@ async fn test_open_overwrite_compaction_window() {
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "1")
.insert_option("compaction.twcs.max_active_window_files", "1")
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
.insert_option("compaction.twcs.max_inactive_window_files", "1")
.build();
let region_dir = request.region_dir.clone();
let column_schemas = request
@@ -631,8 +704,10 @@ async fn test_open_overwrite_compaction_window() {
.await
.unwrap();
// Flush 2 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 0..600).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 600..1200).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1200..1800).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1800..2400).await; // window 3600
engine
.handle_request(

View File

@@ -45,7 +45,6 @@ async fn test_scan_without_filtering_deleted() {
.await;
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "10")
.build();
let column_schemas = rows_schema(&request);

View File

@@ -111,8 +111,6 @@ async fn test_merge_mode_compaction() {
let request = CreateRequestBuilder::new()
.field_num(2)
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "1")
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
.insert_option("merge_mode", "last_non_null")
.build();
let region_dir = request.region_dir.clone();
@@ -191,7 +189,7 @@ async fn test_merge_mode_compaction() {
+-------+---------+---------+---------------------+";
// Scans in parallel.
let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(1, scanner.num_files());
assert_eq!(2, scanner.num_files());
assert_eq!(1, scanner.num_memtables());
scanner.set_target_partitions(2);
let stream = scanner.scan().await.unwrap();

View File

@@ -20,6 +20,7 @@
#![feature(assert_matches)]
#![feature(result_flattening)]
#![feature(int_roundings)]
#![feature(debug_closure_helpers)]
#[cfg(any(test, feature = "test"))]
#[cfg_attr(feature = "test", allow(unused))]

View File

@@ -84,7 +84,7 @@ lazy_static! {
/// Histogram of flushed bytes.
pub static ref FLUSH_BYTES_TOTAL: IntCounter =
register_int_counter!("greptime_mito_flush_bytes_total", "mito flush bytes total").unwrap();
/// Gauge for inflight compaction tasks.
/// Gauge for inflight flush tasks.
pub static ref INFLIGHT_FLUSH_COUNT: IntGauge =
register_int_gauge!(
"greptime_mito_inflight_flush_count",
@@ -153,7 +153,6 @@ lazy_static! {
"greptime_mito_inflight_compaction_count",
"inflight compaction count",
).unwrap();
// ------- End of compaction metrics.
// Query metrics.
/// Timer of different stages in query.
@@ -403,6 +402,20 @@ lazy_static! {
}
lazy_static! {
/// Counter for compaction input file size.
pub static ref COMPACTION_INPUT_BYTES: Counter = register_counter!(
"greptime_mito_compaction_input_bytes",
"mito compaction input file size",
).unwrap();
/// Counter for compaction output file size.
pub static ref COMPACTION_OUTPUT_BYTES: Counter = register_counter!(
"greptime_mito_compaction_output_bytes",
"mito compaction output file size",
).unwrap();
}
/// Stager notifier to collect metrics.
pub struct StagerMetrics {
cache_hit: IntCounter,

View File

@@ -199,18 +199,9 @@ impl Default for CompactionOptions {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct TwcsOptions {
/// Max num of sorted runs that can be kept in active writing time window.
/// Minimum file num in every time window to trigger a compaction.
#[serde_as(as = "DisplayFromStr")]
pub max_active_window_runs: usize,
/// Max num of files in the active window.
#[serde_as(as = "DisplayFromStr")]
pub max_active_window_files: usize,
/// Max num of sorted runs that can be kept in inactive time windows.
#[serde_as(as = "DisplayFromStr")]
pub max_inactive_window_runs: usize,
/// Max num of files in inactive time windows.
#[serde_as(as = "DisplayFromStr")]
pub max_inactive_window_files: usize,
pub trigger_file_num: usize,
/// Compaction time window defined when creating tables.
#[serde(with = "humantime_serde")]
pub time_window: Option<Duration>,
@@ -243,12 +234,9 @@ impl TwcsOptions {
impl Default for TwcsOptions {
fn default() -> Self {
Self {
max_active_window_runs: 4,
max_active_window_files: 4,
max_inactive_window_runs: 1,
max_inactive_window_files: 1,
trigger_file_num: 4,
time_window: None,
max_output_file_size: Some(ReadableSize::gb(2)),
max_output_file_size: Some(ReadableSize::mb(512)),
remote_compaction: false,
fallback_to_local: true,
}
@@ -500,7 +488,7 @@ mod tests {
#[test]
fn test_without_compaction_type() {
let map = make_map(&[
("compaction.twcs.max_active_window_runs", "8"),
("compaction.twcs.trigger_file_num", "8"),
("compaction.twcs.time_window", "2h"),
]);
let err = RegionOptions::try_from(&map).unwrap_err();
@@ -510,14 +498,14 @@ mod tests {
#[test]
fn test_with_compaction_type() {
let map = make_map(&[
("compaction.twcs.max_active_window_runs", "8"),
("compaction.twcs.trigger_file_num", "8"),
("compaction.twcs.time_window", "2h"),
("compaction.type", "twcs"),
]);
let options = RegionOptions::try_from(&map).unwrap();
let expect = RegionOptions {
compaction: CompactionOptions::Twcs(TwcsOptions {
max_active_window_runs: 8,
trigger_file_num: 8,
time_window: Some(Duration::from_secs(3600 * 2)),
..Default::default()
}),
@@ -618,10 +606,7 @@ mod tests {
});
let map = make_map(&[
("ttl", "7d"),
("compaction.twcs.max_active_window_runs", "8"),
("compaction.twcs.max_active_window_files", "11"),
("compaction.twcs.max_inactive_window_runs", "2"),
("compaction.twcs.max_inactive_window_files", "3"),
("compaction.twcs.trigger_file_num", "8"),
("compaction.twcs.max_output_file_size", "1GB"),
("compaction.twcs.time_window", "2h"),
("compaction.type", "twcs"),
@@ -645,10 +630,7 @@ mod tests {
let expect = RegionOptions {
ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
compaction: CompactionOptions::Twcs(TwcsOptions {
max_active_window_runs: 8,
max_active_window_files: 11,
max_inactive_window_runs: 2,
max_inactive_window_files: 3,
trigger_file_num: 8,
time_window: Some(Duration::from_secs(3600 * 2)),
max_output_file_size: Some(ReadableSize::gb(1)),
remote_compaction: false,
@@ -679,10 +661,7 @@ mod tests {
let options = RegionOptions {
ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
compaction: CompactionOptions::Twcs(TwcsOptions {
max_active_window_runs: 8,
max_active_window_files: usize::MAX,
max_inactive_window_runs: 2,
max_inactive_window_files: usize::MAX,
trigger_file_num: 8,
time_window: Some(Duration::from_secs(3600 * 2)),
max_output_file_size: None,
remote_compaction: false,
@@ -719,10 +698,7 @@ mod tests {
"ttl": "7days",
"compaction": {
"compaction.type": "twcs",
"compaction.twcs.max_active_window_runs": "8",
"compaction.twcs.max_active_window_files": "11",
"compaction.twcs.max_inactive_window_runs": "2",
"compaction.twcs.max_inactive_window_files": "7",
"compaction.twcs.trigger_file_num": "8",
"compaction.twcs.max_output_file_size": "7MB",
"compaction.twcs.time_window": "2h"
},
@@ -748,10 +724,7 @@ mod tests {
let options = RegionOptions {
ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
compaction: CompactionOptions::Twcs(TwcsOptions {
max_active_window_runs: 8,
max_active_window_files: 11,
max_inactive_window_runs: 2,
max_inactive_window_files: 7,
trigger_file_num: 8,
time_window: Some(Duration::from_secs(3600 * 2)),
max_output_file_size: Some(ReadableSize::mb(7)),
remote_compaction: false,

View File

@@ -15,11 +15,13 @@
//! Structures to describe metadata of files.
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use common_time::Timestamp;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
@@ -105,7 +107,7 @@ pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
}
/// Metadata of a SST file.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[derive(Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
/// Region of file.
@@ -142,6 +144,42 @@ pub struct FileMeta {
pub sequence: Option<NonZeroU64>,
}
impl Debug for FileMeta {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let mut debug_struct = f.debug_struct("FileMeta");
debug_struct
.field("region_id", &self.region_id)
.field_with("file_id", |f| write!(f, "{} ", self.file_id))
.field_with("time_range", |f| {
write!(
f,
"({}, {}) ",
self.time_range.0.to_iso8601_string(),
self.time_range.1.to_iso8601_string()
)
})
.field("level", &self.level)
.field("file_size", &ReadableSize(self.file_size));
if !self.available_indexes.is_empty() {
debug_struct
.field("available_indexes", &self.available_indexes)
.field("index_file_size", &ReadableSize(self.index_file_size));
}
debug_struct
.field("num_rows", &self.num_rows)
.field("num_row_groups", &self.num_row_groups)
.field_with("sequence", |f| match self.sequence {
None => {
write!(f, "None")
}
Some(seq) => {
write!(f, "{}", seq)
}
})
.finish()
}
}
/// Type of index.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IndexType {
@@ -188,13 +226,9 @@ pub struct FileHandle {
impl fmt::Debug for FileHandle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FileHandle")
.field("region_id", &self.inner.meta.region_id)
.field("file_id", &self.inner.meta.file_id)
.field("time_range", &self.inner.meta.time_range)
.field("size", &self.inner.meta.file_size)
.field("level", &self.inner.meta.level)
.field("compacting", &self.inner.compacting)
.field("deleted", &self.inner.deleted)
.field("meta", self.meta_ref())
.field("compacting", &self.compacting())
.field("deleted", &self.inner.deleted.load(Ordering::Relaxed))
.finish()
}
}

View File

@@ -50,6 +50,10 @@ pub struct WriteOptions {
pub write_buffer_size: ReadableSize,
/// Row group size.
pub row_group_size: usize,
/// Max single output file size.
/// Note: This is not a hard limit as we can only observe the file size when
/// ArrowWrite writes to underlying writers.
pub max_file_size: Option<usize>,
}
impl Default for WriteOptions {
@@ -57,6 +61,7 @@ impl Default for WriteOptions {
WriteOptions {
write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
row_group_size: DEFAULT_ROW_GROUP_SIZE,
max_file_size: None,
}
}
}
@@ -99,8 +104,9 @@ mod tests {
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use super::*;
use crate::access_layer::FilePathProvider;
use crate::access_layer::{FilePathProvider, RegionFilePathFactory};
use crate::cache::{CacheManager, CacheStrategy, PageKey};
use crate::read::BatchReader;
use crate::sst::index::{Indexer, IndexerBuilder};
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::reader::ParquetReaderBuilder;
@@ -108,7 +114,8 @@ mod tests {
use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
use crate::test_util::sst_util::{
assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
new_batch_with_binary, new_source, sst_file_handle, sst_file_handle_with_file_id,
sst_region_metadata,
};
use crate::test_util::{check_reader_result, TestEnv};
@@ -532,4 +539,58 @@ mod tests {
)
.await;
}
#[tokio::test]
async fn test_write_multiple_files() {
common_telemetry::init_default_ut_logging();
// create test env
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let metadata = Arc::new(sst_region_metadata());
let batches = &[
new_batch_by_range(&["a", "d"], 0, 1000),
new_batch_by_range(&["b", "f"], 0, 1000),
new_batch_by_range(&["b", "h"], 100, 200),
new_batch_by_range(&["b", "h"], 200, 300),
new_batch_by_range(&["b", "h"], 300, 1000),
];
let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
let source = new_source(batches);
let write_opts = WriteOptions {
row_group_size: 50,
max_file_size: Some(1024 * 16),
..Default::default()
};
let path_provider = RegionFilePathFactory {
region_dir: "test".to_string(),
};
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
NoopIndexBuilder,
path_provider,
)
.await;
let files = writer.write_all(source, None, &write_opts).await.unwrap();
assert_eq!(2, files.len());
let mut rows_read = 0;
for f in &files {
let file_handle = sst_file_handle_with_file_id(
f.file_id,
f.time_range.0.value(),
f.time_range.1.value(),
);
let builder =
ParquetReaderBuilder::new("test".to_string(), file_handle, object_store.clone());
let mut reader = builder.build().await.unwrap();
while let Some(batch) = reader.next_batch().await.unwrap() {
rows_read += batch.num_rows();
}
}
assert_eq!(total_rows, rows_read);
}
}

View File

@@ -15,11 +15,13 @@
//! Parquet writer.
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use common_telemetry::debug;
use common_time::Timestamp;
use datatypes::arrow::datatypes::SchemaRef;
use object_store::{FuturesAsyncWriter, ObjectStore};
@@ -143,17 +145,52 @@ where
}
}
async fn get_or_create_indexer(&mut self) -> &mut Indexer {
match self.current_indexer {
None => {
self.current_file = FileId::random();
let indexer = self.indexer_builder.build(self.current_file).await;
self.current_indexer = Some(indexer);
// safety: self.current_indexer already set above.
self.current_indexer.as_mut().unwrap()
}
Some(ref mut indexer) => indexer,
}
/// Finishes current SST file and index file.
async fn finish_current_file(
&mut self,
ssts: &mut SstInfoArray,
stats: &mut SourceStats,
) -> Result<()> {
// maybe_init_writer will re-create a new file.
if let Some(mut current_writer) = mem::take(&mut self.writer) {
let stats = mem::take(stats);
// At least one row has been written.
assert!(stats.num_rows > 0);
debug!(
"Finishing current file {}, file size: {}, num rows: {}",
self.current_file,
self.bytes_written.load(Ordering::Relaxed),
stats.num_rows
);
// Finish indexer and writer.
// safety: writer and index can only be both present or not.
let index_output = self.current_indexer.as_mut().unwrap().finish().await;
current_writer.flush().await.context(WriteParquetSnafu)?;
let file_meta = current_writer.close().await.context(WriteParquetSnafu)?;
let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;
// Safety: num rows > 0 so we must have min/max.
let time_range = stats.time_range.unwrap();
// convert FileMetaData to ParquetMetaData
let parquet_metadata = parse_parquet_metadata(file_meta)?;
ssts.push(SstInfo {
file_id: self.current_file,
time_range,
file_size,
num_rows: stats.num_rows,
num_row_groups: parquet_metadata.num_row_groups() as u64,
file_metadata: Some(Arc::new(parquet_metadata)),
index_metadata: index_output,
});
self.current_file = FileId::random();
self.bytes_written.store(0, Ordering::Relaxed)
};
Ok(())
}
/// Iterates source and writes all rows to Parquet file.
@@ -184,6 +221,7 @@ where
override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
opts: &WriteOptions,
) -> Result<SstInfoArray> {
let mut results = smallvec![];
let write_format =
WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence);
let mut stats = SourceStats::default();
@@ -196,49 +234,31 @@ where
match res {
Ok(mut batch) => {
stats.update(&batch);
self.get_or_create_indexer().await.update(&mut batch).await;
// safety: self.current_indexer must be set when first batch has been written.
self.current_indexer
.as_mut()
.unwrap()
.update(&mut batch)
.await;
if let Some(max_file_size) = opts.max_file_size
&& self.bytes_written.load(Ordering::Relaxed) > max_file_size
{
self.finish_current_file(&mut results, &mut stats).await?;
}
}
Err(e) => {
self.get_or_create_indexer().await.abort().await;
if let Some(indexer) = &mut self.current_indexer {
indexer.abort().await;
}
return Err(e);
}
}
}
let index_output = self.get_or_create_indexer().await.finish().await;
if stats.num_rows == 0 {
return Ok(smallvec![]);
}
let Some(mut arrow_writer) = self.writer.take() else {
// No batch actually written.
return Ok(smallvec![]);
};
arrow_writer.flush().await.context(WriteParquetSnafu)?;
let file_meta = arrow_writer.close().await.context(WriteParquetSnafu)?;
let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;
// Safety: num rows > 0 so we must have min/max.
let time_range = stats.time_range.unwrap();
// convert FileMetaData to ParquetMetaData
let parquet_metadata = parse_parquet_metadata(file_meta)?;
let file_id = self.current_file;
self.finish_current_file(&mut results, &mut stats).await?;
// object_store.write will make sure all bytes are written or an error is raised.
Ok(smallvec![SstInfo {
file_id,
time_range,
file_size,
num_rows: stats.num_rows,
num_row_groups: parquet_metadata.num_row_groups() as u64,
file_metadata: Some(Arc::new(parquet_metadata)),
index_metadata: index_output,
}])
Ok(results)
}
/// Customizes per-column config according to schema and maybe column cardinality.
@@ -309,6 +329,10 @@ where
AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
.context(WriteParquetSnafu)?;
self.writer = Some(arrow_writer);
let indexer = self.indexer_builder.build(self.current_file).await;
self.current_indexer = Some(indexer);
// safety: self.writer is assigned above
Ok(self.writer.as_mut().unwrap())
}

View File

@@ -214,28 +214,10 @@ fn set_twcs_options(
region_id: RegionId,
) -> std::result::Result<(), MetadataError> {
match key {
mito_engine_options::TWCS_MAX_ACTIVE_WINDOW_RUNS => {
let runs = parse_usize_with_default(key, value, default_option.max_active_window_runs)?;
log_option_update(region_id, key, options.max_active_window_runs, runs);
options.max_active_window_runs = runs;
}
mito_engine_options::TWCS_MAX_ACTIVE_WINDOW_FILES => {
let files =
parse_usize_with_default(key, value, default_option.max_active_window_files)?;
log_option_update(region_id, key, options.max_active_window_files, files);
options.max_active_window_files = files;
}
mito_engine_options::TWCS_MAX_INACTIVE_WINDOW_RUNS => {
let runs =
parse_usize_with_default(key, value, default_option.max_inactive_window_runs)?;
log_option_update(region_id, key, options.max_inactive_window_runs, runs);
options.max_inactive_window_runs = runs;
}
mito_engine_options::TWCS_MAX_INACTIVE_WINDOW_FILES => {
let files =
parse_usize_with_default(key, value, default_option.max_inactive_window_files)?;
log_option_update(region_id, key, options.max_inactive_window_files, files);
options.max_inactive_window_files = files;
mito_engine_options::TWCS_TRIGGER_FILE_NUM => {
let files = parse_usize_with_default(key, value, default_option.trigger_file_num)?;
log_option_update(region_id, key, options.trigger_file_num, files);
options.trigger_file_num = files;
}
mito_engine_options::TWCS_MAX_OUTPUT_FILE_SIZE => {
let size = if value.is_empty() {

View File

@@ -29,14 +29,8 @@ pub const SNAPSHOT_READ: &str = "snapshot_read";
pub const COMPACTION_TYPE: &str = "compaction.type";
/// TWCS compaction strategy.
pub const COMPACTION_TYPE_TWCS: &str = "twcs";
/// Option key for twcs max active window runs.
pub const TWCS_MAX_ACTIVE_WINDOW_RUNS: &str = "compaction.twcs.max_active_window_runs";
/// Option key for twcs max active window files.
pub const TWCS_MAX_ACTIVE_WINDOW_FILES: &str = "compaction.twcs.max_active_window_files";
/// Option key for twcs max inactive window runs.
pub const TWCS_MAX_INACTIVE_WINDOW_RUNS: &str = "compaction.twcs.max_inactive_window_runs";
/// Option key for twcs max inactive window files.
pub const TWCS_MAX_INACTIVE_WINDOW_FILES: &str = "compaction.twcs.max_inactive_window_files";
/// Option key for twcs min file num to trigger a compaction.
pub const TWCS_TRIGGER_FILE_NUM: &str = "compaction.twcs.trigger_file_num";
/// Option key for twcs max output file size.
pub const TWCS_MAX_OUTPUT_FILE_SIZE: &str = "compaction.twcs.max_output_file_size";
/// Option key for twcs time window.
@@ -68,10 +62,7 @@ pub fn is_mito_engine_option_key(key: &str) -> bool {
[
"ttl",
COMPACTION_TYPE,
TWCS_MAX_ACTIVE_WINDOW_RUNS,
TWCS_MAX_ACTIVE_WINDOW_FILES,
TWCS_MAX_INACTIVE_WINDOW_RUNS,
TWCS_MAX_INACTIVE_WINDOW_FILES,
TWCS_TRIGGER_FILE_NUM,
TWCS_MAX_OUTPUT_FILE_SIZE,
TWCS_TIME_WINDOW,
REMOTE_COMPACTION,
@@ -100,10 +91,7 @@ mod tests {
assert!(is_mito_engine_option_key("ttl"));
assert!(is_mito_engine_option_key("compaction.type"));
assert!(is_mito_engine_option_key(
"compaction.twcs.max_active_window_runs"
));
assert!(is_mito_engine_option_key(
"compaction.twcs.max_inactive_window_runs"
"compaction.twcs.trigger_file_num"
));
assert!(is_mito_engine_option_key("compaction.twcs.time_window"));
assert!(is_mito_engine_option_key("storage"));

View File

@@ -52,9 +52,7 @@ use crate::metadata::{
use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use crate::metrics;
use crate::mito_engine_options::{
TTL_KEY, TWCS_MAX_ACTIVE_WINDOW_FILES, TWCS_MAX_ACTIVE_WINDOW_RUNS,
TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS, TWCS_MAX_OUTPUT_FILE_SIZE,
TWCS_TIME_WINDOW,
TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
};
use crate::path_utils::region_dir;
use crate::storage::{ColumnId, RegionId, ScanRequest};
@@ -1027,12 +1025,9 @@ impl TryFrom<&PbOption> for SetRegionOption {
Ok(Self::Ttl(Some(ttl)))
}
TWCS_MAX_ACTIVE_WINDOW_RUNS
| TWCS_MAX_ACTIVE_WINDOW_FILES
| TWCS_MAX_INACTIVE_WINDOW_FILES
| TWCS_MAX_INACTIVE_WINDOW_RUNS
| TWCS_MAX_OUTPUT_FILE_SIZE
| TWCS_TIME_WINDOW => Ok(Self::Twsc(key.to_string(), value.to_string())),
TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
Ok(Self::Twsc(key.to_string(), value.to_string()))
}
_ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
}
}
@@ -1041,16 +1036,7 @@ impl TryFrom<&PbOption> for SetRegionOption {
impl From<&UnsetRegionOption> for SetRegionOption {
fn from(unset_option: &UnsetRegionOption) -> Self {
match unset_option {
UnsetRegionOption::TwcsMaxActiveWindowFiles => {
SetRegionOption::Twsc(unset_option.to_string(), String::new())
}
UnsetRegionOption::TwcsMaxInactiveWindowFiles => {
SetRegionOption::Twsc(unset_option.to_string(), String::new())
}
UnsetRegionOption::TwcsMaxActiveWindowRuns => {
SetRegionOption::Twsc(unset_option.to_string(), String::new())
}
UnsetRegionOption::TwcsMaxInactiveWindowRuns => {
UnsetRegionOption::TwcsTriggerFileNum => {
SetRegionOption::Twsc(unset_option.to_string(), String::new())
}
UnsetRegionOption::TwcsMaxOutputFileSize => {
@@ -1070,10 +1056,7 @@ impl TryFrom<&str> for UnsetRegionOption {
fn try_from(key: &str) -> Result<Self> {
match key.to_ascii_lowercase().as_str() {
TTL_KEY => Ok(Self::Ttl),
TWCS_MAX_ACTIVE_WINDOW_FILES => Ok(Self::TwcsMaxActiveWindowFiles),
TWCS_MAX_INACTIVE_WINDOW_FILES => Ok(Self::TwcsMaxInactiveWindowFiles),
TWCS_MAX_ACTIVE_WINDOW_RUNS => Ok(Self::TwcsMaxActiveWindowRuns),
TWCS_MAX_INACTIVE_WINDOW_RUNS => Ok(Self::TwcsMaxInactiveWindowRuns),
TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
_ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
@@ -1083,10 +1066,7 @@ impl TryFrom<&str> for UnsetRegionOption {
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetRegionOption {
TwcsMaxActiveWindowFiles,
TwcsMaxInactiveWindowFiles,
TwcsMaxActiveWindowRuns,
TwcsMaxInactiveWindowRuns,
TwcsTriggerFileNum,
TwcsMaxOutputFileSize,
TwcsTimeWindow,
Ttl,
@@ -1096,10 +1076,7 @@ impl UnsetRegionOption {
pub fn as_str(&self) -> &str {
match self {
Self::Ttl => TTL_KEY,
Self::TwcsMaxActiveWindowFiles => TWCS_MAX_ACTIVE_WINDOW_FILES,
Self::TwcsMaxInactiveWindowFiles => TWCS_MAX_INACTIVE_WINDOW_FILES,
Self::TwcsMaxActiveWindowRuns => TWCS_MAX_ACTIVE_WINDOW_RUNS,
Self::TwcsMaxInactiveWindowRuns => TWCS_MAX_INACTIVE_WINDOW_RUNS,
Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
}

View File

@@ -238,21 +238,9 @@ impl<R: Rng> Generator<AlterTableExpr, R> for AlterExprSetTableOptionsGenerator<
let max_output_file_size: u64 = rng.random();
AlterTableOption::TwcsMaxOutputFileSize(ReadableSize(max_output_file_size))
}
AlterTableOption::TwcsMaxInactiveWindowRuns(_) => {
let max_inactive_window_runs: u64 = rng.random();
AlterTableOption::TwcsMaxInactiveWindowRuns(max_inactive_window_runs)
}
AlterTableOption::TwcsMaxActiveWindowFiles(_) => {
let max_active_window_files: u64 = rng.random();
AlterTableOption::TwcsMaxActiveWindowFiles(max_active_window_files)
}
AlterTableOption::TwcsMaxActiveWindowRuns(_) => {
let max_active_window_runs: u64 = rng.random();
AlterTableOption::TwcsMaxActiveWindowRuns(max_active_window_runs)
}
AlterTableOption::TwcsMaxInactiveWindowFiles(_) => {
let max_inactive_window_files: u64 = rng.random();
AlterTableOption::TwcsMaxInactiveWindowFiles(max_inactive_window_files)
AlterTableOption::TwcsTriggerFileNum(_) => {
let trigger_file_num: u64 = rng.random();
AlterTableOption::TwcsTriggerFileNum(trigger_file_num)
}
})
.collect();
@@ -365,7 +353,7 @@ mod tests {
.generate(&mut rng)
.unwrap();
let serialized = serde_json::to_string(&expr).unwrap();
let expected = r#"{"table_name":{"value":"quasi","quote_style":null},"alter_kinds":{"SetTableOptions":{"options":[{"TwcsMaxOutputFileSize":16770910638250818741}]}}}"#;
let expected = r#"{"table_name":{"value":"quasi","quote_style":null},"alter_kinds":{"SetTableOptions":{"options":[{"TwcsTimeWindow":{"value":2428665013,"unit":"Millisecond"}}]}}}"#;
assert_eq!(expected, serialized);
let expr = AlterExprUnsetTableOptionsGeneratorBuilder::default()
@@ -375,7 +363,7 @@ mod tests {
.generate(&mut rng)
.unwrap();
let serialized = serde_json::to_string(&expr).unwrap();
let expected = r#"{"table_name":{"value":"quasi","quote_style":null},"alter_kinds":{"UnsetTableOptions":{"keys":["compaction.twcs.max_active_window_runs","compaction.twcs.max_output_file_size","compaction.twcs.time_window","compaction.twcs.max_inactive_window_files","compaction.twcs.max_active_window_files"]}}}"#;
let expected = r#"{"table_name":{"value":"quasi","quote_style":null},"alter_kinds":{"UnsetTableOptions":{"keys":["compaction.twcs.trigger_file_num","compaction.twcs.time_window"]}}}"#;
assert_eq!(expected, serialized);
}
}

View File

@@ -21,9 +21,8 @@ use common_time::{Duration, FOREVER, INSTANT};
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use store_api::mito_engine_options::{
APPEND_MODE_KEY, COMPACTION_TYPE, TTL_KEY, TWCS_MAX_ACTIVE_WINDOW_FILES,
TWCS_MAX_ACTIVE_WINDOW_RUNS, TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS,
TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW,
APPEND_MODE_KEY, COMPACTION_TYPE, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW,
TWCS_TRIGGER_FILE_NUM,
};
use strum::EnumIter;
@@ -78,10 +77,7 @@ pub enum AlterTableOption {
Ttl(Ttl),
TwcsTimeWindow(Duration),
TwcsMaxOutputFileSize(ReadableSize),
TwcsMaxInactiveWindowFiles(u64),
TwcsMaxActiveWindowFiles(u64),
TwcsMaxInactiveWindowRuns(u64),
TwcsMaxActiveWindowRuns(u64),
TwcsTriggerFileNum(u64),
}
impl AlterTableOption {
@@ -90,10 +86,7 @@ impl AlterTableOption {
AlterTableOption::Ttl(_) => TTL_KEY,
AlterTableOption::TwcsTimeWindow(_) => TWCS_TIME_WINDOW,
AlterTableOption::TwcsMaxOutputFileSize(_) => TWCS_MAX_OUTPUT_FILE_SIZE,
AlterTableOption::TwcsMaxInactiveWindowFiles(_) => TWCS_MAX_INACTIVE_WINDOW_FILES,
AlterTableOption::TwcsMaxActiveWindowFiles(_) => TWCS_MAX_ACTIVE_WINDOW_FILES,
AlterTableOption::TwcsMaxInactiveWindowRuns(_) => TWCS_MAX_INACTIVE_WINDOW_RUNS,
AlterTableOption::TwcsMaxActiveWindowRuns(_) => TWCS_MAX_ACTIVE_WINDOW_RUNS,
AlterTableOption::TwcsTriggerFileNum(_) => TWCS_TRIGGER_FILE_NUM,
}
}
@@ -111,21 +104,9 @@ impl AlterTableOption {
};
Ok(AlterTableOption::Ttl(ttl))
}
TWCS_MAX_ACTIVE_WINDOW_RUNS => {
let runs = value.parse().unwrap();
Ok(AlterTableOption::TwcsMaxActiveWindowRuns(runs))
}
TWCS_MAX_ACTIVE_WINDOW_FILES => {
TWCS_TRIGGER_FILE_NUM => {
let files = value.parse().unwrap();
Ok(AlterTableOption::TwcsMaxActiveWindowFiles(files))
}
TWCS_MAX_INACTIVE_WINDOW_RUNS => {
let runs = value.parse().unwrap();
Ok(AlterTableOption::TwcsMaxInactiveWindowRuns(runs))
}
TWCS_MAX_INACTIVE_WINDOW_FILES => {
let files = value.parse().unwrap();
Ok(AlterTableOption::TwcsMaxInactiveWindowFiles(files))
Ok(AlterTableOption::TwcsTriggerFileNum(files))
}
TWCS_MAX_OUTPUT_FILE_SIZE => {
// may be "1M" instead of "1 MiB"
@@ -178,17 +159,8 @@ impl Display for AlterTableOption {
// Caution: to_string loses precision for ReadableSize
write!(f, "'{}' = '{}'", TWCS_MAX_OUTPUT_FILE_SIZE, s)
}
AlterTableOption::TwcsMaxInactiveWindowFiles(u) => {
write!(f, "'{}' = '{}'", TWCS_MAX_INACTIVE_WINDOW_FILES, u)
}
AlterTableOption::TwcsMaxActiveWindowFiles(u) => {
write!(f, "'{}' = '{}'", TWCS_MAX_ACTIVE_WINDOW_FILES, u)
}
AlterTableOption::TwcsMaxInactiveWindowRuns(u) => {
write!(f, "'{}' = '{}'", TWCS_MAX_INACTIVE_WINDOW_RUNS, u)
}
AlterTableOption::TwcsMaxActiveWindowRuns(u) => {
write!(f, "'{}' = '{}'", TWCS_MAX_ACTIVE_WINDOW_RUNS, u)
AlterTableOption::TwcsTriggerFileNum(u) => {
write!(f, "'{}' = '{}'", TWCS_TRIGGER_FILE_NUM, u)
}
}
}
@@ -212,21 +184,15 @@ mod tests {
]
);
let option_string = "compaction.twcs.max_active_window_files = '5030469694939972912',
compaction.twcs.max_active_window_runs = '8361168990283879099',
compaction.twcs.max_inactive_window_files = '6028716566907830876',
compaction.twcs.max_inactive_window_runs = '10622283085591494074',
let option_string = "compaction.twcs.trigger_file_num = '5030469694939972912',
compaction.twcs.max_output_file_size = '15686.4PiB',
compaction.twcs.time_window = '2061999256ms',
compaction.type = 'twcs',
ttl = '1month 3days 15h 49m 8s 279ms'";
let options = AlterTableOption::parse_kv_pairs(option_string).unwrap();
assert_eq!(options.len(), 7);
assert_eq!(options.len(), 4);
let expected = vec![
AlterTableOption::TwcsMaxActiveWindowFiles(5030469694939972912),
AlterTableOption::TwcsMaxActiveWindowRuns(8361168990283879099),
AlterTableOption::TwcsMaxInactiveWindowFiles(6028716566907830876),
AlterTableOption::TwcsMaxInactiveWindowRuns(10622283085591494074),
AlterTableOption::TwcsTriggerFileNum(5030469694939972912),
AlterTableOption::TwcsMaxOutputFileSize(ReadableSize::from_str("15686.4PiB").unwrap()),
AlterTableOption::TwcsTimeWindow(Duration::new_nanosecond(2_061_999_256_000_000)),
AlterTableOption::Ttl(Ttl::Duration(Duration::new_millisecond(

View File

@@ -191,10 +191,7 @@ mod tests {
AlterTableOption::Ttl(Ttl::Duration(Duration::new_second(60))),
AlterTableOption::TwcsTimeWindow(Duration::new_second(60)),
AlterTableOption::TwcsMaxOutputFileSize(ReadableSize::from_str("1GB").unwrap()),
AlterTableOption::TwcsMaxActiveWindowFiles(10),
AlterTableOption::TwcsMaxActiveWindowRuns(10),
AlterTableOption::TwcsMaxInactiveWindowFiles(5),
AlterTableOption::TwcsMaxInactiveWindowRuns(5),
AlterTableOption::TwcsTriggerFileNum(5),
],
},
};
@@ -204,10 +201,7 @@ mod tests {
"ALTER TABLE test SET 'ttl' = '60s', ",
"'compaction.twcs.time_window' = '60s', ",
"'compaction.twcs.max_output_file_size' = '1.0GiB', ",
"'compaction.twcs.max_active_window_files' = '10', ",
"'compaction.twcs.max_active_window_runs' = '10', ",
"'compaction.twcs.max_inactive_window_files' = '5', ",
"'compaction.twcs.max_inactive_window_runs' = '5';"
"'compaction.twcs.trigger_file_num' = '5';"
);
assert_eq!(expected, output);
}

View File

@@ -187,10 +187,7 @@ mod tests {
AlterTableOption::Ttl(Ttl::Duration(Duration::new_second(60))),
AlterTableOption::TwcsTimeWindow(Duration::new_second(60)),
AlterTableOption::TwcsMaxOutputFileSize(ReadableSize::from_str("1GB").unwrap()),
AlterTableOption::TwcsMaxActiveWindowFiles(10),
AlterTableOption::TwcsMaxActiveWindowRuns(10),
AlterTableOption::TwcsMaxInactiveWindowFiles(5),
AlterTableOption::TwcsMaxInactiveWindowRuns(5),
AlterTableOption::TwcsTriggerFileNum(10),
],
},
};
@@ -200,10 +197,7 @@ mod tests {
"ALTER TABLE test SET 'ttl' = '60s', ",
"'compaction.twcs.time_window' = '60s', ",
"'compaction.twcs.max_output_file_size' = '1.0GiB', ",
"'compaction.twcs.max_active_window_files' = '10', ",
"'compaction.twcs.max_active_window_runs' = '10', ",
"'compaction.twcs.max_inactive_window_files' = '5', ",
"'compaction.twcs.max_inactive_window_runs' = '5';"
"'compaction.twcs.trigger_file_num' = '10';",
);
assert_eq!(expected, output);
}

View File

@@ -59,7 +59,7 @@ DROP TABLE test;
Affected Rows: 0
CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');
CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.trigger_file_num'='4');
Affected Rows: 0

View File

@@ -28,7 +28,7 @@ EXPLAIN SELECT DISTINCT a, b FROM test ORDER BY a, b;
DROP TABLE test;
CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');
CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.trigger_file_num'='4');
INSERT INTO test_pk VALUES (1, 1, 1), (2, NULL, 2), (3, 1, 3), (4, 2, 4), (5, 2, 5), (6, NULL, 6);

View File

@@ -155,46 +155,31 @@ ALTER TABLE ato SET 'compaction.twcs.max_output_file_size'='500MB';
Affected Rows: 0
ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_files'='2';
Affected Rows: 0
ALTER TABLE ato SET 'compaction.twcs.max_active_window_files'='2';
Affected Rows: 0
ALTER TABLE ato SET 'compaction.twcs.max_active_window_runs'='6';
Affected Rows: 0
ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_runs'='6';
ALTER TABLE ato SET 'compaction.twcs.trigger_file_num'='2';
Affected Rows: 0
SHOW CREATE TABLE ato;
+-------+------------------------------------------------------+
| Table | Create Table |
+-------+------------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | 'compaction.twcs.max_active_window_files' = '2', |
| | 'compaction.twcs.max_active_window_runs' = '6', |
| | 'compaction.twcs.max_inactive_window_files' = '2', |
| | 'compaction.twcs.max_inactive_window_runs' = '6', |
| | 'compaction.twcs.max_output_file_size' = '500MB', |
| | 'compaction.twcs.time_window' = '2h', |
| | 'compaction.type' = 'twcs', |
| | ttl = '1s' |
| | ) |
+-------+------------------------------------------------------+
+-------+-----------------------------------------------------+
| Table | Create Table |
+-------+-----------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | 'compaction.twcs.max_output_file_size' = '500MB', |
| | 'compaction.twcs.time_window' = '2h', |
| | 'compaction.twcs.trigger_file_num' = '2', |
| | 'compaction.type' = 'twcs', |
| | ttl = '1s' |
| | ) |
+-------+-----------------------------------------------------+
ALTER TABLE ato UNSET 'compaction.twcs.time_window';
@@ -206,78 +191,69 @@ Error: 1004(InvalidArguments), Invalid unset table option request: Invalid set r
SHOW CREATE TABLE ato;
+-------+------------------------------------------------------+
| Table | Create Table |
+-------+------------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | 'compaction.twcs.max_active_window_files' = '2', |
| | 'compaction.twcs.max_active_window_runs' = '6', |
| | 'compaction.twcs.max_inactive_window_files' = '2', |
| | 'compaction.twcs.max_inactive_window_runs' = '6', |
| | 'compaction.twcs.max_output_file_size' = '500MB', |
| | 'compaction.type' = 'twcs', |
| | ttl = '1s' |
| | ) |
+-------+------------------------------------------------------+
+-------+-----------------------------------------------------+
| Table | Create Table |
+-------+-----------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | 'compaction.twcs.max_output_file_size' = '500MB', |
| | 'compaction.twcs.trigger_file_num' = '2', |
| | 'compaction.type' = 'twcs', |
| | ttl = '1s' |
| | ) |
+-------+-----------------------------------------------------+
ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_runs'='';
ALTER TABLE ato SET 'compaction.twcs.trigger_file_num'='';
Affected Rows: 0
SHOW CREATE TABLE ato;
+-------+------------------------------------------------------+
| Table | Create Table |
+-------+------------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | 'compaction.twcs.max_active_window_files' = '2', |
| | 'compaction.twcs.max_active_window_runs' = '6', |
| | 'compaction.twcs.max_inactive_window_files' = '2', |
| | 'compaction.twcs.max_output_file_size' = '500MB', |
| | 'compaction.type' = 'twcs', |
| | ttl = '1s' |
| | ) |
+-------+------------------------------------------------------+
+-------+-----------------------------------------------------+
| Table | Create Table |
+-------+-----------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | 'compaction.twcs.max_output_file_size' = '500MB', |
| | 'compaction.type' = 'twcs', |
| | ttl = '1s' |
| | ) |
+-------+-----------------------------------------------------+
-- SQLNESS ARG restart=true
SHOW CREATE TABLE ato;
+-------+------------------------------------------------------+
| Table | Create Table |
+-------+------------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | 'compaction.twcs.max_active_window_files' = '2', |
| | 'compaction.twcs.max_active_window_runs' = '6', |
| | 'compaction.twcs.max_inactive_window_files' = '2', |
| | 'compaction.twcs.max_output_file_size' = '500MB', |
| | 'compaction.type' = 'twcs', |
| | ttl = '1s' |
| | ) |
+-------+------------------------------------------------------+
+-------+-----------------------------------------------------+
| Table | Create Table |
+-------+-----------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | 'compaction.twcs.max_output_file_size' = '500MB', |
| | 'compaction.type' = 'twcs', |
| | ttl = '1s' |
| | ) |
+-------+-----------------------------------------------------+
DROP TABLE ato;

View File

@@ -36,13 +36,7 @@ ALTER TABLE ato SET 'compaction.twcs.time_window'='2h';
ALTER TABLE ato SET 'compaction.twcs.max_output_file_size'='500MB';
ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_files'='2';
ALTER TABLE ato SET 'compaction.twcs.max_active_window_files'='2';
ALTER TABLE ato SET 'compaction.twcs.max_active_window_runs'='6';
ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_runs'='6';
ALTER TABLE ato SET 'compaction.twcs.trigger_file_num'='2';
SHOW CREATE TABLE ato;
@@ -52,7 +46,7 @@ ALTER TABLE ato UNSET '🕶️';
SHOW CREATE TABLE ato;
ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_runs'='';
ALTER TABLE ato SET 'compaction.twcs.trigger_file_num'='';
SHOW CREATE TABLE ato;

View File

@@ -67,8 +67,7 @@ engine=mito
with(
'ttl'='7d',
'compaction.type'='twcs',
'compaction.twcs.max_active_window_runs'='2',
'compaction.twcs.max_inactive_window_runs'='2',
'compaction.twcs.trigger_file_num'='2',
'compaction.twcs.time_window'='1d',
'index.inverted_index.ignore_column_ids'='1,2,3',
'index.inverted_index.segment_row_count'='512',
@@ -90,7 +89,7 @@ create table if not exists invalid_compaction(
PRIMARY KEY(host)
)
engine=mito
with('compaction.type'='twcs', 'compaction.twcs.max_active_window_runs'='8d');
with('compaction.type'='twcs', 'compaction.twcs.trigger_file_num'='8d');
Error: 1004(InvalidArguments), Invalid options: invalid digit found in string

View File

@@ -57,8 +57,7 @@ engine=mito
with(
'ttl'='7d',
'compaction.type'='twcs',
'compaction.twcs.max_active_window_runs'='2',
'compaction.twcs.max_inactive_window_runs'='2',
'compaction.twcs.trigger_file_num'='2',
'compaction.twcs.time_window'='1d',
'index.inverted_index.ignore_column_ids'='1,2,3',
'index.inverted_index.segment_row_count'='512',
@@ -76,4 +75,4 @@ create table if not exists invalid_compaction(
PRIMARY KEY(host)
)
engine=mito
with('compaction.type'='twcs', 'compaction.twcs.max_active_window_runs'='8d');
with('compaction.type'='twcs', 'compaction.twcs.trigger_file_num'='8d');

View File

@@ -1,5 +1,5 @@
-- Test without PK, with a windowed sort query.
CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');
CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs');
Affected Rows: 0
@@ -213,7 +213,7 @@ DROP TABLE test;
Affected Rows: 0
-- Test with PK, with a windowed sort query.
CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');
CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.trigger_file_num'='4');
Affected Rows: 0

View File

@@ -1,5 +1,5 @@
-- Test without PK, with a windowed sort query.
CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');
CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs');
INSERT INTO test VALUES (1, 1), (NULL, 2), (1, 3);
@@ -71,7 +71,7 @@ EXPLAIN ANALYZE SELECT * FROM test where t > 8 ORDER BY t DESC LIMIT 4;
DROP TABLE test;
-- Test with PK, with a windowed sort query.
CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');
CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.trigger_file_num'='4');
INSERT INTO test_pk VALUES (1, 1, 1), (2, NULL, 2), (3, 1, 3);

View File

@@ -9,8 +9,7 @@ with
(
append_mode = 'true',
'compaction.type' = 'twcs',
'compaction.twcs.max_active_window_files' = '8',
'compaction.twcs.max_inactive_window_files' = '8'
'compaction.twcs.trigger_file_num' = '8',
);
Affected Rows: 0

View File

@@ -9,8 +9,7 @@ with
(
append_mode = 'true',
'compaction.type' = 'twcs',
'compaction.twcs.max_active_window_files' = '8',
'compaction.twcs.max_inactive_window_files' = '8'
'compaction.twcs.trigger_file_num' = '8',
);
insert into

View File

@@ -52,7 +52,7 @@ explain select * from numbers order by number asc limit 10;
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');
CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs');
Affected Rows: 0

View File

@@ -8,7 +8,7 @@ explain select * from numbers order by number desc limit 10;
explain select * from numbers order by number asc limit 10;
CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');
CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs');
INSERT INTO test_pk VALUES (1, 1, 1), (2, NULL, 2), (3, 1, 3), (4, 2, 4), (5, 2, 5), (6, NULL, 6);