feat!: reduce sorted runs during compaction (#3702)

* feat: add functions to find and merge sorted runs

* chore: refactor code

* chore: remove some duplicates

* chore: remove one clone

* refactor: change max_active_window_files to max_active_window_runs

* feat: integrate with sorted runs

* fix: unit tests

* feat: limit num of sorted runs during compaction

* fix: some test

* fix: some cr comments

* feat: use smallvec

* chore: rebase main

* feat/reduce-sorted-runs:
 Refactor compaction logic and update test configurations

 - Refactored `merge_all_runs` function to use `sort_ranged_items` for sorting.
 - Improved item merging logic by iterating with `into_iter` and handling overlaps.
 - Updated test configurations to use `max_active_window_runs` instead of `max_active_window_files` for consistency.

---------

Co-authored-by: tison <wander4096@gmail.com>
This commit is contained in:
Lei, HUANG
2024-06-28 16:17:30 +08:00
committed by GitHub
parent 352cc9ddde
commit ef935a1de6
12 changed files with 797 additions and 84 deletions

View File

@@ -15,6 +15,7 @@
mod buckets;
pub mod compactor;
pub mod picker;
mod run;
mod task;
#[cfg(test)]
mod test_util;

View File

@@ -132,8 +132,8 @@ pub fn new_picker(
} else {
match compaction_options {
CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker::new(
twcs_opts.max_active_window_files,
twcs_opts.max_inactive_window_files,
twcs_opts.max_active_window_runs,
twcs_opts.max_inactive_window_runs,
twcs_opts.time_window_seconds(),
)) as Arc<_>,
}

View File

@@ -0,0 +1,709 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! This file contains code to find sorted runs in a set of ranged items, along
//! with the best way to merge these items to satisfy the desired run count.
use std::cmp::Ordering;
use common_base::BitVec;
use common_time::Timestamp;
use itertools::Itertools;
use smallvec::{smallvec, SmallVec};
use crate::sst::file::FileHandle;
/// Trait for any items with specific range.
pub(crate) trait Ranged {
    type BoundType: Ord + Copy;

    /// Returns the inclusive `(start, end)` range of the item.
    fn range(&self) -> (Self::BoundType, Self::BoundType);

    /// Returns true if the inclusive range of `self` intersects the inclusive
    /// range of `other`.
    fn overlap<T>(&self, other: &T) -> bool
    where
        T: Ranged<BoundType = Self::BoundType>,
    {
        let (self_start, self_end) = self.range();
        let (other_start, other_end) = other.range();
        match self_start.cmp(&other_start) {
            // `self` begins first: they intersect iff `self` reaches `other`'s start.
            Ordering::Less => self_end >= other_start,
            // Identical starting points always intersect.
            Ordering::Equal => true,
            // `other` begins first: they intersect iff `other` reaches `self`'s start.
            Ordering::Greater => self_start <= other_end,
        }
    }
}
/// Sorts items by start timestamp ascending; ties are broken by end timestamp
/// descending, so an item that fully covers another always sorts first.
fn sort_ranged_items<T: Ranged>(values: &mut [T]) {
    values.sort_unstable_by(|a, b| {
        let (a_start, a_end) = a.range();
        let (b_start, b_end) = b.range();
        // Lexicographic tuple comparison: start ascending, then end descending
        // (note the swapped ends in the second tuple position).
        (a_start, b_end).cmp(&(b_start, a_end))
    });
}
/// Trait for items to merge.
///
/// The merge cost ("penalty") is computed by summing [`Item::size`] over the
/// items being merged, so implementations should return a value proportional
/// to the cost of rewriting the item (e.g. file size).
pub(crate) trait Item: Ranged + Clone {
    /// Size is used to calculate the cost of merging items.
    fn size(&self) -> usize;
}
impl Ranged for FileHandle {
    type BoundType = Timestamp;

    /// Returns the file's inclusive time range.
    fn range(&self) -> (Self::BoundType, Self::BoundType) {
        self.time_range()
    }
}

impl Item for FileHandle {
    fn size(&self) -> usize {
        // Inherent methods take precedence over trait methods during method
        // resolution, so `self.size()` calls the inherent `FileHandle::size()`
        // (the `u64` file-size accessor), not this trait method recursively.
        // NOTE(review): `as usize` truncates on targets where usize < 64 bits.
        self.size() as usize
    }
}
/// A group of items that must be compacted together because their ranges
/// overlap. Starts out wrapping a single item and accumulates more via merging.
#[derive(Debug, Clone)]
struct MergeItems<T: Item> {
    /// The contained items; inline storage for up to 4 avoids heap allocation
    /// in the common small-group case.
    items: SmallVec<[T; 4]>,
    /// Inclusive lower bound covering all contained items.
    start: T::BoundType,
    /// Inclusive upper bound covering all contained items.
    end: T::BoundType,
    /// Total size of all contained items.
    size: usize,
}
impl<T: Item> Ranged for MergeItems<T> {
    type BoundType = T::BoundType;

    /// Returns the combined inclusive range covering all contained items.
    fn range(&self) -> (Self::BoundType, Self::BoundType) {
        (self.start, self.end)
    }
}
impl<T: Item> MergeItems<T> {
    /// Wraps a single item into a [`MergeItems`] holding only that item.
    pub fn new_unmerged(val: T) -> Self {
        let (start, end) = val.range();
        let size = val.size();
        Self {
            start,
            end,
            size,
            items: smallvec![val],
        }
    }

    /// The inclusive range covered by this merge item.
    pub(crate) fn range(&self) -> (T::BoundType, T::BoundType) {
        (self.start, self.end)
    }

    /// Combines two merge items into one, consuming both.
    ///
    /// The resulting range covers both inputs, sizes are accumulated, and the
    /// contained items of `other` are appended after those of `self`.
    pub(crate) fn merge(self, other: Self) -> Self {
        let start = self.start.min(other.start);
        let end = self.end.max(other.end);
        let size = self.size + other.size;
        let mut items = self.items;
        items.reserve(other.items.len());
        items.extend(other.items);
        Self {
            items,
            start,
            end,
            size,
        }
    }

    /// Whether this merge item was assembled from more than one source item.
    pub fn merged(&self) -> bool {
        self.items.len() >= 2
    }
}
/// A set of files with non-overlapping time ranges.
#[derive(Debug, Clone)]
pub(crate) struct SortedRun<T: Item> {
    /// Items to merge
    items: Vec<MergeItems<T>>,
    /// Penalty is defined as the total size of merged items.
    penalty: usize,
    /// The lower bound of all items; `None` when the run is empty.
    start: Option<T::BoundType>,
    /// The upper bound of all items; `None` when the run is empty.
    end: Option<T::BoundType>,
}
impl<T: Item> Default for SortedRun<T> {
    /// An empty run: no items, zero penalty, and unset bounds.
    fn default() -> Self {
        Self {
            start: None,
            end: None,
            items: Vec::new(),
            penalty: 0,
        }
    }
}
impl<T: Item> SortedRun<T> {
    /// Appends a merge item to this run, accumulating the merge penalty and
    /// widening the run's overall bounds to cover the new item.
    fn push_item(&mut self, t: MergeItems<T>) {
        let (item_start, item_end) = t.range();
        // Only items assembled from 2+ source items contribute to the penalty.
        if t.merged() {
            self.penalty += t.size;
        }
        self.items.push(t);
        self.start = Some(match self.start {
            Some(cur) => cur.min(item_start),
            None => item_start,
        });
        self.end = Some(match self.end {
            Some(cur) => cur.max(item_end),
            None => item_end,
        });
    }
}
/// Finds sorted runs in given items.
///
/// Repeatedly performs a greedy pass over the items (sorted by start asc,
/// end desc): each pass collects every not-yet-assigned item that does not
/// overlap the last item collected in that pass, forming one sorted run.
/// Passes continue until every item belongs to some run.
pub(crate) fn find_sorted_runs<T>(items: &mut [T]) -> Vec<SortedRun<T>>
where
    T: Item,
{
    if items.is_empty() {
        return vec![];
    }
    // sort files
    sort_ranged_items(items);

    let mut current_run = SortedRun::default();
    let mut runs = vec![];

    // Tracks which items have already been assigned to some run.
    let mut selection = BitVec::repeat(false, items.len());
    while !selection.all() {
        // until all items are assigned to some sorted run.
        for (item, mut selected) in items.iter().zip(selection.iter_mut()) {
            if *selected {
                // item is already assigned.
                continue;
            }
            let current_item = MergeItems::new_unmerged(item.clone());
            match current_run.items.last() {
                None => {
                    // current run is empty, just add current_item
                    selected.set(true);
                    current_run.push_item(current_item);
                }
                Some(last) => {
                    // the current item does not overlap with the last item in current run,
                    // then it belongs to current run.
                    if !last.overlap(&current_item) {
                        // does not overlap, push to current run
                        selected.set(true);
                        current_run.push_item(current_item);
                    }
                }
            }
        }
        // finished an iteration, we've found a new run.
        // Termination: each pass starts with an empty run, so the first
        // unselected item is always assigned — progress is guaranteed.
        runs.push(std::mem::take(&mut current_run));
    }
    runs
}
fn merge_all_runs<T: Item>(runs: Vec<SortedRun<T>>) -> SortedRun<T> {
assert!(!runs.is_empty());
let mut all_items = runs
.into_iter()
.flat_map(|r| r.items.into_iter())
.collect::<Vec<_>>();
sort_ranged_items(&mut all_items);
let mut res = SortedRun::default();
let mut iter = all_items.into_iter();
// safety: all_items is not empty
let mut current_item = iter.next().unwrap();
for item in iter {
if current_item.overlap(&item) {
current_item = current_item.merge(item);
} else {
res.push_item(current_item);
current_item = item;
}
}
res.push_item(current_item);
res
}
/// Reduces the num of runs to given target and returns items to merge.
/// The time complexity of this function is `C_{k}_{runs.len()}` where k=`runs.len()`-target+1.
pub(crate) fn reduce_runs<T: Item>(runs: Vec<SortedRun<T>>, target: usize) -> Vec<Vec<T>> {
assert_ne!(target, 0);
if target >= runs.len() {
// already satisfied.
return vec![];
}
let k = runs.len() + 1 - target;
runs.into_iter()
.combinations(k) // find all possible solutions
.map(|runs_to_merge| merge_all_runs(runs_to_merge)) // calculate merge penalty
.min_by(|p, r| p.penalty.cmp(&r.penalty)) // find solution with the min penalty
.unwrap() // safety: their must be at least one solution.
.items
.into_iter()
.filter(|m| m.merged()) // find all files to merge in that solution
.map(|m| m.items.to_vec())
.collect()
}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;

    /// Simple in-memory stand-in for `FileHandle` in tests.
    #[derive(Clone, Debug)]
    struct MockFile {
        start: i64,
        end: i64,
        size: usize,
    }

    impl Ranged for MockFile {
        type BoundType = i64;

        fn range(&self) -> (Self::BoundType, Self::BoundType) {
            (self.start, self.end)
        }
    }

    impl Item for MockFile {
        fn size(&self) -> usize {
            self.size
        }
    }

    /// Builds `MockFile`s from `(start, end)` pairs; size is the range width.
    fn build_items(ranges: &[(i64, i64)]) -> Vec<MockFile> {
        ranges
            .iter()
            .map(|(start, end)| MockFile {
                start: *start,
                end: *end,
                size: (*end - *start) as usize,
            })
            .collect()
    }

    /// Asserts that `find_sorted_runs` splits `ranges` into exactly
    /// `expected_runs` and returns the runs for further checks.
    fn check_sorted_runs(
        ranges: &[(i64, i64)],
        expected_runs: &[Vec<(i64, i64)>],
    ) -> Vec<SortedRun<MockFile>> {
        let mut files = build_items(ranges);
        let runs = find_sorted_runs(&mut files);

        let result_file_ranges: Vec<Vec<_>> = runs
            .iter()
            .map(|r| r.items.iter().map(|f| f.range()).collect())
            .collect();
        assert_eq!(&expected_runs, &result_file_ranges);
        runs
    }

    #[test]
    fn test_find_sorted_runs() {
        check_sorted_runs(&[], &[]);
        check_sorted_runs(&[(1, 1), (2, 2)], &[vec![(1, 1), (2, 2)]]);
        check_sorted_runs(&[(1, 2)], &[vec![(1, 2)]]);
        // inclusive ranges: a shared endpoint forces two runs
        check_sorted_runs(&[(1, 2), (2, 3)], &[vec![(1, 2)], vec![(2, 3)]]);
        check_sorted_runs(&[(1, 2), (3, 4)], &[vec![(1, 2), (3, 4)]]);
        check_sorted_runs(&[(2, 4), (1, 3)], &[vec![(1, 3)], vec![(2, 4)]]);
        check_sorted_runs(
            &[(1, 3), (2, 4), (4, 5)],
            &[vec![(1, 3), (4, 5)], vec![(2, 4)]],
        );
        check_sorted_runs(
            &[(1, 2), (3, 4), (3, 5)],
            &[vec![(1, 2), (3, 5)], vec![(3, 4)]],
        );
        check_sorted_runs(
            &[(1, 3), (2, 4), (5, 6)],
            &[vec![(1, 3), (5, 6)], vec![(2, 4)]],
        );
        check_sorted_runs(
            &[(1, 2), (3, 5), (4, 6)],
            &[vec![(1, 2), (3, 5)], vec![(4, 6)]],
        );
        check_sorted_runs(
            &[(1, 2), (3, 4), (4, 6), (7, 8)],
            &[vec![(1, 2), (3, 4), (7, 8)], vec![(4, 6)]],
        );
        check_sorted_runs(
            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
            &[vec![(1, 2), (3, 6), (7, 8)], vec![(3, 4), (5, 6), (8, 9)]],
        );
        // (20, 29) sorts before (20, 21): same start, longer range first
        check_sorted_runs(
            &[(10, 19), (20, 21), (20, 29), (30, 39)],
            &[vec![(10, 19), (20, 29), (30, 39)], vec![(20, 21)]],
        );
        check_sorted_runs(
            &[(10, 19), (20, 29), (21, 22), (30, 39), (31, 32), (32, 42)],
            &[
                vec![(10, 19), (20, 29), (30, 39)],
                vec![(21, 22), (31, 32)],
                vec![(32, 42)],
            ],
        );
    }

    /// Finds runs in `items` (expecting exactly two), merges them all, and
    /// checks the resulting penalty and item groups.
    fn check_merge_sorted_runs(
        items: &[(i64, i64)],
        expected_penalty: usize,
        expected: &[Vec<(i64, i64)>],
    ) {
        let mut items = build_items(items);
        let runs = find_sorted_runs(&mut items);
        assert_eq!(2, runs.len());
        let res = merge_all_runs(runs);
        let penalty = res.penalty;
        let ranges = res
            .items
            .into_iter()
            .map(|i| {
                i.items
                    .into_iter()
                    .map(|f| (f.start, f.end))
                    .sorted_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)))
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();
        assert_eq!(expected, &ranges);
        assert_eq!(expected_penalty, penalty);
    }

    #[test]
    fn test_merge_sorted_runs() {
        // [1..2]
        // [1...3]
        check_merge_sorted_runs(&[(1, 2), (1, 3)], 3, &[vec![(1, 2), (1, 3)]]);
        // [1..2][3..4]
        //    [2..3]
        check_merge_sorted_runs(
            &[(1, 2), (2, 3), (3, 4)],
            3,
            &[vec![(1, 2), (2, 3), (3, 4)]],
        );
        // [1..10][11..20][21...30]
        //           [18]
        check_merge_sorted_runs(
            &[(1, 10), (11, 20), (21, 30), (18, 18)],
            9,
            &[vec![(1, 10)], vec![(11, 20), (18, 18)], vec![(21, 30)]],
        );
        // [1..3][4..5]
        //  [2...4]
        check_merge_sorted_runs(
            &[(1, 3), (2, 4), (4, 5)],
            5,
            &[vec![(1, 3), (2, 4), (4, 5)]],
        );
        // [1..2][3..4]      [7..8]
        //          [4..6]
        check_merge_sorted_runs(
            &[(1, 2), (3, 4), (4, 6), (7, 8)],
            3,
            &[vec![(1, 2)], vec![(3, 4), (4, 6)], vec![(7, 8)]],
        );
        // [1..2][3..4][5..6][7..8]
        //       [3........6]   [8..9]
        //
        check_merge_sorted_runs(
            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
            7,
            &[
                vec![(1, 2)],
                vec![(3, 4), (3, 6), (5, 6)],
                vec![(7, 8), (8, 9)],
            ],
        );
        // [10.....19][20........29][30........39]
        //               [21..22]      [31..32]
        check_merge_sorted_runs(
            &[(10, 19), (20, 29), (21, 22), (30, 39), (31, 32)],
            20,
            &[
                vec![(10, 19)],
                vec![(20, 29), (21, 22)],
                vec![(30, 39), (31, 32)],
            ],
        );
        // [1..10][11..20][21..30]
        // [1..10]        [21..30]
        check_merge_sorted_runs(
            &[(1, 10), (1, 10), (11, 20), (21, 30), (21, 30)],
            36,
            &[
                vec![(1, 10), (1, 10)],
                vec![(11, 20)],
                vec![(21, 30), (21, 30)],
            ],
        );
        // [1..10][11..20][21...30]
        //                 [22..30]
        check_merge_sorted_runs(
            &[(1, 10), (11, 20), (21, 30), (22, 30)],
            17,
            &[vec![(1, 10)], vec![(11, 20)], vec![(21, 30), (22, 30)]],
        );
    }

    /// files: file arrangement with two sorted runs.
    fn check_merge_all_sorted_runs(
        files: &[(i64, i64)],
        expected_penalty: usize,
        expected: &[Vec<(i64, i64)>],
    ) {
        let mut files = build_items(files);
        let runs = find_sorted_runs(&mut files);
        let result = merge_all_runs(runs);
        assert_eq!(expected_penalty, result.penalty);
        assert_eq!(expected.len(), result.items.len());
        let res = result
            .items
            .iter()
            .map(|i| {
                let mut res = i.items.iter().map(|f| (f.start, f.end)).collect::<Vec<_>>();
                res.sort_unstable_by(|l, r| l.0.cmp(&r.0));
                res
            })
            .collect::<Vec<_>>();
        assert_eq!(expected, &res);
    }

    #[test]
    fn test_merge_all_sorted_runs() {
        // [1..2][3..4]
        //          [4..10]
        check_merge_all_sorted_runs(
            &[(1, 2), (3, 4), (4, 10)],
            7, // 1+6
            &[vec![(1, 2)], vec![(3, 4), (4, 10)]],
        );
        // [1..2] [3..4] [5..6]
        //           [4..........10]
        check_merge_all_sorted_runs(
            &[(1, 2), (3, 4), (5, 6), (4, 10)],
            8, // 1+1+6
            &[vec![(1, 2)], vec![(3, 4), (4, 10), (5, 6)]],
        );
        // [10..20] [30..40] [50....60]
        //              [35........55]
        //                      [51..61]
        check_merge_all_sorted_runs(
            &[(10, 20), (30, 40), (50, 60), (35, 55), (51, 61)],
            50,
            &[vec![(10, 20)], vec![(30, 40), (35, 55), (50, 60), (51, 61)]],
        );
    }

    #[test]
    fn test_sorted_runs_time_range() {
        let mut files = build_items(&[(1, 2), (3, 4), (4, 10)]);
        let runs = find_sorted_runs(&mut files);
        assert_eq!(2, runs.len());
        // each run's bounds cover all of its items
        let SortedRun { start, end, .. } = &runs[0];
        assert_eq!(Some(1), *start);
        assert_eq!(Some(4), *end);

        let SortedRun { start, end, .. } = &runs[1];
        assert_eq!(Some(4), *start);
        assert_eq!(Some(10), *end);
    }

    /// Checks that reducing the runs found in `files` down to `target`
    /// produces exactly the merge groups in `expected` (order-insensitive).
    fn check_reduce_runs(
        files: &[(i64, i64)],
        expected_runs: &[Vec<(i64, i64)>],
        target: usize,
        expected: &[Vec<(i64, i64)>],
    ) {
        let runs = check_sorted_runs(files, expected_runs);
        let files_to_merge = reduce_runs(runs, target);
        let file_timestamps = files_to_merge
            .into_iter()
            .map(|f| {
                let mut overlapping = f.into_iter().map(|f| (f.start, f.end)).collect::<Vec<_>>();
                overlapping.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
                overlapping
            })
            .collect::<HashSet<_>>();

        let expected = expected.iter().cloned().collect::<HashSet<_>>();
        assert_eq!(&expected, &file_timestamps);
    }

    #[test]
    fn test_reduce_runs() {
        // [1..3]   [5..6]
        //  [2..4]
        check_reduce_runs(
            &[(1, 3), (2, 4), (5, 6)],
            &[vec![(1, 3), (5, 6)], vec![(2, 4)]],
            1,
            &[vec![(1, 3), (2, 4)]],
        );

        // [1..2][3..5]
        //        [4..6]
        check_reduce_runs(
            &[(1, 2), (3, 5), (4, 6)],
            &[vec![(1, 2), (3, 5)], vec![(4, 6)]],
            1,
            &[vec![(3, 5), (4, 6)]],
        );

        // [1..4]
        //  [2..5]
        //   [3..6]
        check_reduce_runs(
            &[(1, 4), (2, 5), (3, 6)],
            &[vec![(1, 4)], vec![(2, 5)], vec![(3, 6)]],
            1,
            &[vec![(1, 4), (2, 5), (3, 6)]],
        );
        check_reduce_runs(
            &[(1, 4), (2, 5), (3, 6)],
            &[vec![(1, 4)], vec![(2, 5)], vec![(3, 6)]],
            2,
            &[vec![(1, 4), (2, 5)]],
        );

        // [1..2][3..4]      [7..8]
        //          [4..6]
        check_reduce_runs(
            &[(1, 2), (3, 4), (4, 6), (7, 8)],
            &[vec![(1, 2), (3, 4), (7, 8)], vec![(4, 6)]],
            1,
            &[vec![(3, 4), (4, 6)]],
        );

        // [1..2][3........6][7..8]
        //       [3..4][5..6]   [8..9]
        check_reduce_runs(
            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
            &[vec![(1, 2), (3, 6), (7, 8)], vec![(3, 4), (5, 6), (8, 9)]],
            2,
            &[], // already satisfied
        );

        // [1..2][3........6][7..8]
        //       [3..4][5..6]   [8..9]
        check_reduce_runs(
            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
            &[vec![(1, 2), (3, 6), (7, 8)], vec![(3, 4), (5, 6), (8, 9)]],
            1,
            &[vec![(3, 4), (3, 6), (5, 6)], vec![(7, 8), (8, 9)]],
        );

        // [10..20] [30..40] [50........80]           [100..110]
        //                   [50..60]   [80...100]
        //                              [80..90]
        check_reduce_runs(
            &[
                (10, 20),
                (30, 40),
                (50, 60),
                (50, 80),
                (80, 90),
                (80, 100),
                (100, 110),
            ],
            &[
                vec![(10, 20), (30, 40), (50, 80), (100, 110)],
                vec![(50, 60), (80, 100)],
                vec![(80, 90)],
            ],
            2,
            &[vec![(80, 90), (80, 100)]],
        );

        // [10..20] [30..40] [50........80]           [100..110]
        //                   [50..60]   [80.......100]
        //                              [80..90]
        check_reduce_runs(
            &[
                (10, 20),
                (30, 40),
                (50, 60),
                (50, 80),
                (80, 90),
                (80, 100),
                (100, 110),
            ],
            &[
                vec![(10, 20), (30, 40), (50, 80), (100, 110)],
                vec![(50, 60), (80, 100)],
                vec![(80, 90)],
            ],
            1,
            &[vec![(50, 60), (50, 80), (80, 90), (80, 100), (100, 110)]],
        );

        // [0..10]
        // [0...11]
        // [0....12]
        // [0.....13]
        check_reduce_runs(
            &[(0, 10), (0, 11), (0, 12), (0, 13)],
            &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]],
            4,
            &[],
        );
        // enforce 3 runs
        check_reduce_runs(
            &[(0, 10), (0, 11), (0, 12), (0, 13)],
            &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]],
            3,
            &[vec![(0, 10), (0, 11)]],
        );
        // enforce 2 runs
        check_reduce_runs(
            &[(0, 10), (0, 11), (0, 12), (0, 13)],
            &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]],
            2,
            &[vec![(0, 10), (0, 11), (0, 12)]],
        );
        // enforce 1 run
        check_reduce_runs(
            &[(0, 10), (0, 11), (0, 12), (0, 13)],
            &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]],
            1,
            &[vec![(0, 10), (0, 11), (0, 12), (0, 13)]],
        );
    }
}

