From 78fab08b51fd9dfdcad9d7b65b38e14e1182c596 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Mon, 29 May 2023 14:41:00 +0800 Subject: [PATCH] feat: window inferer (#1648) * feat: window inferer * doc: add some doc * test: add a long missing unit test case for windowed reader * add more tests * fix: some CR comments --- Cargo.lock | 1 + src/common/time/src/range.rs | 1 + src/common/time/src/timestamp.rs | 6 +- src/datatypes/src/data_type.rs | 11 + src/datatypes/src/types/timestamp_type.rs | 8 +- src/storage/Cargo.toml | 1 + src/storage/src/lib.rs | 1 + src/storage/src/memtable.rs | 9 +- src/storage/src/memtable/btree.rs | 13 +- src/storage/src/memtable/inserter.rs | 4 +- src/storage/src/read/windowed.rs | 5 + src/storage/src/region/tests.rs | 299 ++++++++++++++++++-- src/storage/src/window_infer.rs | 329 ++++++++++++++++++++++ 13 files changed, 646 insertions(+), 42 deletions(-) create mode 100644 src/storage/src/window_infer.rs diff --git a/Cargo.lock b/Cargo.lock index 4255a1a08e..61f69538dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8619,6 +8619,7 @@ dependencies = [ "datatypes", "futures", "futures-util", + "itertools", "lazy_static", "log-store", "metrics", diff --git a/src/common/time/src/range.rs b/src/common/time/src/range.rs index ed08014505..8e007896b0 100644 --- a/src/common/time/src/range.rs +++ b/src/common/time/src/range.rs @@ -221,6 +221,7 @@ impl TimestampRange { } /// Shortcut method to create a timestamp range with given start/end value and time unit. + /// Returns empty iff `start` > `end`. 
pub fn with_unit(start: i64, end: i64, unit: TimeUnit) -> Option { let start = Timestamp::new(start, unit); let end = Timestamp::new(end, unit); diff --git a/src/common/time/src/timestamp.rs b/src/common/time/src/timestamp.rs index d1c2aef802..0678fa5a35 100644 --- a/src/common/time/src/timestamp.rs +++ b/src/common/time/src/timestamp.rs @@ -81,7 +81,7 @@ impl Timestamp { } /// Subtracts current timestamp with another timestamp, yielding a duration. - pub fn sub(&self, rhs: Self) -> Option { + pub fn sub(&self, rhs: &Self) -> Option { let lhs = self.to_chrono_datetime()?; let rhs = rhs.to_chrono_datetime()?; Some(lhs - rhs) @@ -943,12 +943,12 @@ mod tests { fn test_subtract_timestamp() { assert_eq!( Some(chrono::Duration::milliseconds(42)), - Timestamp::new_millisecond(100).sub(Timestamp::new_millisecond(58)) + Timestamp::new_millisecond(100).sub(&Timestamp::new_millisecond(58)) ); assert_eq!( Some(chrono::Duration::milliseconds(-42)), - Timestamp::new_millisecond(58).sub(Timestamp::new_millisecond(100)) + Timestamp::new_millisecond(58).sub(&Timestamp::new_millisecond(100)) ); } diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index c2e7a0ed6b..f9e40880c8 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -170,6 +170,17 @@ impl ConcreteDataType { _ => None, } } + + /// Try to cast data type as a [`TimestampType`]. 
+ pub fn as_timestamp(&self) -> Option { + match self { + ConcreteDataType::Int64(_) => { + Some(TimestampType::Millisecond(TimestampMillisecondType)) + } + ConcreteDataType::Timestamp(t) => Some(*t), + _ => None, + } + } } impl TryFrom<&ArrowDataType> for ConcreteDataType { diff --git a/src/datatypes/src/types/timestamp_type.rs b/src/datatypes/src/types/timestamp_type.rs index faa9a7b926..45e5b95068 100644 --- a/src/datatypes/src/types/timestamp_type.rs +++ b/src/datatypes/src/types/timestamp_type.rs @@ -47,7 +47,7 @@ const MILLISECOND_VARIATION: u64 = 3; const MICROSECOND_VARIATION: u64 = 6; const NANOSECOND_VARIATION: u64 = 9; -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[enum_dispatch(DataType)] pub enum TimestampType { Second(TimestampSecondType), @@ -92,6 +92,10 @@ impl TimestampType { } } + pub fn create_timestamp(&self, val: i64) -> Timestamp { + Timestamp::new(val, self.unit()) + } + pub fn precision(&self) -> u64 { match self { TimestampType::Second(_) => SECOND_VARIATION, @@ -105,7 +109,7 @@ impl TimestampType { macro_rules! impl_data_type_for_timestamp { ($unit: ident) => { paste! 
{ - #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] + #[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub struct []; impl DataType for [] { diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index f8b924f4db..51248a34b4 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -25,6 +25,7 @@ datafusion-common.workspace = true datafusion-expr.workspace = true futures.workspace = true futures-util.workspace = true +itertools = "0.10" lazy_static = "1.4" metrics.workspace = true object-store = { path = "../object-store" } diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index bba07f7ef3..ae7e955c60 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -41,6 +41,7 @@ pub mod write_batch; pub use engine::EngineImpl; mod file_purger; mod metrics; +mod window_infer; pub use sst::parquet::ParquetWriter; pub use sst::Source; diff --git a/src/storage/src/memtable.rs b/src/storage/src/memtable.rs index 71f9e59e0c..4bfe2ba814 100644 --- a/src/storage/src/memtable.rs +++ b/src/storage/src/memtable.rs @@ -23,6 +23,7 @@ use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}; use std::sync::Arc; use common_time::range::TimestampRange; +use common_time::Timestamp; use datatypes::vectors::VectorRef; use metrics::{decrement_gauge, increment_gauge}; use store_api::storage::{consts, OpType, SequenceNumber}; @@ -39,16 +40,16 @@ use crate::schema::{ProjectedSchemaRef, RegionSchemaRef}; /// Unique id for memtables under same region. pub type MemtableId = u32; -#[derive(Debug)] +#[derive(Debug, Default)] pub struct MemtableStats { /// The estimated bytes allocated by this memtable from heap. Result /// of this method may be larger than the estimated based on [`num_rows`] because /// of the implementor's pre-alloc behavior. - estimated_bytes: usize, + pub estimated_bytes: usize, /// The max timestamp that this memtable contains. 
- pub max_timestamp: i64, + pub max_timestamp: Timestamp, /// The min timestamp that this memtable contains. - pub min_timestamp: i64, + pub min_timestamp: Timestamp, } impl MemtableStats { diff --git a/src/storage/src/memtable/btree.rs b/src/storage/src/memtable/btree.rs index c0a0852a32..e6f1ac8fb5 100644 --- a/src/storage/src/memtable/btree.rs +++ b/src/storage/src/memtable/btree.rs @@ -158,10 +158,19 @@ impl Memtable for BTreeMemtable { } fn stats(&self) -> MemtableStats { + let ts_meta = self.schema.column_metadata(self.schema.timestamp_index()); + + let Some(timestamp_type) = ts_meta.desc.data_type.as_timestamp() else { + // safety: timestamp column always has timestamp type, otherwise it's a bug. + panic!("Timestamp column is not a valid timestamp type: {:?}", self.schema); + }; + MemtableStats { estimated_bytes: self.alloc_tracker.bytes_allocated(), - max_timestamp: self.max_timestamp.load(AtomicOrdering::Relaxed), - min_timestamp: self.min_timestamp.load(AtomicOrdering::Relaxed), + max_timestamp: timestamp_type + .create_timestamp(self.max_timestamp.load(AtomicOrdering::Relaxed)), + min_timestamp: timestamp_type + .create_timestamp(self.min_timestamp.load(AtomicOrdering::Relaxed)), } } diff --git a/src/storage/src/memtable/inserter.rs b/src/storage/src/memtable/inserter.rs index 50b20ad235..299f0d64a2 100644 --- a/src/storage/src/memtable/inserter.rs +++ b/src/storage/src/memtable/inserter.rs @@ -177,8 +177,8 @@ mod tests { min_ts: i64, ) { let iter = mem.iter(&IterContext::default()).unwrap(); - assert_eq!(min_ts, mem.stats().min_timestamp); - assert_eq!(max_ts, mem.stats().max_timestamp); + assert_eq!(min_ts, mem.stats().min_timestamp.value()); + assert_eq!(max_ts, mem.stats().max_timestamp.value()); let mut index = 0; for batch in iter { diff --git a/src/storage/src/read/windowed.rs b/src/storage/src/read/windowed.rs index 197318b7ab..33619f3eb1 100644 --- a/src/storage/src/read/windowed.rs +++ b/src/storage/src/read/windowed.rs @@ -33,6 +33,11 @@ pub 
struct WindowedReader { } impl WindowedReader { + /// Creates a new [WindowedReader] from given schema and a set of boxed readers. + /// + /// ### Note + /// [WindowedReader] always reads the readers in a reverse order. The last reader in `readers` + /// gets polled first. pub fn new(schema: ProjectedSchemaRef, readers: Vec) -> Self { Self { schema, readers } } diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index eafc0da4d5..e0fa79d973 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -14,6 +14,48 @@ //! Region tests. +use std::collections::{HashMap, HashSet}; + +use common_base::readable_size::ReadableSize; +use common_datasource::compression::CompressionType; +use common_telemetry::logging; +use common_test_util::temp_dir::{create_temp_dir, TempDir}; +use common_time::range::TimestampRange; +use common_time::timestamp::TimeUnit; +use datatypes::prelude::{LogicalTypeId, ScalarVector, WrapperType}; +use datatypes::timestamp::TimestampMillisecond; +use datatypes::vectors::{ + BooleanVector, Int64Vector, StringVector, TimestampMillisecondVector, VectorRef, +}; +use log_store::raft_engine::log_store::RaftEngineLogStore; +use log_store::NoopLogStore; +use object_store::services::Fs; +use object_store::ObjectStore; +use store_api::manifest::{Manifest, MAX_VERSION}; +use store_api::storage::{ + Chunk, ChunkReader, FlushContext, FlushReason, ReadContext, Region, RegionMeta, ScanRequest, + SequenceNumber, Snapshot, WriteContext, WriteRequest, +}; + +use super::*; +use crate::chunk::ChunkReaderImpl; +use crate::compaction::noop::NoopCompactionScheduler; +use crate::engine; +use crate::engine::RegionMap; +use crate::file_purger::noop::NoopFilePurgeHandler; +use crate::flush::{FlushScheduler, PickerConfig, SizeBasedStrategy}; +use crate::manifest::action::{RegionChange, RegionMetaActionList}; +use crate::manifest::manifest_compress_type; +use crate::manifest::region::RegionManifest; +use 
crate::manifest::test_utils::*; +use crate::memtable::DefaultMemtableBuilder; +use crate::metadata::RegionMetadata; +use crate::region::{RegionImpl, StoreConfig}; +use crate::scheduler::{LocalScheduler, SchedulerConfig}; +use crate::sst::{FileId, FsAccessLayer}; +use crate::test_util::descriptor_util::RegionDescBuilder; +use crate::test_util::{self, config_util, schema_util, write_batch_util}; + mod alter; mod basic; mod close; @@ -21,35 +63,6 @@ mod compact; mod flush; mod projection; -use std::collections::{HashMap, HashSet}; - -use common_telemetry::logging; -use common_test_util::temp_dir::create_temp_dir; -use datatypes::prelude::{ScalarVector, WrapperType}; -use datatypes::timestamp::TimestampMillisecond; -use datatypes::type_id::LogicalTypeId; -use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef}; -use log_store::raft_engine::log_store::RaftEngineLogStore; -use log_store::NoopLogStore; -use object_store::services::Fs; -use object_store::ObjectStore; -use store_api::manifest::MAX_VERSION; -use store_api::storage::{ - Chunk, ChunkReader, RegionMeta, ScanRequest, SequenceNumber, Snapshot, WriteRequest, -}; - -use super::*; -use crate::chunk::ChunkReaderImpl; -use crate::file_purger::noop::NoopFilePurgeHandler; -use crate::manifest::action::{RegionChange, RegionMetaActionList}; -use crate::manifest::manifest_compress_type; -use crate::manifest::test_utils::*; -use crate::memtable::DefaultMemtableBuilder; -use crate::scheduler::{LocalScheduler, SchedulerConfig}; -use crate::sst::{FileId, FsAccessLayer}; -use crate::test_util::descriptor_util::RegionDescBuilder; -use crate::test_util::{self, config_util, schema_util, write_batch_util}; - /// Create metadata of a region with schema: (timestamp, v0). 
pub fn new_metadata(region_name: &str) -> RegionMetadata { let desc = RegionDescBuilder::new(region_name) @@ -464,3 +477,231 @@ fn assert_recovered_manifest( files ); } + +fn create_region_meta(region_name: &str) -> RegionMetadata { + let desc = RegionDescBuilder::new(region_name) + .push_field_column(("v0", LogicalTypeId::Int64, true)) + .push_field_column(("v1", LogicalTypeId::String, true)) + .push_field_column(("v2", LogicalTypeId::Boolean, true)) + .build(); + desc.try_into().unwrap() +} + +async fn create_store_config(region_name: &str, root: &str) -> StoreConfig { + let mut builder = Fs::default(); + builder.root(root); + let object_store = ObjectStore::new(builder).unwrap().finish(); + let parent_dir = ""; + let sst_dir = engine::region_sst_dir(parent_dir, region_name); + let manifest_dir = engine::region_manifest_dir(parent_dir, region_name); + + let sst_layer = Arc::new(FsAccessLayer::new(&sst_dir, object_store.clone())); + let manifest = RegionManifest::with_checkpointer( + &manifest_dir, + object_store, + CompressionType::Uncompressed, + None, + None, + ); + manifest.start().await.unwrap(); + + let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); + + let regions = Arc::new(RegionMap::new()); + + let flush_scheduler = Arc::new( + FlushScheduler::new( + SchedulerConfig::default(), + compaction_scheduler.clone(), + regions, + PickerConfig::default(), + ) + .unwrap(), + ); + + let log_store = Arc::new(NoopLogStore::default()); + + let file_purger = Arc::new(LocalScheduler::new( + SchedulerConfig::default(), + NoopFilePurgeHandler, + )); + StoreConfig { + log_store, + sst_layer, + manifest, + memtable_builder: Arc::new(DefaultMemtableBuilder::default()), + flush_scheduler, + flush_strategy: Arc::new(SizeBasedStrategy::default()), + compaction_scheduler, + engine_config: Default::default(), + file_purger, + ttl: None, + compaction_time_window: None, + write_buffer_size: ReadableSize::mb(32).0 as usize, + } +} + +struct WindowedReaderTester 
{ + data_written: Vec>, + time_windows: Vec<(i64, i64)>, + expected_ts: Vec, + region: RegionImpl, + _temp_dir: TempDir, +} + +impl WindowedReaderTester { + async fn new( + region_name: &'static str, + data_written: Vec>, + time_windows: Vec<(i64, i64)>, + expected_ts: Vec, + ) -> Self { + let temp_dir = create_temp_dir(&format!("write_and_read_windowed_{}", region_name)); + let root = temp_dir.path().to_str().unwrap(); + let metadata = create_region_meta(region_name); + let store_config = create_store_config(region_name, root).await; + let region = RegionImpl::create(metadata, store_config).await.unwrap(); + + let tester = Self { + data_written, + time_windows, + expected_ts, + region, + _temp_dir: temp_dir, + }; + tester.prepare().await; + tester + } + + async fn prepare(&self) { + for batch in &self.data_written { + let mut write_batch = self.region.write_request(); + let ts = TimestampMillisecondVector::from_iterator( + batch + .iter() + .map(|(v, _, _, _)| TimestampMillisecond::new(*v)), + ); + let v0 = Int64Vector::from_iterator(batch.iter().map(|(_, v, _, _)| *v)); + let v1 = StringVector::from_iterator(batch.iter().map(|(_, _, v, _)| v.as_str())); + let v2 = BooleanVector::from_iterator(batch.iter().map(|(_, _, _, v)| *v)); + + let columns = [ + ("timestamp".to_string(), Arc::new(ts) as VectorRef), + ("v0".to_string(), Arc::new(v0) as VectorRef), + ("v1".to_string(), Arc::new(v1) as VectorRef), + ("v2".to_string(), Arc::new(v2) as VectorRef), + ] + .into_iter() + .collect::>(); + write_batch.put(columns).unwrap(); + + self.region + .write(&WriteContext {}, write_batch) + .await + .unwrap(); + + // flush the region to ensure data resides across SST files. 
+ self.region + .flush(&FlushContext { + wait: true, + reason: FlushReason::Others, + }) + .await + .unwrap(); + } + } + + async fn check(&self) { + let windows = self + .time_windows + .iter() + .map(|(start, end)| { + TimestampRange::with_unit(*start, *end, TimeUnit::Millisecond).unwrap() + }) + .collect::>(); + + let read_context = ReadContext::default(); + let snapshot = self.region.snapshot(&read_context).unwrap(); + let response = snapshot + .windowed_scan( + &read_context, + ScanRequest { + sequence: None, + projection: None, + filters: vec![], + }, + windows, + ) + .await + .unwrap(); + + let mut timestamps = Vec::with_capacity(self.expected_ts.len()); + + let mut reader = response.reader; + let ts_index = reader.user_schema().timestamp_index().unwrap(); + while let Some(chunk) = reader.next_chunk().await.unwrap() { + let ts_col = &chunk.columns[ts_index]; + let ts_col = ts_col + .as_any() + .downcast_ref::() + .unwrap(); + for ts in ts_col.iter_data() { + timestamps.push(ts.unwrap().0.value()); + } + } + assert_eq!(timestamps, self.expected_ts); + } +} + +#[tokio::test] +async fn test_read_by_chunk_reader() { + common_telemetry::init_default_ut_logging(); + + WindowedReaderTester::new( + "test_region", + vec![vec![(1, 1, "1".to_string(), false)]], + vec![(1, 2)], + vec![1], + ) + .await + .check() + .await; + + WindowedReaderTester::new( + "test_region", + vec![ + vec![ + (1, 1, "1".to_string(), false), + (2, 2, "2".to_string(), false), + ], + vec![ + (3, 3, "3".to_string(), false), + (4, 4, "4".to_string(), false), + ], + ], + vec![(1, 2), (2, 3), (3, 4), (4, 5)], + vec![4, 3, 2, 1], + ) + .await + .check() + .await; + + WindowedReaderTester::new( + "test_region", + vec![ + vec![ + (1, 1, "1".to_string(), false), + (2, 2, "2".to_string(), false), + ], + vec![ + (3, 3, "3".to_string(), false), + (4, 4, "4".to_string(), false), + ], + ], + vec![(1, 2), (2, 3), (4, 5), (3, 4)], + vec![3, 4, 2, 1], + ) + .await + .check() + .await; +} diff --git 
a/src/storage/src/window_infer.rs b/src/storage/src/window_infer.rs new file mode 100644 index 0000000000..334794ddf4 --- /dev/null +++ b/src/storage/src/window_infer.rs @@ -0,0 +1,329 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use common_time::range::TimestampRange; +use common_time::timestamp::TimeUnit; +use common_time::timestamp_millis::BucketAligned; +use itertools::Itertools; + +use crate::memtable::MemtableStats; +use crate::sst::FileMeta; + +/// A set of predefined time window sizes in seconds, sorted in ascending order. +const TIME_WINDOW_SIZE: [i64; 9] = [ + 60, // 1 minute + 60 * 10, // 10 minutes + 60 * 30, // 30 minutes + 60 * 60, // 1 hour + 2 * 60 * 60, // 2 hours + 6 * 60 * 60, // 6 hours + 12 * 60 * 60, // 12 hours + 24 * 60 * 60, // 1 day + 7 * 24 * 60 * 60, // 1 week +]; + +/// [WindowInfer] infers the time windows that can be used to optimize table scans that are ordered by +/// the timestamp column or have explicit time windows. By splitting time spans of tables into +/// time windows, we can scan entries window by window. +pub(crate) trait WindowInfer { + /// Infers time windows according to the SST files and memtables. + /// + /// ### Note + /// The order of the returned vector defines how records are yielded. 
+ fn infer_window(&self, files: &[FileMeta], mem_tables: &[MemtableStats]) + -> Vec; +} + +/// [PlainWindowInference] simply finds the minimum time span within all SST files in level 0 and +/// memtables, matches that time span into a set of predefined time windows. +pub(crate) struct PlainWindowInference; + +impl WindowInfer for PlainWindowInference { + fn infer_window( + &self, + files: &[FileMeta], + mem_tables: &[MemtableStats], + ) -> Vec { + let mut min_duration_sec = i64::MAX; + let mut durations = Vec::with_capacity(files.len() + mem_tables.len()); + + for meta in files { + if let Some((start, end)) = &meta.time_range { + // unwrap safety: converting timestamps with any unit to seconds won't overflow. + let start_sec = start.convert_to(TimeUnit::Second).unwrap().value(); + // file timestamp range end is inclusive + let end_sec = end.convert_to_ceil(TimeUnit::Second).unwrap().value(); + debug_assert!(end_sec >= start_sec); + if meta.level == 0 { + // only level 0 is involved when calculating time windows. + min_duration_sec = min_duration_sec.min(end_sec - start_sec); + } + durations.push((start_sec, end_sec)); + } + } + + for stats in mem_tables { + // unwrap safety: converting timestamps with any unit to seconds won't overflow. 
+ let start_sec = stats + .min_timestamp + .convert_to(TimeUnit::Second) + .unwrap() + .value(); + let end_sec = stats + .max_timestamp + .convert_to_ceil(TimeUnit::Second) + .unwrap() + .value(); + min_duration_sec = min_duration_sec.min(end_sec - start_sec); + durations.push((start_sec, end_sec)); + } + + let window_size = min_duration_to_window_size(min_duration_sec); + align_time_spans_to_windows(&durations, window_size) + .into_iter() + .sorted_by(|(l_start, _), (r_start, _)| r_start.cmp(l_start)) // sort time windows in descending order + // unwrap safety: we ensure that end>=start so that TimestampRange::with_unit won't return None + .map(|(start, end)| TimestampRange::with_unit(start, end, TimeUnit::Second).unwrap()) + .collect() + } +} + +/// Given a set of time spans and a min duration, this function aligns the time spans to windows that +/// collectively cover all the time spans. +/// +/// For example, given time span `[1, 6)` and duration 5, the span can be aligned and split to +/// two windows with length 5: `[0, 5)` and `[5, 10)`, and these two windows can cover the original +/// span `[1, 6)`. +fn align_time_spans_to_windows(durations: &[(i64, i64)], min_duration: i64) -> HashSet<(i64, i64)> { + let mut res = HashSet::new(); + for (start, end) in durations { + let mut next = *start; + while next <= *end { + let next_aligned = next.align_by_bucket(min_duration).unwrap_or(i64::MIN); + if let Some(next_end_aligned) = next_aligned.checked_add(min_duration) { + res.insert((next_aligned, next_end_aligned)); + next = next_end_aligned; + } else { + // arithmetic overflow, clamp to i64::MAX and break the loop. + res.insert((next_aligned, i64::MAX)); + break; + } + } + } + res +} + +/// Find the most suitable time window size according to the `min_duration` found across all +/// SST files and memtables through a binary search. 
+fn min_duration_to_window_size(min_duration: i64) -> i64 { + match TIME_WINDOW_SIZE.binary_search(&min_duration) { + Ok(idx) => TIME_WINDOW_SIZE[idx], + Err(idx) => { + if idx < TIME_WINDOW_SIZE.len() { + TIME_WINDOW_SIZE[idx] + } else { + TIME_WINDOW_SIZE.last().copied().unwrap() + } + } + } +} + +#[cfg(test)] +mod tests { + use common_time::Timestamp; + + use super::*; + + #[test] + fn test_get_time_window_size() { + assert_eq!(60, min_duration_to_window_size(0)); + for window in TIME_WINDOW_SIZE { + assert_eq!(window, min_duration_to_window_size(window)); + } + assert_eq!(60, min_duration_to_window_size(1)); + assert_eq!(60 * 10, min_duration_to_window_size(100)); + assert_eq!(60 * 30, min_duration_to_window_size(1800)); + assert_eq!(60 * 60, min_duration_to_window_size(3000)); + assert_eq!(2 * 60 * 60, min_duration_to_window_size(4000)); + assert_eq!(6 * 60 * 60, min_duration_to_window_size(21599)); + assert_eq!(12 * 60 * 60, min_duration_to_window_size(21601)); + assert_eq!(24 * 60 * 60, min_duration_to_window_size(43201)); + assert_eq!(7 * 24 * 60 * 60, min_duration_to_window_size(604799)); + assert_eq!(7 * 24 * 60 * 60, min_duration_to_window_size(31535999)); + assert_eq!(7 * 24 * 60 * 60, min_duration_to_window_size(i64::MAX)); + } + + fn check_align_durations_to_windows( + durations: &[(i64, i64)], + min_duration: i64, + expected: &[(i64, i64)], + ) { + let res = align_time_spans_to_windows(durations, min_duration); + let expected = expected.iter().copied().collect::>(); + assert_eq!(res, expected); + } + + #[test] + fn test_duration_to_windows() { + check_align_durations_to_windows(&[(0, 1)], 2, &[(0, 2)]); + check_align_durations_to_windows(&[(-3, 1)], 2, &[(-4, -2), (-2, 0), (0, 2)]); + check_align_durations_to_windows(&[(1, 3)], 2, &[(0, 2), (2, 4)]); + check_align_durations_to_windows( + &[(i64::MIN, i64::MIN + 3)], + 2, + &[(i64::MIN, i64::MIN + 2), (i64::MIN + 2, i64::MIN + 4)], + ); + + check_align_durations_to_windows( + &[(i64::MAX - 3, 
i64::MAX)], + 2, + &[(i64::MAX - 3, i64::MAX - 1), (i64::MAX - 1, i64::MAX)], + ); + + check_align_durations_to_windows(&[(-3, 10)], 7, &[(-7, 0), (0, 7), (7, 14)]); + } + + #[test] + fn test_multiple_duration_to_windows() { + check_align_durations_to_windows(&[(0, 1), (1, 3)], 3, &[(0, 3), (3, 6)]); + check_align_durations_to_windows(&[(0, 1), (1, 2), (7, 11)], 3, &[(0, 3), (6, 9), (9, 12)]); + + check_align_durations_to_windows( + &[(-2, 1), (i64::MAX - 2, i64::MAX)], + 3, + &[ + (-3, 0), + (0, 3), + (i64::MAX - 4, i64::MAX - 1), + (i64::MAX - 1, i64::MAX), + ], + ); + } + + #[test] + fn test_plain_window_inference() { + let window_inference = PlainWindowInference {}; + + let res = window_inference.infer_window( + &[FileMeta { + time_range: Some(( + Timestamp::new(1000, TimeUnit::Millisecond), + Timestamp::new(3000, TimeUnit::Millisecond), + )), + ..Default::default() + }], + &[MemtableStats { + max_timestamp: Timestamp::new(3001, TimeUnit::Millisecond), + min_timestamp: Timestamp::new(2001, TimeUnit::Millisecond), + ..Default::default() + }], + ); + assert_eq!( + vec![TimestampRange::with_unit(0, 60, TimeUnit::Second).unwrap()], + res + ); + + let res = window_inference.infer_window( + &[FileMeta { + time_range: Some(( + Timestamp::new(0, TimeUnit::Millisecond), + Timestamp::new(60 * 1000 + 1, TimeUnit::Millisecond), + )), + ..Default::default() + }], + &[MemtableStats { + max_timestamp: Timestamp::new(3001, TimeUnit::Millisecond), + min_timestamp: Timestamp::new(2001, TimeUnit::Millisecond), + ..Default::default() + }], + ); + assert_eq!( + vec![ + TimestampRange::with_unit(60, 120, TimeUnit::Second).unwrap(), + TimestampRange::with_unit(0, 60, TimeUnit::Second).unwrap(), + ], + res + ); + + let res = window_inference.infer_window( + &[ + FileMeta { + time_range: Some(( + Timestamp::new(0, TimeUnit::Millisecond), + Timestamp::new(60 * 1000 + 1, TimeUnit::Millisecond), + )), + ..Default::default() + }, + FileMeta { + time_range: Some(( + Timestamp::new(60 * 60 * 
1000, TimeUnit::Millisecond), + Timestamp::new(60 * 60 * 1000 + 1, TimeUnit::Millisecond), + )), + ..Default::default() + }, + ], + &[MemtableStats { + max_timestamp: Timestamp::new(3001, TimeUnit::Millisecond), + min_timestamp: Timestamp::new(2001, TimeUnit::Millisecond), + ..Default::default() + }], + ); + assert_eq!( + vec![ + TimestampRange::with_unit(60 * 60, 61 * 60, TimeUnit::Second).unwrap(), + TimestampRange::with_unit(60, 120, TimeUnit::Second).unwrap(), + TimestampRange::with_unit(0, 60, TimeUnit::Second).unwrap(), + ], + res + ); + + let res = window_inference.infer_window( + &[ + FileMeta { + time_range: Some(( + Timestamp::new(0, TimeUnit::Millisecond), + Timestamp::new(60 * 1000, TimeUnit::Millisecond), + )), + level: 1, // this SST will be ignored + ..Default::default() + }, + FileMeta { + time_range: Some(( + Timestamp::new(0, TimeUnit::Millisecond), + Timestamp::new(10 * 60 * 1000, TimeUnit::Millisecond), + )), + ..Default::default() + }, + ], + &[MemtableStats { + max_timestamp: Timestamp::new(60 * 30 * 1000 + 1, TimeUnit::Millisecond), + min_timestamp: Timestamp::new(0, TimeUnit::Millisecond), + ..Default::default() + }], + ); + + // inferred window size should be 600 sec + assert_eq!( + vec![ + TimestampRange::with_unit(1800, 2400, TimeUnit::Second).unwrap(), + TimestampRange::with_unit(1200, 1800, TimeUnit::Second).unwrap(), + TimestampRange::with_unit(600, 1200, TimeUnit::Second).unwrap(), + TimestampRange::with_unit(0, 600, TimeUnit::Second).unwrap(), + ], + res + ); + } +}