feat: initial twcs impl (#1851)

* feat: initial twcs impl

* chore: rename SimplePicker to LeveledPicker

* rename some structs

* Remove Compaction strategy

* make compaction picker a trait object

* make compaction picker configurable for every region

* chore: add some tests for ttl

* add some tests

* fix: some style issues in CR

* feat: enable twcs when creating tables

* feat: allow config time window when creating tables

* fix: some CR comments
Author: Lei, HUANG
Date: 2023-07-04 16:42:27 +08:00
Committed by: GitHub
Parent: b8e92292d2
Commit: 3b6f70cde3
21 changed files with 1150 additions and 453 deletions

View File

@@ -14,6 +14,7 @@
use std::cmp::Ordering;
use crate::util::div_ceil;
use crate::Timestamp;
/// Unix timestamp in millisecond resolution.
@@ -80,11 +81,17 @@ impl PartialOrd<TimestampMillis> for i64 {
}
pub trait BucketAligned: Sized {
/// Returns the timestamp aligned by `bucket_duration` or `None` if underflow occurred.
/// Aligns the value by `bucket_duration` or `None` if underflow occurred.
///
/// # Panics
/// Panics if `bucket_duration <= 0`.
fn align_by_bucket(self, bucket_duration: i64) -> Option<Self>;
/// Aligns the value by `bucket_duration` to the ceiling, or returns `None` if overflow occurred.
///
/// # Panics
/// Panics if `bucket_duration <= 0`.
fn align_to_ceil_by_bucket(self, bucket_duration: i64) -> Option<Self>;
}
impl BucketAligned for i64 {
@@ -93,6 +100,11 @@ impl BucketAligned for i64 {
self.checked_div_euclid(bucket_duration)
.and_then(|val| val.checked_mul(bucket_duration))
}
fn align_to_ceil_by_bucket(self, bucket_duration: i64) -> Option<Self> {
assert!(bucket_duration > 0, "{}", bucket_duration);
div_ceil(self, bucket_duration).checked_mul(bucket_duration)
}
}
impl BucketAligned for Timestamp {
@@ -103,6 +115,14 @@ impl BucketAligned for Timestamp {
.align_by_bucket(bucket_duration)
.map(|val| Timestamp::new(val, unit))
}
fn align_to_ceil_by_bucket(self, bucket_duration: i64) -> Option<Self> {
assert!(bucket_duration > 0, "{}", bucket_duration);
let unit = self.unit();
self.value()
.align_to_ceil_by_bucket(bucket_duration)
.map(|val| Timestamp::new(val, unit))
}
}
#[cfg(test)]
@@ -180,4 +200,31 @@ mod tests {
Timestamp::new_millisecond(i64::MIN).align_by_bucket(bucket)
);
}
#[test]
fn test_align_to_ceil() {
assert_eq!(None, i64::MAX.align_to_ceil_by_bucket(10));
assert_eq!(
Some(i64::MAX - (i64::MAX % 10)),
(i64::MAX - (i64::MAX % 10)).align_to_ceil_by_bucket(10)
);
assert_eq!(Some(i64::MAX), i64::MAX.align_to_ceil_by_bucket(1));
assert_eq!(Some(i64::MAX), i64::MAX.align_to_ceil_by_bucket(1));
assert_eq!(Some(i64::MAX), i64::MAX.align_to_ceil_by_bucket(i64::MAX));
assert_eq!(
Some(i64::MIN - (i64::MIN % 10)),
i64::MIN.align_to_ceil_by_bucket(10)
);
assert_eq!(Some(i64::MIN), i64::MIN.align_to_ceil_by_bucket(1));
assert_eq!(Some(3), 1i64.align_to_ceil_by_bucket(3));
assert_eq!(Some(3), 3i64.align_to_ceil_by_bucket(3));
assert_eq!(Some(6), 4i64.align_to_ceil_by_bucket(3));
assert_eq!(Some(0), 0i64.align_to_ceil_by_bucket(3));
assert_eq!(Some(0), (-1i64).align_to_ceil_by_bucket(3));
assert_eq!(Some(0), (-2i64).align_to_ceil_by_bucket(3));
assert_eq!(Some(-3), (-3i64).align_to_ceil_by_bucket(3));
assert_eq!(Some(-3), (-4i64).align_to_ceil_by_bucket(3));
}
}
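
For orientation, here is a minimal, self-contained sketch of the ceiling alignment added above. The local `div_ceil` is an assumed stand-in for the crate's `crate::util::div_ceil` helper; the results match the `test_align_to_ceil` cases.

```rust
/// Local stand-in for the `div_ceil` helper imported above (assumed to compute the
/// mathematical ceiling of `lhs / rhs` for `rhs > 0`).
fn div_ceil(lhs: i64, rhs: i64) -> i64 {
    let quotient = lhs / rhs;
    if lhs % rhs > 0 {
        quotient + 1
    } else {
        quotient
    }
}

/// Mirrors the shape of `align_to_ceil_by_bucket` for plain `i64` values: rounds `value`
/// up to the next multiple of `bucket`, reporting overflow as `None`.
fn align_to_ceil(value: i64, bucket: i64) -> Option<i64> {
    assert!(bucket > 0, "{}", bucket);
    div_ceil(value, bucket).checked_mul(bucket)
}

fn main() {
    assert_eq!(Some(6), align_to_ceil(4, 3)); // 4 rounds up to the next multiple of 3
    assert_eq!(Some(0), align_to_ceil(-2, 3)); // negative values round up toward zero
    assert_eq!(None, align_to_ceil(i64::MAX, 10)); // the rounded value would overflow i64
}
```
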

View File

@@ -44,7 +44,7 @@ use query::query_engine::{QueryEngineFactory, QueryEngineRef};
use servers::Mode;
use session::context::QueryContext;
use snafu::prelude::*;
use storage::compaction::{CompactionHandler, CompactionSchedulerRef, SimplePicker};
use storage::compaction::{CompactionHandler, CompactionSchedulerRef};
use storage::config::EngineConfig as StorageEngineConfig;
use storage::scheduler::{LocalScheduler, SchedulerConfig};
use storage::EngineImpl;
@@ -395,9 +395,8 @@ impl Instance {
}
fn create_compaction_scheduler<S: LogStore>(opts: &DatanodeOptions) -> CompactionSchedulerRef<S> {
let picker = SimplePicker::default();
let config = SchedulerConfig::from(opts);
let handler = CompactionHandler { picker };
let handler = CompactionHandler::default();
let scheduler = LocalScheduler::new(config, handler);
Arc::new(scheduler)
}

View File

@@ -33,8 +33,8 @@ use snafu::{ensure, OptionExt, ResultExt};
use storage::manifest::manifest_compress_type;
use store_api::storage::{
CloseOptions, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder,
ColumnId, EngineContext as StorageEngineContext, OpenOptions, RegionNumber, RowKeyDescriptor,
RowKeyDescriptorBuilder, StorageEngine,
ColumnId, CompactionStrategy, EngineContext as StorageEngineContext, OpenOptions, RegionNumber,
RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine,
};
use table::engine::{
region_name, table_dir, CloseTableResult, EngineContext, TableEngine, TableEngineProcedure,
@@ -417,6 +417,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.await.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)? else { return Ok(None) };
let compaction_strategy = CompactionStrategy::from(&table_info.meta.options.extra_options);
let opts = OpenOptions {
parent_dir: table_dir.to_string(),
write_buffer_size: table_info
@@ -425,6 +426,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.write_buffer_size
.map(|s| s.0 as usize),
ttl: table_info.meta.options.ttl,
compaction_strategy,
};
debug!(
@@ -501,6 +503,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
table: name,
};
let compaction_strategy = CompactionStrategy::from(&table_info.meta.options.extra_options);
let opts = OpenOptions {
parent_dir: table_dir.to_string(),
write_buffer_size: table_info
@@ -509,6 +512,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.write_buffer_size
.map(|s| s.0 as usize),
ttl: table_info.meta.options.ttl,
compaction_strategy,
};
// TODO(weny): Returns an error earlier if the target region does not exist in the meta.

View File

@@ -24,8 +24,8 @@ use datatypes::schema::{Schema, SchemaRef};
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::storage::{
ColumnId, CreateOptions, EngineContext, OpenOptions, RegionDescriptorBuilder, RegionNumber,
StorageEngine,
ColumnId, CompactionStrategy, CreateOptions, EngineContext, OpenOptions,
RegionDescriptorBuilder, RegionNumber, StorageEngine,
};
use table::engine::{region_id, table_dir};
use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};
@@ -232,15 +232,18 @@ impl<S: StorageEngine> TableCreator<S> {
let table_options = &self.data.request.table_options;
let write_buffer_size = table_options.write_buffer_size.map(|size| size.0 as usize);
let ttl = table_options.ttl;
let compaction_strategy = CompactionStrategy::from(&table_options.extra_options);
let open_opts = OpenOptions {
parent_dir: table_dir.to_string(),
write_buffer_size,
ttl,
compaction_strategy: compaction_strategy.clone(),
};
let create_opts = CreateOptions {
parent_dir: table_dir.to_string(),
write_buffer_size,
ttl,
compaction_strategy,
};
let primary_key_indices = &self.data.request.primary_key_indices;

View File

@@ -15,17 +15,182 @@
pub mod noop;
mod picker;
mod scheduler;
mod strategy;
mod task;
mod twcs;
mod writer;
use std::sync::Arc;
pub use picker::{Picker, PickerContext, SimplePicker};
use common_telemetry::tracing::log::warn;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
pub use picker::{LeveledTimeWindowPicker, Picker, PickerContext};
pub use scheduler::{CompactionHandler, CompactionRequestImpl};
use store_api::logstore::LogStore;
use store_api::storage::CompactionStrategy;
pub use task::{CompactionTask, CompactionTaskImpl};
pub use twcs::TwcsPicker;
use crate::scheduler::Scheduler;
use crate::sst::FileHandle;
pub type CompactionPickerRef<S> =
Arc<dyn Picker<Request = CompactionRequestImpl<S>, Task = CompactionTaskImpl<S>> + Send + Sync>;
pub type CompactionSchedulerRef<S> =
Arc<dyn Scheduler<Request = CompactionRequestImpl<S>> + Send + Sync>;
/// Infers the suitable time bucket duration.
/// Currently it simply finds the max and min timestamps across all SSTs in the level and fits
/// the time span into a time bucket.
pub(crate) fn infer_time_bucket<'a>(files: impl Iterator<Item = &'a FileHandle>) -> i64 {
let mut max_ts = Timestamp::new(i64::MIN, TimeUnit::Second);
let mut min_ts = Timestamp::new(i64::MAX, TimeUnit::Second);
for f in files {
if let Some((start, end)) = f.time_range() {
min_ts = min_ts.min(*start);
max_ts = max_ts.max(*end);
} else {
// we don't expect an SST file without time range,
// it's either a bug or data corruption.
warn!("Found SST file without time range metadata: {f:?}");
}
}
    // Safety: converting any timestamp into seconds will not cause overflow.
let min_sec = min_ts.convert_to(TimeUnit::Second).unwrap().value();
let max_sec = max_ts.convert_to(TimeUnit::Second).unwrap().value();
max_sec
.checked_sub(min_sec)
.map(|span| TIME_BUCKETS.fit_time_bucket(span)) // return the max bucket on subtraction overflow.
.unwrap_or_else(|| TIME_BUCKETS.max()) // safety: TIME_BUCKETS cannot be empty.
}
pub(crate) struct TimeBuckets([i64; 7]);
impl TimeBuckets {
    /// Fits a given time span into a time bucket by finding the minimum bucket that can cover the span.
/// Returns the max bucket if no such bucket can be found.
fn fit_time_bucket(&self, span_sec: i64) -> i64 {
assert!(span_sec >= 0);
match self.0.binary_search(&span_sec) {
Ok(idx) => self.0[idx],
Err(idx) => {
if idx < self.0.len() {
self.0[idx]
} else {
self.0.last().copied().unwrap()
}
}
}
}
#[cfg(test)]
fn get(&self, idx: usize) -> i64 {
self.0[idx]
}
fn max(&self) -> i64 {
self.0.last().copied().unwrap()
}
}
/// A set of predefined time buckets.
pub(crate) const TIME_BUCKETS: TimeBuckets = TimeBuckets([
60 * 60, // one hour
2 * 60 * 60, // two hours
12 * 60 * 60, // twelve hours
24 * 60 * 60, // one day
7 * 24 * 60 * 60, // one week
365 * 24 * 60 * 60, // one year
10 * 365 * 24 * 60 * 60, // ten years
]);
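
As a quick illustration of how `fit_time_bucket` behaves, here is a dependency-free sketch. `BUCKETS` restates `TIME_BUCKETS` in seconds and `fit` is an illustrative stand-in, not the engine's API; the binary search above is equivalent to picking the first bucket that covers the span.

```rust
/// The same durations as `TIME_BUCKETS`, in seconds: 1h, 2h, 12h, 1d, 1w, 1y, 10y.
const BUCKETS: [i64; 7] = [3_600, 7_200, 43_200, 86_400, 604_800, 31_536_000, 315_360_000];

/// Picks the smallest predefined bucket that covers the span, falling back to the largest.
fn fit(span_sec: i64) -> i64 {
    assert!(span_sec >= 0);
    BUCKETS
        .iter()
        .copied()
        .find(|b| *b >= span_sec)
        .unwrap_or(*BUCKETS.last().unwrap())
}

fn main() {
    assert_eq!(7_200, fit(90 * 60)); // a 90-minute span needs the two-hour bucket
    assert_eq!(86_400, fit(13 * 60 * 60)); // thirteen hours exceed the 12h bucket, so one day
    assert_eq!(315_360_000, fit(i64::MAX)); // anything beyond ten years is capped
}
```
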
pub fn compaction_strategy_to_picker<S: LogStore>(
strategy: &CompactionStrategy,
) -> CompactionPickerRef<S> {
match strategy {
CompactionStrategy::LeveledTimeWindow => {
Arc::new(LeveledTimeWindowPicker::default()) as Arc<_>
}
CompactionStrategy::Twcs(twcs_opts) => Arc::new(TwcsPicker::new(
twcs_opts.max_active_window_files,
twcs_opts.max_inactive_window_files,
twcs_opts.time_window_seconds,
)) as Arc<_>,
}
}
#[cfg(test)]
mod tests {
use common_time::Timestamp;
use super::*;
use crate::file_purger::noop::new_noop_file_purger;
use crate::sst::{FileHandle, FileId, FileMeta, Level};
/// Test util to create file handles.
pub fn new_file_handle(
file_id: FileId,
start_ts_millis: i64,
end_ts_millis: i64,
level: Level,
) -> FileHandle {
let file_purger = new_noop_file_purger();
let layer = Arc::new(crate::test_util::access_layer_util::MockAccessLayer {});
FileHandle::new(
FileMeta {
region_id: 0,
file_id,
time_range: Some((
Timestamp::new_millisecond(start_ts_millis),
Timestamp::new_millisecond(end_ts_millis),
)),
level,
file_size: 0,
},
layer,
file_purger,
)
}
#[test]
fn test_time_bucket() {
assert_eq!(TIME_BUCKETS.get(0), TIME_BUCKETS.fit_time_bucket(1));
assert_eq!(TIME_BUCKETS.get(0), TIME_BUCKETS.fit_time_bucket(60 * 60));
assert_eq!(
TIME_BUCKETS.get(1),
TIME_BUCKETS.fit_time_bucket(60 * 60 + 1)
);
assert_eq!(
TIME_BUCKETS.get(2),
TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(2) - 1)
);
assert_eq!(
TIME_BUCKETS.get(2),
TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(2))
);
assert_eq!(
TIME_BUCKETS.get(3),
TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(3) - 1)
);
assert_eq!(TIME_BUCKETS.get(6), TIME_BUCKETS.fit_time_bucket(i64::MAX));
}
#[test]
fn test_infer_time_buckets() {
assert_eq!(
TIME_BUCKETS.get(0),
infer_time_bucket(
[
new_file_handle(FileId::random(), 0, TIME_BUCKETS.get(0) * 1000 - 1, 0),
new_file_handle(FileId::random(), 1, 10_000, 0)
]
.iter()
)
);
}
}

View File

@@ -12,30 +12,49 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use common_telemetry::tracing::log::warn;
use common_telemetry::{debug, error, info};
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use snafu::ResultExt;
use store_api::logstore::LogStore;
use crate::compaction::infer_time_bucket;
use crate::compaction::scheduler::CompactionRequestImpl;
use crate::compaction::strategy::{SimpleTimeWindowStrategy, StrategyRef};
use crate::compaction::task::{CompactionTask, CompactionTaskImpl};
use crate::error::TtlCalculationSnafu;
use crate::compaction::task::{CompactionOutput, CompactionTask, CompactionTaskImpl};
use crate::error::{Result, TtlCalculationSnafu};
use crate::scheduler::Request;
use crate::sst::{FileHandle, Level};
use crate::version::LevelMetasRef;
use crate::sst::{FileHandle, LevelMeta};
/// Picker picks input SST files and builds the compaction task.
/// Different compaction strategies may implement different pickers.
pub trait Picker: Send + 'static {
pub trait Picker: Debug + Send + 'static {
type Request: Request;
type Task: CompactionTask;
fn pick(&self, req: &Self::Request) -> crate::error::Result<Option<Self::Task>>;
fn pick(&self, req: &Self::Request) -> Result<Option<Self::Task>>;
}
pub(crate) fn get_expired_ssts(
levels: &[LevelMeta],
ttl: Option<Duration>,
now: Timestamp,
) -> Result<Vec<FileHandle>> {
let Some(ttl) = ttl else { return Ok(vec![]); };
let expire_time = now.sub_duration(ttl).context(TtlCalculationSnafu)?;
let expired_ssts = levels
.iter()
.flat_map(|l| l.get_expired_files(&expire_time).into_iter())
.collect();
Ok(expired_ssts)
}
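
The strictness of the TTL cutoff is easiest to see with numbers. Below is a simplified restatement (plain seconds and an illustrative helper, not the engine's API) of the rule exercised by `test_find_expired_ssts` later in this file: with `now = 10s` and `ttl = 1s`, only files whose time range ends strictly before `9s` are expired.

```rust
use std::time::Duration;

/// Illustrative helper: a file is expired when its whole time range lies before `now - ttl`.
fn is_expired(file_end_sec: f64, now_sec: f64, ttl: Duration) -> bool {
    file_end_sec < now_sec - ttl.as_secs_f64()
}

fn main() {
    let ttl = Duration::from_secs(1);
    assert!(!is_expired(9.000, 10.0, ttl)); // ends exactly at the cutoff: kept
    assert!(is_expired(8.999, 10.0, ttl)); // ends just before the cutoff: expired
    assert!(is_expired(3.0, 10.0, ttl)); // old file: expired
}
```
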
pub struct PickerContext {
@@ -54,56 +73,40 @@ impl PickerContext {
}
}
/// L0 -> L1 compaction based on time windows.
pub struct SimplePicker<S> {
strategy: StrategyRef,
/// `LeveledTimeWindowPicker` only handles level 0 to level 1 compaction in a time-window tiered
/// manner. It picks all SSTs in level 0 and writes rows in these SSTs to a new file partitioned
/// by an inferred time bucket in level 1.
pub struct LeveledTimeWindowPicker<S> {
_phantom_data: PhantomData<S>,
}
impl<S> Default for SimplePicker<S> {
fn default() -> Self {
Self::new(Arc::new(SimpleTimeWindowStrategy {}))
impl<S> Debug for LeveledTimeWindowPicker<S> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "LeveledTimeWindowPicker{{..}}")
}
}
impl<S> SimplePicker<S> {
pub fn new(strategy: StrategyRef) -> Self {
impl<S> Default for LeveledTimeWindowPicker<S> {
fn default() -> Self {
Self::new()
}
}
impl<S> LeveledTimeWindowPicker<S> {
pub fn new() -> Self {
Self {
strategy,
_phantom_data: Default::default(),
}
}
fn get_expired_ssts(
&self,
levels: &LevelMetasRef,
ttl: Option<Duration>,
) -> crate::error::Result<Vec<FileHandle>> {
let Some(ttl) = ttl else { return Ok(vec![]); };
let expire_time = Timestamp::current_millis()
.sub_duration(ttl)
.context(TtlCalculationSnafu)?;
let mut expired_ssts = vec![];
for level in 0..levels.level_num() {
expired_ssts.extend(levels.level(level as Level).get_expired_files(&expire_time));
}
Ok(expired_ssts)
}
}
impl<S: LogStore> Picker for SimplePicker<S> {
impl<S: LogStore> Picker for LeveledTimeWindowPicker<S> {
type Request = CompactionRequestImpl<S>;
type Task = CompactionTaskImpl<S>;
fn pick(
&self,
req: &CompactionRequestImpl<S>,
) -> crate::error::Result<Option<CompactionTaskImpl<S>>> {
fn pick(&self, req: &CompactionRequestImpl<S>) -> Result<Option<CompactionTaskImpl<S>>> {
let levels = &req.levels();
let expired_ssts = self
.get_expired_ssts(levels, req.ttl)
let expired_ssts = get_expired_ssts(levels.levels(), req.ttl, Timestamp::current_millis())
.map_err(|e| {
error!(e;"Failed to get region expired SST files, region: {}, ttl: {:?}", req.region_id, req.ttl);
e
@@ -121,12 +124,16 @@ impl<S: LogStore> Picker for SimplePicker<S> {
let ctx = &PickerContext::with(req.compaction_time_window);
let mut outputs = vec![];
for level_num in 0..levels.level_num() {
let level = levels.level(level_num as u8);
let (compaction_time_window, outputs) = self.strategy.pick(ctx, level);
let compaction_time_window = Self::pick_level(ctx, level, &mut outputs);
if outputs.is_empty() {
debug!("No SST file can be compacted at level {}", level_num);
debug!(
"No SST file can be compacted at level {}, path: {:?}",
level_num, req.sst_layer
);
continue;
}
@@ -151,3 +158,272 @@ impl<S: LogStore> Picker for SimplePicker<S> {
Ok(None)
}
}
impl<S> LeveledTimeWindowPicker<S> {
fn pick_level(
ctx: &PickerContext,
level: &LevelMeta,
results: &mut Vec<CompactionOutput>,
) -> Option<i64> {
        // LeveledTimeWindowPicker only handles level 0 to level 1 compaction.
if level.level() != 0 {
return None;
}
let files = find_compactable_files(level);
debug!("Compactable files found: {:?}", files);
if files.is_empty() {
return None;
}
let time_window = ctx.compaction_time_window().unwrap_or_else(|| {
let inferred = infer_time_bucket(files.iter());
debug!(
"Compaction window is not present, inferring from files: {:?}",
inferred
);
inferred
});
let buckets = calculate_time_buckets(time_window, &files);
debug!("File bucket:{}, file groups: {:?}", time_window, buckets);
results.extend(buckets.into_iter().map(|(bound, files)| CompactionOutput {
output_level: 1,
time_window_bound: bound,
time_window_sec: time_window,
inputs: files,
            // Strict windows are used in the leveled time-window picker because rows in one
            // input file may get compacted into multiple destination files.
strict_window: true,
}));
Some(time_window)
}
}
/// Finds files that can be compacted in the given level.
/// Currently these are the files that are not under compaction.
#[inline]
fn find_compactable_files(level: &LevelMeta) -> Vec<FileHandle> {
level.files().filter(|f| !f.compacting()).cloned().collect()
}
/// Calculates buckets for files. If a file does not contain a time range in its metadata, it will be
/// assigned to a special bucket `i64::MAX` (normally no timestamp can be aligned to this bucket)
/// so that all files without timestamps can be compacted together.
fn calculate_time_buckets(bucket_sec: i64, files: &[FileHandle]) -> HashMap<i64, Vec<FileHandle>> {
let mut buckets = HashMap::new();
for file in files {
if let Some((start, end)) = file.time_range() {
let bounds = file_time_bucket_span(
start.convert_to(TimeUnit::Second).unwrap().value(),
end.convert_to(TimeUnit::Second).unwrap().value(),
bucket_sec,
);
for bound in bounds {
buckets
.entry(bound)
.or_insert_with(Vec::new)
.push(file.clone());
}
} else {
warn!("Found corrupted SST without timestamp bounds: {:?}", file);
}
}
buckets
}
/// Calculates timestamp span between start and end timestamp.
fn file_time_bucket_span(start_sec: i64, end_sec: i64, bucket_sec: i64) -> Vec<i64> {
assert!(start_sec <= end_sec);
// if timestamp is between `[i64::MIN, i64::MIN.align_by_bucket(bucket)]`, which cannot
// be aligned to a valid i64 bound, simply return `i64::MIN` rather than just underflow.
let mut start_aligned = start_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN);
let end_aligned = end_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN);
let mut res = Vec::with_capacity(((end_aligned - start_aligned) / bucket_sec + 1) as usize);
while start_aligned < end_aligned {
res.push(start_aligned);
start_aligned += bucket_sec;
}
res.push(end_aligned);
res
}
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use super::*;
use crate::compaction::tests::new_file_handle;
use crate::compaction::TIME_BUCKETS;
use crate::file_purger::noop::new_noop_file_purger;
use crate::sst::{FileId, Level, LevelMetas};
#[test]
fn test_time_bucket_span() {
assert_eq!(vec![0], file_time_bucket_span(1, 9, 10));
assert_eq!(vec![0, 10], file_time_bucket_span(1, 10, 10));
assert_eq!(vec![-10], file_time_bucket_span(-10, -1, 10));
assert_eq!(vec![-10, 0], file_time_bucket_span(-10, 0, 10));
}
#[test]
fn test_time_bucket_span_large() {
assert_eq!(
vec![
(i64::MAX - 10).align_by_bucket(10).unwrap(),
i64::MAX.align_by_bucket(10).unwrap(),
],
file_time_bucket_span(i64::MAX - 10, i64::MAX, 10)
);
// magic hmmm?
for bucket in 1..100 {
assert_eq!(
vec![
i64::MIN,
(i64::MIN + bucket).align_by_bucket(bucket).unwrap()
],
file_time_bucket_span(i64::MIN, i64::MIN + bucket, bucket)
);
}
}
fn new_file_handles(input: &[(FileId, i64, i64)]) -> Vec<FileHandle> {
input
.iter()
.map(|(file_id, start, end)| new_file_handle(*file_id, *start, *end, 0))
.collect()
}
fn check_bucket_calculation(
bucket_sec: i64,
files: Vec<FileHandle>,
expected: &[(i64, &[FileId])],
) {
let res = calculate_time_buckets(bucket_sec, &files);
let expected = expected
.iter()
.map(|(bucket, file_ids)| (*bucket, file_ids.iter().copied().collect::<HashSet<_>>()))
.collect::<HashMap<_, _>>();
for (bucket, file_ids) in expected {
let actual = res
.get(&bucket)
.unwrap()
.iter()
.map(|f| f.file_id())
.collect();
assert_eq!(
file_ids, actual,
"bucket: {bucket}, expected: {file_ids:?}, actual: {actual:?}",
);
}
}
#[test]
fn test_calculate_time_buckets() {
let file_id_a = FileId::random();
let file_id_b = FileId::random();
        // simple case: files with disjoint time ranges
check_bucket_calculation(
10,
new_file_handles(&[(file_id_a, 0, 9000), (file_id_b, 10000, 19000)]),
&[(0, &[file_id_a]), (10, &[file_id_b])],
);
// files across buckets
check_bucket_calculation(
10,
new_file_handles(&[(file_id_a, 0, 10001), (file_id_b, 10000, 19000)]),
&[(0, &[file_id_a]), (10, &[file_id_a, file_id_b])],
);
check_bucket_calculation(
10,
new_file_handles(&[(file_id_a, 0, 10000)]),
&[(0, &[file_id_a]), (10, &[file_id_a])],
);
        // file with a large time range
let file_id_array = &[file_id_a];
let expected = (0..(TIME_BUCKETS.get(4) / TIME_BUCKETS.get(0)))
.map(|b| (b * TIME_BUCKETS.get(0), file_id_array as _))
.collect::<Vec<_>>();
check_bucket_calculation(
TIME_BUCKETS.get(0),
new_file_handles(&[(file_id_a, 0, TIME_BUCKETS.get(4) * 1000)]),
&expected,
);
}
struct TtlTester {
files: Vec<(FileId, i64, i64, Level)>,
ttl: Option<Duration>,
expired: Vec<usize>,
now: Timestamp,
}
impl TtlTester {
fn check(&self) {
let expected_expired = self
.expired
.iter()
.map(|idx| self.files[*idx].0)
.collect::<HashSet<_>>();
let file_purger = new_noop_file_purger();
let layer = Arc::new(crate::test_util::access_layer_util::MockAccessLayer {});
let file_handles = self
.files
.iter()
.map(|(file_id, start_ts, end_ts, level)| {
new_file_handle(*file_id, *start_ts, *end_ts, *level).meta()
})
.collect::<Vec<_>>();
let levels = LevelMetas::new(layer, file_purger).merge(
file_handles.into_iter(),
vec![].into_iter(),
None,
);
let expired = get_expired_ssts(levels.levels(), self.ttl, self.now)
.unwrap()
.into_iter()
.map(|f| f.file_id())
.collect::<HashSet<_>>();
assert_eq!(expected_expired, expired);
}
}
#[test]
fn test_find_expired_ssts() {
TtlTester {
files: vec![
(FileId::random(), 8000, 9000, 0),
(FileId::random(), 10000, 11000, 0),
(FileId::random(), 8000, 11000, 1),
(FileId::random(), 2000, 3000, 1),
],
ttl: Some(Duration::from_secs(1)),
expired: vec![3],
now: Timestamp::new_second(10),
}
.check();
TtlTester {
files: vec![
(FileId::random(), 8000, 8999, 0),
(FileId::random(), 10000, 11000, 0),
(FileId::random(), 8000, 11000, 1),
(FileId::random(), 2000, 3000, 1),
],
ttl: Some(Duration::from_secs(1)),
expired: vec![0, 3],
now: Timestamp::new_second(10),
}
.check();
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
@@ -22,8 +23,8 @@ use store_api::storage::RegionId;
use tokio::sync::oneshot::Sender;
use tokio::sync::Notify;
use crate::compaction::picker::Picker;
use crate::compaction::task::CompactionTask;
use crate::compaction::CompactionPickerRef;
use crate::error::Result;
use crate::manifest::region::RegionManifest;
use crate::region::{RegionWriterRef, SharedDataRef};
@@ -63,7 +64,7 @@ pub struct CompactionRequestImpl<S: LogStore> {
pub compaction_time_window: Option<i64>,
/// Compaction result sender.
pub sender: Option<Sender<Result<()>>>,
pub picker: CompactionPickerRef<S>,
pub sst_write_buffer_size: ReadableSize,
}
@@ -79,18 +80,40 @@ impl<S: LogStore> CompactionRequestImpl<S> {
}
}
pub struct CompactionHandler<P> {
pub picker: P,
pub struct CompactionHandler<S: LogStore> {
_phantom_data: PhantomData<S>,
#[cfg(test)]
pub pending_tasks: Arc<tokio::sync::RwLock<Vec<tokio::task::JoinHandle<()>>>>,
}
impl<S: LogStore> Default for CompactionHandler<S> {
fn default() -> Self {
Self {
_phantom_data: Default::default(),
#[cfg(test)]
pending_tasks: Arc::new(Default::default()),
}
}
}
impl<S: LogStore> CompactionHandler<S> {
#[cfg(test)]
pub fn new_with_pending_tasks(
tasks: Arc<tokio::sync::RwLock<Vec<tokio::task::JoinHandle<()>>>>,
) -> Self {
Self {
_phantom_data: Default::default(),
pending_tasks: tasks,
}
}
}
#[async_trait::async_trait]
impl<P> Handler for CompactionHandler<P>
impl<S> Handler for CompactionHandler<S>
where
P: Picker + Send + Sync,
S: LogStore,
{
type Request = P::Request;
type Request = CompactionRequestImpl<S>;
async fn handle_request(
&self,
@@ -99,7 +122,7 @@ where
finish_notifier: Arc<Notify>,
) -> Result<()> {
let region_id = req.key();
let Some(task) = self.picker.pick(&req)? else {
let Some(task) = req.picker.pick(&req)? else {
info!("No file needs compaction in region: {:?}", region_id);
req.complete(Ok(()));
return Ok(());
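
The net effect of this hunk is that the handler no longer owns a picker; each compaction request carries its own trait-object picker, so every region can be configured with a different strategy. A stripped-down sketch of that shape (illustrative types only, not the engine's API):

```rust
use std::sync::Arc;

trait Picker: Send + Sync {
    fn pick(&self, region: &str) -> Option<String>;
}

/// A trivial picker used only for this sketch.
struct NoopPicker;

impl Picker for NoopPicker {
    fn pick(&self, _region: &str) -> Option<String> {
        None // nothing to compact
    }
}

/// The request carries the picker chosen for its region.
struct Request {
    region: String,
    picker: Arc<dyn Picker>,
}

/// The handler holds no picker of its own; it only dispatches to the one in the request.
struct Handler;

impl Handler {
    fn handle(&self, req: &Request) {
        match req.picker.pick(&req.region) {
            Some(task) => println!("compact {}: {task}", req.region),
            None => println!("no file needs compaction in region {}", req.region),
        }
    }
}

fn main() {
    let req = Request {
        region: "region-0".to_string(),
        picker: Arc::new(NoopPicker),
    };
    Handler.handle(&req);
}
```
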

View File

@@ -1,327 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use common_telemetry::{debug, warn};
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use crate::compaction::picker::PickerContext;
use crate::compaction::task::CompactionOutput;
use crate::sst::{FileHandle, LevelMeta};
/// Compaction strategy that defines which SSTs need to be compacted at given level.
pub trait Strategy {
fn pick(&self, ctx: &PickerContext, level: &LevelMeta) -> (Option<i64>, Vec<CompactionOutput>);
}
pub type StrategyRef = Arc<dyn Strategy + Send + Sync>;
/// SimpleTimeWindowStrategy only handles level 0 to level 1 compaction in a time-window tiered
/// manner. It picks all SSTs in level 0 and writes rows in these SSTs to a new file partitioned
/// by a inferred time bucket in level 1.
pub struct SimpleTimeWindowStrategy {}
impl Strategy for SimpleTimeWindowStrategy {
fn pick(&self, ctx: &PickerContext, level: &LevelMeta) -> (Option<i64>, Vec<CompactionOutput>) {
// SimpleTimeWindowStrategy only handles level 0 to level 1 compaction.
if level.level() != 0 {
return (None, vec![]);
}
let files = find_compactable_files(level);
debug!("Compactable files found: {:?}", files);
if files.is_empty() {
return (None, vec![]);
}
let time_window = ctx.compaction_time_window().unwrap_or_else(|| {
let inferred = infer_time_bucket(&files);
debug!(
"Compaction window is not present, inferring from files: {:?}",
inferred
);
inferred
});
let buckets = calculate_time_buckets(time_window, &files);
debug!("File bucket:{}, file groups: {:?}", time_window, buckets);
(
Some(time_window),
buckets
.into_iter()
.map(|(bound, files)| CompactionOutput {
output_level: 1,
bucket_bound: bound,
bucket: time_window,
inputs: files,
})
.collect(),
)
}
}
/// Finds files that can be compacted in given level.
/// Currently they're files that is not currently under compaction.
#[inline]
fn find_compactable_files(level: &LevelMeta) -> Vec<FileHandle> {
level.files().filter(|f| !f.compacting()).cloned().collect()
}
/// Calculates buckets for files. If file does not contain a time range in metadata, it will be
/// assigned to a special bucket `i64::MAX` (normally no timestamp can be aligned to this bucket)
/// so that all files without timestamp can be compacted together.
fn calculate_time_buckets(bucket_sec: i64, files: &[FileHandle]) -> HashMap<i64, Vec<FileHandle>> {
let mut buckets = HashMap::new();
for file in files {
if let Some((start, end)) = file.time_range() {
let bounds = file_time_bucket_span(
start.convert_to(TimeUnit::Second).unwrap().value(),
end.convert_to(TimeUnit::Second).unwrap().value(),
bucket_sec,
);
for bound in bounds {
buckets
.entry(bound)
.or_insert_with(Vec::new)
.push(file.clone());
}
} else {
warn!("Found corrupted SST without timestamp bounds: {:?}", file);
}
}
buckets
}
/// Calculates timestamp span between start and end timestamp.
fn file_time_bucket_span(start_sec: i64, end_sec: i64, bucket_sec: i64) -> Vec<i64> {
assert!(start_sec <= end_sec);
// if timestamp is between `[i64::MIN, i64::MIN.align_by_bucket(bucket)]`, which cannot
// be aligned to a valid i64 bound, simply return `i64::MIN` rather than just underflow.
let mut start_aligned = start_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN);
let end_aligned = end_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN);
let mut res = Vec::with_capacity(((end_aligned - start_aligned) / bucket_sec + 1) as usize);
while start_aligned < end_aligned {
res.push(start_aligned);
start_aligned += bucket_sec;
}
res.push(end_aligned);
res
}
/// Infers the suitable time bucket duration.
/// Now it simply find the max and min timestamp across all SSTs in level and fit the time span
/// into time bucket.
fn infer_time_bucket(files: &[FileHandle]) -> i64 {
let mut max_ts = &Timestamp::new(i64::MIN, TimeUnit::Second);
let mut min_ts = &Timestamp::new(i64::MAX, TimeUnit::Second);
for f in files {
if let Some((start, end)) = f.time_range() {
min_ts = min_ts.min(start);
max_ts = max_ts.max(end);
} else {
// we don't expect an SST file without time range,
// it's either a bug or data corruption.
warn!("Found SST file without time range metadata: {f:?}");
}
}
// safety: Convert whatever timestamp into seconds will not cause overflow.
let min_sec = min_ts.convert_to(TimeUnit::Second).unwrap().value();
let max_sec = max_ts.convert_to(TimeUnit::Second).unwrap().value();
max_sec
.checked_sub(min_sec)
.map(fit_time_bucket) // return the max bucket on subtraction overflow.
.unwrap_or_else(|| *TIME_BUCKETS.last().unwrap()) // safety: TIME_BUCKETS cannot be empty.
}
/// A set of predefined time buckets.
const TIME_BUCKETS: [i64; 7] = [
60 * 60, // one hour
2 * 60 * 60, // two hours
12 * 60 * 60, // twelve hours
24 * 60 * 60, // one day
7 * 24 * 60 * 60, // one week
365 * 24 * 60 * 60, // one year
10 * 365 * 24 * 60 * 60, // ten years
];
/// Fits a given time span into time bucket by find the minimum bucket that can cover the span.
/// Returns the max bucket if no such bucket can be found.
fn fit_time_bucket(span_sec: i64) -> i64 {
assert!(span_sec >= 0);
for b in TIME_BUCKETS {
if b >= span_sec {
return b;
}
}
*TIME_BUCKETS.last().unwrap()
}
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use super::*;
use crate::file_purger::noop::new_noop_file_purger;
use crate::sst::{FileId, FileMeta};
#[test]
fn test_time_bucket_span() {
assert_eq!(vec![0], file_time_bucket_span(1, 9, 10));
assert_eq!(vec![0, 10], file_time_bucket_span(1, 10, 10));
assert_eq!(vec![-10], file_time_bucket_span(-10, -1, 10));
assert_eq!(vec![-10, 0], file_time_bucket_span(-10, 0, 10));
}
#[test]
fn test_time_bucket_span_large() {
assert_eq!(
vec![
(i64::MAX - 10).align_by_bucket(10).unwrap(),
i64::MAX.align_by_bucket(10).unwrap(),
],
file_time_bucket_span(i64::MAX - 10, i64::MAX, 10)
);
// magic hmmm?
for bucket in 1..100 {
assert_eq!(
vec![
i64::MIN,
(i64::MIN + bucket).align_by_bucket(bucket).unwrap()
],
file_time_bucket_span(i64::MIN, i64::MIN + bucket, bucket)
);
}
}
#[test]
fn test_time_bucket() {
assert_eq!(TIME_BUCKETS[0], fit_time_bucket(1));
assert_eq!(TIME_BUCKETS[0], fit_time_bucket(60 * 60));
assert_eq!(TIME_BUCKETS[1], fit_time_bucket(60 * 60 + 1));
assert_eq!(TIME_BUCKETS[2], fit_time_bucket(TIME_BUCKETS[2] - 1));
assert_eq!(TIME_BUCKETS[2], fit_time_bucket(TIME_BUCKETS[2]));
assert_eq!(TIME_BUCKETS[3], fit_time_bucket(TIME_BUCKETS[3] - 1));
assert_eq!(TIME_BUCKETS[6], fit_time_bucket(i64::MAX));
}
#[test]
fn test_infer_time_buckets() {
assert_eq!(
TIME_BUCKETS[0],
infer_time_bucket(&[
new_file_handle(FileId::random(), 0, TIME_BUCKETS[0] * 1000 - 1),
new_file_handle(FileId::random(), 1, 10_000)
])
);
}
fn new_file_handle(file_id: FileId, start_ts_millis: i64, end_ts_millis: i64) -> FileHandle {
let file_purger = new_noop_file_purger();
let layer = Arc::new(crate::test_util::access_layer_util::MockAccessLayer {});
FileHandle::new(
FileMeta {
region_id: 0,
file_id,
time_range: Some((
Timestamp::new_millisecond(start_ts_millis),
Timestamp::new_millisecond(end_ts_millis),
)),
level: 0,
file_size: 0,
},
layer,
file_purger,
)
}
fn new_file_handles(input: &[(FileId, i64, i64)]) -> Vec<FileHandle> {
input
.iter()
.map(|(file_id, start, end)| new_file_handle(*file_id, *start, *end))
.collect()
}
fn check_bucket_calculation(
bucket_sec: i64,
files: Vec<FileHandle>,
expected: &[(i64, &[FileId])],
) {
let res = calculate_time_buckets(bucket_sec, &files);
let expected = expected
.iter()
.map(|(bucket, file_ids)| (*bucket, file_ids.iter().copied().collect::<HashSet<_>>()))
.collect::<HashMap<_, _>>();
for (bucket, file_ids) in expected {
let actual = res
.get(&bucket)
.unwrap()
.iter()
.map(|f| f.file_id())
.collect();
assert_eq!(
file_ids, actual,
"bucket: {bucket}, expected: {file_ids:?}, actual: {actual:?}",
);
}
}
#[test]
fn test_calculate_time_buckets() {
let file_id_a = FileId::random();
let file_id_b = FileId::random();
// simple case, files with disjoint
check_bucket_calculation(
10,
new_file_handles(&[(file_id_a, 0, 9000), (file_id_b, 10000, 19000)]),
&[(0, &[file_id_a]), (10, &[file_id_b])],
);
// files across buckets
check_bucket_calculation(
10,
new_file_handles(&[(file_id_a, 0, 10001), (file_id_b, 10000, 19000)]),
&[(0, &[file_id_a]), (10, &[file_id_a, file_id_b])],
);
check_bucket_calculation(
10,
new_file_handles(&[(file_id_a, 0, 10000)]),
&[(0, &[file_id_a]), (10, &[file_id_a])],
);
// file with an large time range
let file_id_array = &[file_id_a];
let expected = (0..(TIME_BUCKETS[4] / TIME_BUCKETS[0]))
.map(|b| (b * TIME_BUCKETS[0], file_id_array as _))
.collect::<Vec<_>>();
check_bucket_calculation(
TIME_BUCKETS[0],
new_file_handles(&[(file_id_a, 0, TIME_BUCKETS[4] * 1000)]),
&expected,
);
}
}

View File

@@ -169,13 +169,15 @@ impl<S: LogStore> CompactionTask for CompactionTaskImpl<S> {
#[derive(Debug)]
pub struct CompactionOutput {
/// Compaction output file level.
pub(crate) output_level: Level,
/// The left bound of time bucket.
pub(crate) bucket_bound: i64,
/// Bucket duration in seconds.
pub(crate) bucket: i64,
pub output_level: Level,
/// The left bound of time window.
pub time_window_bound: i64,
/// Time window size in seconds.
pub time_window_sec: i64,
/// Compaction input files.
pub(crate) inputs: Vec<FileHandle>,
pub inputs: Vec<FileHandle>,
/// If the compaction output is strictly windowed.
pub strict_window: bool,
}
impl CompactionOutput {
@@ -186,13 +188,21 @@ impl CompactionOutput {
sst_layer: AccessLayerRef,
sst_write_buffer_size: ReadableSize,
) -> Result<Option<FileMeta>> {
let time_range = if self.strict_window {
(
Some(self.time_window_bound),
Some(self.time_window_bound + self.time_window_sec),
)
} else {
(None, None)
};
let reader = build_sst_reader(
region_id,
schema,
sst_layer.clone(),
&self.inputs,
self.bucket_bound,
self.bucket_bound + self.bucket,
time_range,
)
.await?;

View File

@@ -0,0 +1,398 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Time-window compaction strategy
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use store_api::logstore::LogStore;
use crate::compaction::picker::get_expired_ssts;
use crate::compaction::task::CompactionOutput;
use crate::compaction::{infer_time_bucket, CompactionRequestImpl, CompactionTaskImpl, Picker};
use crate::sst::{FileHandle, LevelMeta};
/// `TwcsPicker` picks files whose max timestamps fall in the same time window as compaction
/// candidates.
pub struct TwcsPicker<S> {
max_active_window_files: usize,
max_inactive_window_files: usize,
time_window_seconds: Option<i64>,
_phantom_data: PhantomData<S>,
}
impl<S> Debug for TwcsPicker<S> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TwcsPicker")
.field("max_active_window_files", &self.max_active_window_files)
.field("max_inactive_window_files", &self.max_inactive_window_files)
.finish()
}
}
impl<S> TwcsPicker<S> {
pub fn new(
max_active_window_files: usize,
max_inactive_window_files: usize,
time_window_seconds: Option<i64>,
) -> Self {
Self {
max_inactive_window_files,
max_active_window_files,
_phantom_data: Default::default(),
time_window_seconds,
}
}
/// Builds compaction output from files.
    /// For the active writing window, we allow at most `max_active_window_files` files to alleviate
    /// fragmentation. For other windows, we allow at most `max_inactive_window_files` files per window.
fn build_output(
&self,
time_windows: &BTreeMap<i64, Vec<FileHandle>>,
active_window: Option<i64>,
window_size: i64,
) -> Vec<CompactionOutput> {
let mut output = vec![];
for (window, files) in time_windows {
if let Some(active_window) = active_window && *window == active_window {
if files.len() > self.max_active_window_files {
output.push(CompactionOutput {
output_level: 1, // we only have two levels and always compact to l1
time_window_bound: *window,
time_window_sec: window_size,
inputs: files.clone(),
// Strict window is not needed since we always compact many files to one
// single file in TWCS.
strict_window: false,
});
} else {
debug!("Active window not present or no enough files in active window {:?}", active_window);
}
} else {
// not active writing window
if files.len() > self.max_inactive_window_files {
output.push(CompactionOutput {
output_level: 1,
time_window_bound: *window,
time_window_sec: window_size,
inputs: files.clone(),
strict_window: false,
});
}
}
}
output
}
}
impl<S: LogStore> Picker for TwcsPicker<S> {
type Request = CompactionRequestImpl<S>;
type Task = CompactionTaskImpl<S>;
fn pick(&self, req: &Self::Request) -> crate::error::Result<Option<Self::Task>> {
let levels = req.levels();
let expired_ssts = get_expired_ssts(levels.levels(), req.ttl, Timestamp::current_millis())?;
if !expired_ssts.is_empty() {
info!(
"Expired SSTs in region {}: {:?}",
req.region_id, expired_ssts
);
// here we mark expired SSTs as compacting to avoid them being picked.
expired_ssts.iter().for_each(|f| f.mark_compacting(true));
}
let time_window_size = req
.compaction_time_window
.or(self.time_window_seconds)
.unwrap_or_else(|| {
let inferred = infer_time_bucket(req.levels().level(0).files());
info!(
"Compaction window for region {} is not present, inferring from files: {:?}",
req.region_id, inferred
);
inferred
});
// Find active window from files in level 0.
let active_window =
find_latest_window_in_seconds(levels.level(0).files(), time_window_size);
let windows = assign_to_windows(
levels.levels().iter().flat_map(LevelMeta::files),
time_window_size,
);
let outputs = self.build_output(&windows, active_window, time_window_size);
let task = CompactionTaskImpl {
schema: req.schema(),
sst_layer: req.sst_layer.clone(),
outputs,
writer: req.writer.clone(),
shared_data: req.shared.clone(),
wal: req.wal.clone(),
manifest: req.manifest.clone(),
expired_ssts,
sst_write_buffer_size: req.sst_write_buffer_size,
compaction_time_window: Some(time_window_size),
};
Ok(Some(task))
}
}
/// Assigns files to windows with predefined window size (in seconds) by their max timestamps.
fn assign_to_windows<'a>(
files: impl Iterator<Item = &'a FileHandle>,
time_window_size: i64,
) -> BTreeMap<i64, Vec<FileHandle>> {
let mut windows: BTreeMap<i64, Vec<FileHandle>> = BTreeMap::new();
    // Iterate over all files and assign them to time windows according to their max timestamps.
for file in files {
if let Some((_, end)) = file.time_range() {
let time_window = end
.convert_to(TimeUnit::Second)
.unwrap()
.value()
.align_to_ceil_by_bucket(time_window_size)
.unwrap_or(i64::MIN);
windows.entry(time_window).or_default().push(file.clone());
} else {
warn!("Unexpected file w/o timestamp: {:?}", file.file_id());
}
}
windows
}
/// Finds the latest active writing window among all files.
/// Returns `None` when there are no files or all files are corrupted.
fn find_latest_window_in_seconds<'a>(
files: impl Iterator<Item = &'a FileHandle>,
time_window_size: i64,
) -> Option<i64> {
let mut latest_timestamp = None;
for f in files {
if let Some((_, end)) = f.time_range() {
            if let Some(latest) = latest_timestamp && end > latest {
                latest_timestamp = Some(end);
            } else if latest_timestamp.is_none() {
                latest_timestamp = Some(end);
            }
} else {
warn!("Cannot find timestamp range of file: {}", f.file_id());
}
}
latest_timestamp
.and_then(|ts| ts.convert_to_ceil(TimeUnit::Second))
.and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size))
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use log_store::NoopLogStore;
use super::*;
use crate::compaction::tests::new_file_handle;
use crate::sst::{FileId, Level};
#[test]
fn test_get_latest_window_in_seconds() {
assert_eq!(
Some(1),
find_latest_window_in_seconds([new_file_handle(FileId::random(), 0, 999, 0)].iter(), 1)
);
assert_eq!(
Some(1),
find_latest_window_in_seconds(
[new_file_handle(FileId::random(), 0, 1000, 0)].iter(),
1
)
);
assert_eq!(
Some(-9223372036854000),
find_latest_window_in_seconds(
[new_file_handle(FileId::random(), i64::MIN, i64::MIN + 1, 0)].iter(),
3600,
)
);
assert_eq!(
(i64::MAX / 10000000 + 1) * 10000,
find_latest_window_in_seconds(
[new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0)].iter(),
10000,
)
.unwrap()
);
}
#[test]
fn test_assign_to_windows() {
let windows = assign_to_windows(
[
new_file_handle(FileId::random(), 0, 999, 0),
new_file_handle(FileId::random(), 0, 999, 0),
new_file_handle(FileId::random(), 0, 999, 0),
new_file_handle(FileId::random(), 0, 999, 0),
new_file_handle(FileId::random(), 0, 999, 0),
]
.iter(),
3,
);
assert_eq!(5, windows.get(&0).unwrap().len());
let files = [FileId::random(); 3];
let windows = assign_to_windows(
[
new_file_handle(files[0], -2000, -3, 0),
new_file_handle(files[1], 0, 2999, 0),
new_file_handle(files[2], 50, 10001, 0),
]
.iter(),
3,
);
assert_eq!(files[0], windows.get(&0).unwrap().get(0).unwrap().file_id());
assert_eq!(files[1], windows.get(&3).unwrap().get(0).unwrap().file_id());
assert_eq!(
files[2],
windows.get(&12).unwrap().get(0).unwrap().file_id()
);
}
struct CompactionPickerTestCase {
window_size: i64,
input_files: Vec<FileHandle>,
expected_outputs: Vec<ExpectedOutput>,
}
impl CompactionPickerTestCase {
fn check(&self) {
let windows = assign_to_windows(self.input_files.iter(), self.window_size);
let active_window =
find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
let output = TwcsPicker::<NoopLogStore>::new(4, 1, None).build_output(
&windows,
active_window,
self.window_size,
);
let output = output
.iter()
.map(|o| {
let input_file_ids =
o.inputs.iter().map(|f| f.file_id()).collect::<HashSet<_>>();
(
input_file_ids,
o.output_level,
o.time_window_sec,
o.time_window_bound,
o.strict_window,
)
})
.collect::<Vec<_>>();
let expected = self
.expected_outputs
.iter()
.map(|o| {
let input_file_ids = o
.input_files
.iter()
.map(|idx| self.input_files[*idx].file_id())
.collect::<HashSet<_>>();
(
input_file_ids,
o.output_level,
o.time_window_sec,
o.time_window_bound,
o.strict_window,
)
})
.collect::<Vec<_>>();
assert_eq!(expected, output);
}
}
struct ExpectedOutput {
input_files: Vec<usize>,
output_level: Level,
time_window_sec: i64,
time_window_bound: i64,
strict_window: bool,
}
#[test]
fn test_build_twcs_output() {
let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
CompactionPickerTestCase {
window_size: 3,
input_files: [
new_file_handle(file_ids[0], -2000, -3, 0),
new_file_handle(file_ids[1], -3000, -100, 0),
new_file_handle(file_ids[2], 0, 2999, 0), //active windows
new_file_handle(file_ids[3], 50, 2998, 0), //active windows
]
.to_vec(),
expected_outputs: vec![ExpectedOutput {
input_files: vec![0, 1],
output_level: 1,
time_window_sec: 3,
time_window_bound: 0,
strict_window: false,
}],
}
.check();
let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
CompactionPickerTestCase {
window_size: 3,
input_files: [
new_file_handle(file_ids[0], -2000, -3, 0),
new_file_handle(file_ids[1], -3000, -100, 0),
new_file_handle(file_ids[2], 0, 2999, 0),
new_file_handle(file_ids[3], 50, 2998, 0),
new_file_handle(file_ids[4], 11, 2990, 0),
new_file_handle(file_ids[5], 50, 4998, 0),
]
.to_vec(),
expected_outputs: vec![
ExpectedOutput {
input_files: vec![0, 1],
output_level: 1,
time_window_sec: 3,
time_window_bound: 0,
strict_window: false,
},
ExpectedOutput {
input_files: vec![2, 3, 4],
output_level: 1,
time_window_sec: 3,
time_window_bound: 3,
strict_window: false,
},
],
}
.check();
}
}
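
To tie the pieces above together, here is a condensed, dependency-free sketch of the TWCS picking rule. The names and the plain-`i64` representation are illustrative simplifications, not the engine's API: files are grouped by the ceiling-aligned window of their max timestamp, the window holding the latest timestamp is treated as the active writing window, and a window is selected once it holds more files than its limit.

```rust
use std::collections::BTreeMap;

fn pick_windows(
    file_max_ts_secs: &[i64],
    window_size: i64,
    max_active: usize,
    max_inactive: usize,
) -> Vec<i64> {
    // Ceiling-align a timestamp to its window, like `align_to_ceil_by_bucket` above.
    let align_ceil = |ts: i64| -> i64 {
        let d = ts.div_euclid(window_size);
        let r = ts.rem_euclid(window_size);
        (d + i64::from(r != 0)) * window_size
    };
    // Count files per window, keyed by the aligned max timestamp.
    let mut windows: BTreeMap<i64, usize> = BTreeMap::new();
    for ts in file_max_ts_secs {
        *windows.entry(align_ceil(*ts)).or_default() += 1;
    }
    // The window holding the latest timestamp is the active writing window.
    let active = windows.keys().last().copied();
    windows
        .iter()
        .filter(|(w, count)| {
            let limit = if Some(**w) == active { max_active } else { max_inactive };
            **count > limit
        })
        .map(|(w, _)| *w)
        .collect()
}

fn main() {
    // Five old files end in window 0 and two fresh files end in window 3 (window size 3s):
    // the inactive window 0 exceeds its limit of one file and is picked, while the active
    // window stays under the limit of four files.
    assert_eq!(vec![0], pick_windows(&[-2, -1, 0, 0, 0, 2, 3], 3, 4, 1));
}
```
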

View File

@@ -29,8 +29,7 @@ pub(crate) async fn build_sst_reader(
schema: RegionSchemaRef,
sst_layer: AccessLayerRef,
files: &[FileHandle],
lower_sec_inclusive: i64,
upper_sec_exclusive: i64,
time_range: (Option<i64>, Option<i64>),
) -> error::Result<ChunkReaderImpl> {
// TODO(hl): Schemas in different SSTs may differ, thus we should infer
// timestamp column name from Parquet metadata.
@@ -43,14 +42,9 @@ pub(crate) async fn build_sst_reader(
ChunkReaderBuilder::new(region_id, schema, sst_layer)
.pick_ssts(files)
.filters(
build_time_range_filter(
lower_sec_inclusive,
upper_sec_exclusive,
&ts_col_name,
ts_col_unit,
)
.into_iter()
.collect(),
build_time_range_filter(time_range, &ts_col_name, ts_col_unit)
.into_iter()
.collect(),
)
.build()
.await
@@ -59,21 +53,22 @@ pub(crate) async fn build_sst_reader(
/// Builds a time range filter expr from the lower (inclusive) and upper (exclusive) bounds.
/// Returns `None` if the time range overflows.
fn build_time_range_filter(
low_sec: i64,
high_sec: i64,
time_range: (Option<i64>, Option<i64>),
ts_col_name: &str,
ts_col_unit: TimeUnit,
) -> Option<Expr> {
debug_assert!(low_sec <= high_sec);
let (low_ts_inclusive, high_ts_exclusive) = time_range;
let ts_col = DfExpr::Column(datafusion_common::Column::from_name(ts_col_name));
// Converting seconds to whatever unit won't lose precision.
// Here only handles overflow.
let low_ts = common_time::Timestamp::new_second(low_sec)
.convert_to(ts_col_unit)
let low_ts = low_ts_inclusive
.map(common_time::Timestamp::new_second)
.and_then(|ts| ts.convert_to(ts_col_unit))
.map(|ts| ts.value());
let high_ts = common_time::Timestamp::new_second(high_sec)
.convert_to(ts_col_unit)
let high_ts = high_ts_exclusive
.map(common_time::Timestamp::new_second)
.and_then(|ts| ts.convert_to(ts_col_unit))
.map(|ts| ts.value());
let expr = match (low_ts, high_ts) {
@@ -296,8 +291,7 @@ mod tests {
schema,
sst_layer,
files,
lower_sec_inclusive,
upper_sec_exclusive,
(Some(lower_sec_inclusive), Some(upper_sec_exclusive)),
)
.await
.unwrap();
@@ -378,9 +372,15 @@ mod tests {
sst_layer: AccessLayerRef,
) -> Vec<i64> {
let mut timestamps = vec![];
let mut reader = build_sst_reader(REGION_ID, schema, sst_layer, files, i64::MIN, i64::MAX)
.await
.unwrap();
let mut reader = build_sst_reader(
REGION_ID,
schema,
sst_layer,
files,
(Some(i64::MIN), Some(i64::MAX)),
)
.await
.unwrap();
while let Some(chunk) = reader.next_chunk().await.unwrap() {
let ts = chunk.columns[0]
.as_any()
@@ -447,8 +447,7 @@ mod tests {
schema.clone(),
sst_layer.clone(),
&input_files,
0,
3,
(Some(0), Some(3)),
)
.await
.unwrap();
@@ -457,8 +456,7 @@ mod tests {
schema.clone(),
sst_layer.clone(),
&input_files,
3,
6,
(Some(3), Some(6)),
)
.await
.unwrap();
@@ -467,8 +465,7 @@ mod tests {
schema.clone(),
sst_layer.clone(),
&input_files,
6,
10,
(Some(6), Some(10)),
)
.await
.unwrap();
@@ -554,7 +551,12 @@ mod tests {
#[test]
fn test_build_time_range_filter() {
assert!(build_time_range_filter(i64::MIN, i64::MAX, "ts", TimeUnit::Nanosecond).is_none());
assert!(build_time_range_filter(
(Some(i64::MIN), Some(i64::MAX)),
"ts",
TimeUnit::Nanosecond
)
.is_none());
assert_eq!(
Expr::from(datafusion_expr::binary_expr(
@@ -562,10 +564,10 @@ mod tests {
Operator::Lt,
datafusion_expr::lit(timestamp_to_scalar_value(
TimeUnit::Nanosecond,
Some(TimeUnit::Second.factor() as i64 / TimeUnit::Nanosecond.factor() as i64)
))
Some(TimeUnit::Second.factor() as i64 / TimeUnit::Nanosecond.factor() as i64),
)),
)),
build_time_range_filter(i64::MIN, 1, "ts", TimeUnit::Nanosecond).unwrap()
build_time_range_filter((Some(i64::MIN), Some(1)), "ts", TimeUnit::Nanosecond).unwrap()
);
assert_eq!(
@@ -576,10 +578,10 @@ mod tests {
TimeUnit::Nanosecond,
Some(
2 * TimeUnit::Second.factor() as i64 / TimeUnit::Nanosecond.factor() as i64
)
))
),
)),
)),
build_time_range_filter(2, i64::MAX, "ts", TimeUnit::Nanosecond).unwrap()
build_time_range_filter((Some(2), Some(i64::MAX)), "ts", TimeUnit::Nanosecond).unwrap()
);
}
}

View File

@@ -23,8 +23,8 @@ use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::manifest::Manifest;
use store_api::storage::{
CloseContext, CloseOptions, CreateOptions, EngineContext, OpenOptions, Region,
RegionDescriptor, StorageEngine,
CloseContext, CloseOptions, CompactionStrategy, CreateOptions, EngineContext, OpenOptions,
Region, RegionDescriptor, StorageEngine,
};
use crate::compaction::CompactionSchedulerRef;
@@ -395,6 +395,7 @@ impl<S: LogStore> EngineInner<S> {
name,
&self.config,
opts.ttl,
opts.compaction_strategy.clone(),
)
.await?;
@@ -440,6 +441,7 @@ impl<S: LogStore> EngineInner<S> {
&region_name,
&self.config,
opts.ttl,
opts.compaction_strategy.clone(),
)
.await?;
@@ -471,6 +473,7 @@ impl<S: LogStore> EngineInner<S> {
region_name: &str,
config: &EngineConfig,
region_ttl: Option<Duration>,
compaction_strategy: CompactionStrategy,
) -> Result<StoreConfig<S>> {
let parent_dir = util::normalize_dir(parent_dir);
@@ -503,6 +506,7 @@ impl<S: LogStore> EngineInner<S> {
ttl,
write_buffer_size: write_buffer_size
.unwrap_or(self.config.region_write_buffer_size.as_bytes() as usize),
compaction_strategy,
})
}

View File

@@ -25,7 +25,7 @@ use store_api::storage::{RegionId, SequenceNumber};
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::sync::{oneshot, Notify};
use crate::compaction::{CompactionRequestImpl, CompactionSchedulerRef};
use crate::compaction::{CompactionPickerRef, CompactionRequestImpl, CompactionSchedulerRef};
use crate::config::EngineConfig;
use crate::engine::RegionMap;
use crate::error::{
@@ -109,6 +109,7 @@ pub struct FlushRegionRequest<S: LogStore> {
pub ttl: Option<Duration>,
/// Time window for compaction.
pub compaction_time_window: Option<i64>,
pub compaction_picker: CompactionPickerRef<S>,
}
impl<S: LogStore> FlushRegionRequest<S> {
@@ -146,6 +147,7 @@ impl<S: LogStore> From<&FlushRegionRequest<S>> for CompactionRequestImpl<S> {
ttl: req.ttl,
compaction_time_window: req.compaction_time_window,
sender: None,
picker: req.compaction_picker.clone(),
sst_write_buffer_size: req.engine_config.sst_write_buffer_size,
}
}

View File

@@ -32,11 +32,13 @@ use store_api::manifest::{
self, Manifest, ManifestLogStorage, ManifestVersion, MetaActionIterator,
};
use store_api::storage::{
AlterRequest, CloseContext, FlushContext, FlushReason, OpenOptions, ReadContext, Region,
RegionId, SequenceNumber, WriteContext, WriteResponse,
AlterRequest, CloseContext, CompactionStrategy, FlushContext, FlushReason, OpenOptions,
ReadContext, Region, RegionId, SequenceNumber, WriteContext, WriteResponse,
};
use crate::compaction::CompactionSchedulerRef;
use crate::compaction::{
compaction_strategy_to_picker, CompactionPickerRef, CompactionSchedulerRef,
};
use crate::config::EngineConfig;
use crate::error::{self, Error, Result};
use crate::file_purger::FilePurgerRef;
@@ -164,6 +166,7 @@ pub struct StoreConfig<S: LogStore> {
pub file_purger: FilePurgerRef,
pub ttl: Option<Duration>,
pub write_buffer_size: usize,
pub compaction_strategy: CompactionStrategy,
}
pub type RecoveredMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata));
@@ -252,6 +255,7 @@ impl<S: LogStore> RegionImpl<S> {
flush_strategy: store_config.flush_strategy,
flush_scheduler: store_config.flush_scheduler,
compaction_scheduler: store_config.compaction_scheduler,
compaction_picker: compaction_strategy_to_picker(&store_config.compaction_strategy),
sst_layer: store_config.sst_layer,
manifest: store_config.manifest,
});
@@ -336,6 +340,8 @@ impl<S: LogStore> RegionImpl<S> {
store_config.ttl,
store_config.write_buffer_size,
));
let compaction_picker = compaction_strategy_to_picker(&store_config.compaction_strategy);
let writer_ctx = WriterContext {
shared: &shared,
flush_strategy: &store_config.flush_strategy,
@@ -345,6 +351,7 @@ impl<S: LogStore> RegionImpl<S> {
wal: &wal,
writer: &writer,
manifest: &store_config.manifest,
compaction_picker: compaction_picker.clone(),
};
// Replay all unflushed data.
writer
@@ -364,6 +371,7 @@ impl<S: LogStore> RegionImpl<S> {
flush_strategy: store_config.flush_strategy,
flush_scheduler: store_config.flush_scheduler,
compaction_scheduler: store_config.compaction_scheduler,
compaction_picker,
sst_layer: store_config.sst_layer,
manifest: store_config.manifest,
});
@@ -586,6 +594,7 @@ impl<S: LogStore> RegionImpl<S> {
wal: &inner.wal,
writer: &inner.writer,
manifest: &inner.manifest,
compaction_picker: inner.compaction_picker.clone(),
};
inner.writer.replay(recovered_metadata, writer_ctx).await
@@ -642,6 +651,7 @@ struct RegionInner<S: LogStore> {
flush_strategy: FlushStrategyRef,
flush_scheduler: FlushSchedulerRef<S>,
compaction_scheduler: CompactionSchedulerRef<S>,
compaction_picker: CompactionPickerRef<S>,
sst_layer: AccessLayerRef,
manifest: RegionManifest,
}
@@ -685,6 +695,7 @@ impl<S: LogStore> RegionInner<S> {
wal: &self.wal,
writer: &self.writer,
manifest: &self.manifest,
compaction_picker: self.compaction_picker.clone(),
};
// The writer would also try to compat the schema of write batch if it finds out the
// schema version of request is less than current schema version.
@@ -746,6 +757,7 @@ impl<S: LogStore> RegionInner<S> {
wal: &self.wal,
writer: &self.writer,
manifest: &self.manifest,
compaction_picker: self.compaction_picker.clone(),
};
self.writer.flush(writer_ctx, ctx).await
}
@@ -761,6 +773,7 @@ impl<S: LogStore> RegionInner<S> {
wal: &self.wal,
writer: &self.writer,
manifest: &self.manifest,
compaction_picker: self.compaction_picker.clone(),
};
self.writer.compact(writer_ctx, ctx).await
}

View File

@@ -559,6 +559,7 @@ async fn create_store_config(region_name: &str, root: &str) -> StoreConfig<NoopL
file_purger,
ttl: None,
write_buffer_size: ReadableSize::mb(32).0 as usize,
compaction_strategy: Default::default(),
}
}

View File

@@ -26,7 +26,7 @@ use object_store::ObjectStore;
use store_api::storage::{FlushContext, FlushReason, OpenOptions, Region};
use tokio::sync::{Notify, RwLock};
use crate::compaction::{CompactionHandler, SimplePicker};
use crate::compaction::CompactionHandler;
use crate::config::EngineConfig;
use crate::error::Result;
use crate::file_purger::{FilePurgeHandler, FilePurgeRequest};
@@ -93,13 +93,8 @@ async fn create_region_for_compaction<
store_config.engine_config = Arc::new(engine_config);
store_config.flush_strategy = flush_strategy;
let picker = SimplePicker::default();
let pending_compaction_tasks = Arc::new(RwLock::new(vec![]));
let handler = CompactionHandler {
picker,
#[cfg(test)]
pending_tasks: pending_compaction_tasks.clone(),
};
let handler = CompactionHandler::new_with_pending_tasks(pending_compaction_tasks.clone());
let config = SchedulerConfig::default();
// Overwrite test compaction scheduler and file purger.
store_config.compaction_scheduler = Arc::new(LocalScheduler::new(config, handler));
@@ -262,12 +257,7 @@ impl CompactionTester {
store_config.engine_config = Arc::new(self.engine_config.clone());
store_config.flush_strategy = self.flush_strategy.clone();
let picker = SimplePicker::default();
let handler = CompactionHandler {
picker,
#[cfg(test)]
pending_tasks: Arc::new(Default::default()),
};
let handler = CompactionHandler::new_with_pending_tasks(Arc::new(Default::default()));
let config = SchedulerConfig::default();
// Overwrite test compaction scheduler and file purger.
store_config.compaction_scheduler = Arc::new(LocalScheduler::new(config, handler));

View File

@@ -27,7 +27,7 @@ use store_api::storage::{
};
use tokio::sync::{oneshot, Mutex};
use crate::compaction::{CompactionRequestImpl, CompactionSchedulerRef};
use crate::compaction::{CompactionPickerRef, CompactionRequestImpl, CompactionSchedulerRef};
use crate::config::EngineConfig;
use crate::error::{self, Result};
use crate::flush::{
@@ -412,6 +412,7 @@ pub struct WriterContext<'a, S: LogStore> {
pub wal: &'a Wal<S>,
pub writer: &'a RegionWriterRef,
pub manifest: &'a RegionManifest,
pub compaction_picker: CompactionPickerRef<S>,
}
impl<'a, S: LogStore> WriterContext<'a, S> {
@@ -779,6 +780,7 @@ impl WriterInner {
engine_config: self.engine_config.clone(),
ttl: self.ttl,
compaction_time_window: current_version.ssts().compaction_time_window(),
compaction_picker: ctx.compaction_picker.clone(),
};
let flush_handle = ctx
@@ -816,6 +818,7 @@ impl WriterInner {
ttl: self.ttl,
compaction_time_window,
sender: None,
picker: writer_ctx.compaction_picker.clone(),
sst_write_buffer_size,
};

View File

@@ -125,6 +125,7 @@ pub async fn new_store_config_with_object_store(
file_purger,
ttl: None,
write_buffer_size: DEFAULT_REGION_WRITE_BUFFER_SIZE.as_bytes() as usize,
compaction_strategy: Default::default(),
},
regions,
)

View File

@@ -1,3 +1,4 @@
#![feature(let_chains)]
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");

View File

@@ -32,7 +32,10 @@ pub use datatypes::schema::{
pub use self::chunk::{Chunk, ChunkReader};
pub use self::descriptors::*;
pub use self::engine::{CloseOptions, CreateOptions, EngineContext, OpenOptions, StorageEngine};
pub use self::engine::{
CloseOptions, CompactionStrategy, CreateOptions, EngineContext, OpenOptions, StorageEngine,
TwcsOptions,
};
pub use self::metadata::RegionMeta;
pub use self::region::{CloseContext, FlushContext, FlushReason, Region, RegionStat, WriteContext};
pub use self::requests::{

View File

@@ -18,6 +18,7 @@
//! a [`StorageEngine`] instance manages a bunch of storage unit called [`Region`], which holds
//! chunks of rows, support operations like PUT/DELETE/SCAN.
use std::collections::HashMap;
use std::time::Duration;
use async_trait::async_trait;
@@ -26,6 +27,13 @@ use common_error::ext::ErrorExt;
use crate::storage::descriptors::RegionDescriptor;
use crate::storage::region::Region;
const COMPACTION_STRATEGY_KEY: &str = "compaction";
const COMPACTION_STRATEGY_LEVELED_TIME_WINDOW_VALUE: &str = "LTW";
const COMPACTION_STRATEGY_TWCS_VALUE: &str = "TWCS";
const TWCS_MAX_ACTIVE_WINDOW_FILES_KEY: &str = "compaction.twcs.max_active_window_files";
const TWCS_TIME_WINDOW_SECONDS_KEY: &str = "compaction.twcs.time_window_seconds";
const TWCS_MAX_INACTIVE_WINDOW_FILES_KEY: &str = "compaction.twcs.max_inactive_window_files";
/// Storage engine provides primitive operations to store and access data.
#[async_trait]
pub trait StorageEngine: Send + Sync + Clone + 'static {
@@ -92,6 +100,8 @@ pub struct CreateOptions {
pub write_buffer_size: Option<usize>,
/// Region SST files TTL
pub ttl: Option<Duration>,
/// Compaction strategy
pub compaction_strategy: CompactionStrategy,
}
/// Options to open a region.
@@ -103,6 +113,8 @@ pub struct OpenOptions {
pub write_buffer_size: Option<usize>,
/// Region SST files TTL
pub ttl: Option<Duration>,
/// Compaction strategy
pub compaction_strategy: CompactionStrategy,
}
/// Options to close a region.
@@ -111,3 +123,70 @@ pub struct CloseOptions {
/// Flush region
pub flush: bool,
}
/// Options for compactions
#[derive(Debug, Clone, Default)]
pub enum CompactionStrategy {
/// Leveled time window compaction strategy
#[default]
LeveledTimeWindow,
/// TWCS
Twcs(TwcsOptions),
}
/// TWCS compaction options.
#[derive(Debug, Clone)]
pub struct TwcsOptions {
/// Max num of files that can be kept in active writing time window.
pub max_active_window_files: usize,
/// Max num of files that can be kept in inactive time window.
pub max_inactive_window_files: usize,
/// Compaction time window defined when creating tables.
pub time_window_seconds: Option<i64>,
}
impl Default for TwcsOptions {
fn default() -> Self {
Self {
max_active_window_files: 4,
max_inactive_window_files: 1,
time_window_seconds: None,
}
}
}
impl From<&HashMap<String, String>> for CompactionStrategy {
fn from(opts: &HashMap<String, String>) -> Self {
let Some(strategy_name) = opts.get(COMPACTION_STRATEGY_KEY) else { return CompactionStrategy::default() };
if strategy_name.eq_ignore_ascii_case(COMPACTION_STRATEGY_LEVELED_TIME_WINDOW_VALUE) {
CompactionStrategy::LeveledTimeWindow
} else if strategy_name.eq_ignore_ascii_case(COMPACTION_STRATEGY_TWCS_VALUE) {
let mut twcs_opts = TwcsOptions::default();
if let Some(max_active_window_files) = opts
.get(TWCS_MAX_ACTIVE_WINDOW_FILES_KEY)
.and_then(|num| num.parse::<usize>().ok())
{
twcs_opts.max_active_window_files = max_active_window_files;
}
if let Some(max_inactive_window_files) = opts
.get(TWCS_MAX_INACTIVE_WINDOW_FILES_KEY)
.and_then(|num| num.parse::<usize>().ok())
{
twcs_opts.max_inactive_window_files = max_inactive_window_files;
}
if let Some(time_window) = opts
.get(TWCS_TIME_WINDOW_SECONDS_KEY)
.and_then(|num| num.parse::<i64>().ok()) && time_window > 0
{
twcs_opts.time_window_seconds = Some(time_window);
}
CompactionStrategy::Twcs(twcs_opts)
} else {
// unrecognized compaction strategy
CompactionStrategy::default()
}
}
}
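
For reference, a small usage sketch of the option parsing above. It assumes it is compiled inside the workspace where the `store_api::storage` re-exports shown earlier in this diff are available; the option values themselves are illustrative.

```rust
use std::collections::HashMap;

use store_api::storage::CompactionStrategy;

fn main() {
    // Keys come from the constants defined above: "compaction" selects the strategy, and the
    // "compaction.twcs.*" keys tune it. Unset keys keep their defaults.
    let extra_options: HashMap<String, String> = [
        ("compaction".to_string(), "TWCS".to_string()),
        (
            "compaction.twcs.max_active_window_files".to_string(),
            "8".to_string(),
        ),
        (
            "compaction.twcs.time_window_seconds".to_string(),
            "3600".to_string(),
        ),
    ]
    .into_iter()
    .collect();

    match CompactionStrategy::from(&extra_options) {
        CompactionStrategy::Twcs(opts) => {
            assert_eq!(8, opts.max_active_window_files);
            assert_eq!(1, opts.max_inactive_window_files); // default kept
            assert_eq!(Some(3600), opts.time_window_seconds);
        }
        CompactionStrategy::LeveledTimeWindow => unreachable!("\"compaction\" was set to TWCS"),
    }
}
```
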