View File

@@ -24,85 +24,81 @@ use common_time::Timestamp;
use crate::compaction::buckets::infer_time_bucket;
use crate::compaction::compactor::CompactionRegion;
use crate::compaction::picker::{Picker, PickerOutput};
use crate::compaction::run::{find_sorted_runs, reduce_runs};
use crate::compaction::{get_expired_ssts, CompactionOutput};
use crate::sst::file::{overlaps, FileHandle, FileId};
use crate::sst::file::{overlaps, FileHandle, FileId, Level};
use crate::sst::version::LevelMeta;
const LEVEL_COMPACTED: Level = 1;
/// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction
/// candidates.
pub struct TwcsPicker {
max_active_window_files: usize,
max_inactive_window_files: usize,
max_active_window_runs: usize,
max_inactive_window_runs: usize,
time_window_seconds: Option<i64>,
}
impl Debug for TwcsPicker {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TwcsPicker")
.field("max_active_window_files", &self.max_active_window_files)
.field("max_inactive_window_files", &self.max_inactive_window_files)
.field("max_active_window_runs", &self.max_active_window_runs)
.field("max_inactive_window_runs", &self.max_inactive_window_runs)
.finish()
}
}
impl TwcsPicker {
pub fn new(
max_active_window_files: usize,
max_inactive_window_files: usize,
max_active_window_runs: usize,
max_inactive_window_runs: usize,
time_window_seconds: Option<i64>,
) -> Self {
Self {
max_inactive_window_files,
max_active_window_files,
max_inactive_window_runs,
max_active_window_runs,
time_window_seconds,
}
}
/// Builds compaction output from files.
/// For active writing window, we allow for at most `max_active_window_files` files to alleviate
/// For active writing window, we allow for at most `max_active_window_runs` files to alleviate
/// fragmentation. For other windows, we allow at most 1 file at each window.
fn build_output(
&self,
time_windows: &BTreeMap<i64, Window>,
time_windows: &mut BTreeMap<i64, Window>,
active_window: Option<i64>,
) -> Vec<CompactionOutput> {
let mut output = vec![];
for (window, files) in time_windows {
let files_in_window = &files.files;
// we only remove deletion markers once no file in current window overlaps with any other window.
let filter_deleted = !files.overlapping;
let sorted_runs = find_sorted_runs(&mut files.files);
if let Some(active_window) = active_window
let max_runs = if let Some(active_window) = active_window
&& *window == active_window
{
if files_in_window.len() > self.max_active_window_files {
self.max_active_window_runs
} else {
self.max_inactive_window_runs
};
// we only remove deletion markers once no file in current window overlaps with any other window.
let found_runs = sorted_runs.len();
let filter_deleted = !files.overlapping && (found_runs == 1 || max_runs == 1);
if found_runs > max_runs {
let files_to_compact = reduce_runs(sorted_runs, max_runs);
info!("Building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, output size: {}", active_window, *window,max_runs, found_runs, files_to_compact.len());
for inputs in files_to_compact {
output.push(CompactionOutput {
output_file_id: FileId::random(),
output_level: 1, // we only have two levels and always compact to l1
inputs: files_in_window.clone(),
output_level: LEVEL_COMPACTED, // always compact to l1
inputs,
filter_deleted,
output_time_range: None, // we do not enforce output time range in twcs compactions.
output_time_range: None, // we do not enforce output time range in twcs compactions.});
});
} else {
debug!("Active window not present or no enough files in active window {:?}, window: {}", active_window, *window);
}
} else {
// not active writing window
if files_in_window.len() > self.max_inactive_window_files {
output.push(CompactionOutput {
output_file_id: FileId::random(),
output_level: 1,
inputs: files_in_window.clone(),
filter_deleted,
output_time_range: None,
});
} else {
debug!(
"No enough files, current: {}, max_inactive_window_files: {}",
files_in_window.len(),
self.max_inactive_window_files
)
}
debug!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
}
}
output
@@ -139,8 +135,9 @@ impl Picker for TwcsPicker {
// Find active window from files in level 0.
let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
// Assign files to windows
let windows = assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
let outputs = self.build_output(&windows, active_window);
let mut windows =
assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
let outputs = self.build_output(&mut windows, active_window);
if outputs.is_empty() && expired_ssts.is_empty() {
return None;
@@ -482,10 +479,10 @@ mod tests {
impl CompactionPickerTestCase {
fn check(&self) {
let windows = assign_to_windows(self.input_files.iter(), self.window_size);
let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
let active_window =
find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
let output = TwcsPicker::new(4, 1, None).build_output(&windows, active_window);
let output = TwcsPicker::new(4, 1, None).build_output(&mut windows, active_window);
let output = output
.iter()

View File

@@ -100,8 +100,8 @@ async fn test_append_mode_compaction() {
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_files", "2")
.insert_option("compaction.twcs.max_inactive_window_files", "2")
.insert_option("compaction.twcs.max_active_window_runs", "2")
.insert_option("compaction.twcs.max_inactive_window_runs", "2")
.insert_option("append_mode", "true")
.build();
let region_dir = request.region_dir.clone();
@@ -167,7 +167,7 @@ async fn test_append_mode_compaction() {
+-------+---------+---------------------+";
// Scans in parallel.
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(1, scanner.num_files());
assert_eq!(2, scanner.num_files());
assert_eq!(1, scanner.num_memtables());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();

View File

@@ -140,8 +140,18 @@ async fn test_compaction_region() {
assert_eq!(result.affected_rows, 0);
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
// Input:
// [0..9]
// [10...19]
// [20....29]
// -[15.........29]-
// [15.....24]
// Output:
// [0..9]
// [10..14]
// [15..24]
assert_eq!(
1,
3,
scanner.num_files(),
"unexpected files: {:?}",
scanner.file_ids()
@@ -161,8 +171,8 @@ async fn test_compaction_region_with_overlapping() {
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_files", "2")
.insert_option("compaction.twcs.max_inactive_window_files", "2")
.insert_option("compaction.twcs.max_active_window_runs", "2")
.insert_option("compaction.twcs.max_inactive_window_runs", "2")
.insert_option("compaction.twcs.time_window", "1h")
.build();
@@ -191,14 +201,7 @@ async fn test_compaction_region_with_overlapping() {
assert_eq!(result.affected_rows, 0);
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(
2,
scanner.num_files(),
"unexpected files: {:?}",
scanner.file_ids()
);
let stream = scanner.scan().await.unwrap();
let vec = collect_stream_ts(stream).await;
assert_eq!((3600..10800).map(|i| { i * 1000 }).collect::<Vec<_>>(), vec);
}
@@ -212,8 +215,8 @@ async fn test_compaction_region_with_overlapping_delete_all() {
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_files", "2")
.insert_option("compaction.twcs.max_inactive_window_files", "2")
.insert_option("compaction.twcs.max_active_window_runs", "2")
.insert_option("compaction.twcs.max_inactive_window_runs", "2")
.insert_option("compaction.twcs.time_window", "1h")
.build();
@@ -249,7 +252,6 @@ async fn test_compaction_region_with_overlapping_delete_all() {
scanner.file_ids()
);
let stream = scanner.scan().await.unwrap();
let vec = collect_stream_ts(stream).await;
assert!(vec.is_empty());
}
@@ -275,7 +277,7 @@ async fn test_readonly_during_compaction() {
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_files", "1")
.insert_option("compaction.twcs.max_active_window_runs", "1")
.build();
let column_schemas = request
@@ -289,7 +291,7 @@ async fn test_readonly_during_compaction() {
.unwrap();
// Flush 2 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..10).await;
put_and_flush(&engine, region_id, &column_schemas, 10..20).await;
put_and_flush(&engine, region_id, &column_schemas, 5..20).await;
// Waits until the engine receives compaction finished request.
listener.wait_handle_finished().await;

View File

@@ -34,7 +34,7 @@ async fn test_scan_without_filtering_deleted() {
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_files", "10")
.insert_option("compaction.twcs.max_active_window_runs", "10")
.build();
let column_schemas = rows_schema(&request);

View File

@@ -175,12 +175,12 @@ impl Default for CompactionOptions {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct TwcsOptions {
/// Max num of files that can be kept in active writing time window.
/// Max num of sorted runs that can be kept in active writing time window.
#[serde_as(as = "DisplayFromStr")]
pub max_active_window_files: usize,
pub max_active_window_runs: usize,
/// Max num of files that can be kept in inactive time window.
#[serde_as(as = "DisplayFromStr")]
pub max_inactive_window_files: usize,
pub max_inactive_window_runs: usize,
/// Compaction time window defined when creating tables.
#[serde(with = "humantime_serde")]
pub time_window: Option<Duration>,
@@ -205,8 +205,8 @@ impl TwcsOptions {
impl Default for TwcsOptions {
fn default() -> Self {
Self {
max_active_window_files: 4,
max_inactive_window_files: 1,
max_active_window_runs: 1,
max_inactive_window_runs: 1,
time_window: None,
}
}
@@ -429,7 +429,7 @@ mod tests {
#[test]
fn test_without_compaction_type() {
let map = make_map(&[
("compaction.twcs.max_active_window_files", "8"),
("compaction.twcs.max_active_window_runs", "8"),
("compaction.twcs.time_window", "2h"),
]);
let err = RegionOptions::try_from(&map).unwrap_err();
@@ -439,14 +439,14 @@ mod tests {
#[test]
fn test_with_compaction_type() {
let map = make_map(&[
("compaction.twcs.max_active_window_files", "8"),
("compaction.twcs.max_active_window_runs", "8"),
("compaction.twcs.time_window", "2h"),
("compaction.type", "twcs"),
]);
let options = RegionOptions::try_from(&map).unwrap();
let expect = RegionOptions {
compaction: CompactionOptions::Twcs(TwcsOptions {
max_active_window_files: 8,
max_active_window_runs: 8,
time_window: Some(Duration::from_secs(3600 * 2)),
..Default::default()
}),
@@ -547,8 +547,8 @@ mod tests {
});
let map = make_map(&[
("ttl", "7d"),
("compaction.twcs.max_active_window_files", "8"),
("compaction.twcs.max_inactive_window_files", "2"),
("compaction.twcs.max_active_window_runs", "8"),
("compaction.twcs.max_inactive_window_runs", "2"),
("compaction.twcs.time_window", "2h"),
("compaction.type", "twcs"),
("storage", "S3"),
@@ -569,8 +569,8 @@ mod tests {
let expect = RegionOptions {
ttl: Some(Duration::from_secs(3600 * 24 * 7)),
compaction: CompactionOptions::Twcs(TwcsOptions {
max_active_window_files: 8,
max_inactive_window_files: 2,
max_active_window_runs: 8,
max_inactive_window_runs: 2,
time_window: Some(Duration::from_secs(3600 * 2)),
}),
storage: Some("S3".to_string()),

View File

@@ -200,6 +200,10 @@ impl FileHandle {
pub fn meta_ref(&self) -> &FileMeta {
&self.inner.meta
}
pub fn size(&self) -> u64 {
self.inner.meta.file_size
}
}
/// Inner data of [FileHandle].

View File

@@ -22,8 +22,8 @@ pub fn is_mito_engine_option_key(key: &str) -> bool {
[
"ttl",
"compaction.type",
"compaction.twcs.max_active_window_files",
"compaction.twcs.max_inactive_window_files",
"compaction.twcs.max_active_window_runs",
"compaction.twcs.max_inactive_window_runs",
"compaction.twcs.time_window",
"storage",
"index.inverted_index.ignore_column_ids",
@@ -47,10 +47,10 @@ mod tests {
assert!(is_mito_engine_option_key("ttl"));
assert!(is_mito_engine_option_key("compaction.type"));
assert!(is_mito_engine_option_key(
"compaction.twcs.max_active_window_files"
"compaction.twcs.max_active_window_runs"
));
assert!(is_mito_engine_option_key(
"compaction.twcs.max_inactive_window_files"
"compaction.twcs.max_inactive_window_runs"
));
assert!(is_mito_engine_option_key("compaction.twcs.time_window"));
assert!(is_mito_engine_option_key("storage"));

View File

@@ -67,8 +67,8 @@ engine=mito
with(
'ttl'='7d',
'compaction.type'='twcs',
'compaction.twcs.max_active_window_files'='8',
'compaction.twcs.max_inactive_window_files'='2',
'compaction.twcs.max_active_window_runs'='2',
'compaction.twcs.max_inactive_window_runs'='2',
'compaction.twcs.time_window'='1d',
'index.inverted_index.ignore_column_ids'='1,2,3',
'index.inverted_index.segment_row_count'='512',
@@ -90,7 +90,7 @@ create table if not exists invalid_compaction(
PRIMARY KEY(host)
)
engine=mito
with('compaction.type'='twcs', 'compaction.twcs.max_active_window_files'='8d');
with('compaction.type'='twcs', 'compaction.twcs.max_active_window_runs'='8d');
Error: 1004(InvalidArguments), Invalid options: invalid digit found in string

View File

@@ -57,8 +57,8 @@ engine=mito
with(
'ttl'='7d',
'compaction.type'='twcs',
'compaction.twcs.max_active_window_files'='8',
'compaction.twcs.max_inactive_window_files'='2',
'compaction.twcs.max_active_window_runs'='2',
'compaction.twcs.max_inactive_window_runs'='2',
'compaction.twcs.time_window'='1d',
'index.inverted_index.ignore_column_ids'='1,2,3',
'index.inverted_index.segment_row_count'='512',
@@ -76,4 +76,4 @@ create table if not exists invalid_compaction(
PRIMARY KEY(host)
)
engine=mito
with('compaction.type'='twcs', 'compaction.twcs.max_active_window_files'='8d');
with('compaction.type'='twcs', 'compaction.twcs.max_active_window_runs'='8d');