feat: window inferer (#1648)
* feat: window inferer
* doc: add some doc
* test: add a long-missing unit test case for the windowed reader
* add more tests
* fix: some CR comments
Cargo.lock (generated)
@@ -8619,6 +8619,7 @@ dependencies = [
 "datatypes",
 "futures",
 "futures-util",
+ "itertools",
 "lazy_static",
 "log-store",
 "metrics",
@@ -221,6 +221,7 @@ impl TimestampRange {
    }

    /// Shortcut method to create a timestamp range with given start/end value and time unit.
+   /// Returns empty iff `start` > `end`.
    pub fn with_unit(start: i64, end: i64, unit: TimeUnit) -> Option<Self> {
        let start = Timestamp::new(start, unit);
        let end = Timestamp::new(end, unit);
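A minimal usage sketch of `with_unit`, inferred from the signature above and the caller comment in `window_infer.rs` further down (which notes that `with_unit` returns `None` only when `end` < `start`):

```rust
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;

// A half-open range covering [0s, 60s).
let range = TimestampRange::with_unit(0, 60, TimeUnit::Second);
assert!(range.is_some());

// Inverted bounds yield None.
assert!(TimestampRange::with_unit(60, 0, TimeUnit::Second).is_none());
```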
@@ -81,7 +81,7 @@ impl Timestamp {
    }

    /// Subtracts another timestamp from the current one, yielding a duration.
-   pub fn sub(&self, rhs: Self) -> Option<chrono::Duration> {
+   pub fn sub(&self, rhs: &Self) -> Option<chrono::Duration> {
        let lhs = self.to_chrono_datetime()?;
        let rhs = rhs.to_chrono_datetime()?;
        Some(lhs - rhs)

@@ -943,12 +943,12 @@ mod tests {
    fn test_subtract_timestamp() {
        assert_eq!(
            Some(chrono::Duration::milliseconds(42)),
-           Timestamp::new_millisecond(100).sub(Timestamp::new_millisecond(58))
+           Timestamp::new_millisecond(100).sub(&Timestamp::new_millisecond(58))
        );

        assert_eq!(
            Some(chrono::Duration::milliseconds(-42)),
-           Timestamp::new_millisecond(58).sub(Timestamp::new_millisecond(100))
+           Timestamp::new_millisecond(58).sub(&Timestamp::new_millisecond(100))
        );
    }
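Since `sub` now borrows its right-hand side and converts both operands through `to_chrono_datetime()`, operands with different units should subtract cleanly. A hedged sketch, not part of the diff:

```rust
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;

let a = Timestamp::new(2, TimeUnit::Second);
let b = Timestamp::new(500, TimeUnit::Millisecond);
// Both sides are converted to chrono datetimes before subtracting,
// so mixed units end up on the same axis.
assert_eq!(Some(chrono::Duration::milliseconds(1500)), a.sub(&b));
```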
@@ -170,6 +170,17 @@ impl ConcreteDataType {
            _ => None,
        }
    }
+
+   /// Try to cast data type as a [`TimestampType`].
+   pub fn as_timestamp(&self) -> Option<TimestampType> {
+       match self {
+           ConcreteDataType::Int64(_) => {
+               Some(TimestampType::Millisecond(TimestampMillisecondType))
+           }
+           ConcreteDataType::Timestamp(t) => Some(*t),
+           _ => None,
+       }
+   }
}

impl TryFrom<&ArrowDataType> for ConcreteDataType {
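A usage sketch of `as_timestamp`; the constructor helpers and import paths here are assumptions about the `datatypes` crate rather than part of this diff:

```rust
use datatypes::prelude::ConcreteDataType;
use datatypes::types::{TimestampMillisecondType, TimestampType};

// Int64 columns are interpreted as millisecond timestamps
// (hypothetical `int64_datatype()` constructor).
let ty = ConcreteDataType::int64_datatype();
assert_eq!(
    Some(TimestampType::Millisecond(TimestampMillisecondType)),
    ty.as_timestamp()
);

// Non-temporal types yield None.
assert_eq!(None, ConcreteDataType::boolean_datatype().as_timestamp());
```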
@@ -47,7 +47,7 @@ const MILLISECOND_VARIATION: u64 = 3;
const MICROSECOND_VARIATION: u64 = 6;
const NANOSECOND_VARIATION: u64 = 9;

-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[enum_dispatch(DataType)]
pub enum TimestampType {
    Second(TimestampSecondType),

@@ -92,6 +92,10 @@ impl TimestampType {
        }
    }

+   pub fn create_timestamp(&self, val: i64) -> Timestamp {
+       Timestamp::new(val, self.unit())
+   }
+
    pub fn precision(&self) -> u64 {
        match self {
            TimestampType::Second(_) => SECOND_VARIATION,

@@ -105,7 +109,7 @@ impl TimestampType {
macro_rules! impl_data_type_for_timestamp {
    ($unit: ident) => {
        paste! {
-           #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
+           #[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
            pub struct [<Timestamp $unit Type>];

            impl DataType for [<Timestamp $unit Type>] {
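`create_timestamp` pairs a raw `i64` with the variant's own unit, which is what the memtable stats change below relies on. A short sketch, assuming the crate's `TimestampType::unit()` accessor used in the hunk:

```rust
use common_time::timestamp::TimeUnit;
use datatypes::types::{TimestampSecondType, TimestampType};

let ty = TimestampType::Second(TimestampSecondType);
let ts = ty.create_timestamp(1_700_000_000);
// The produced Timestamp carries the variant's unit along with the value.
assert_eq!(TimeUnit::Second, ts.unit());
assert_eq!(1_700_000_000, ts.value());
```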
@@ -25,6 +25,7 @@ datafusion-common.workspace = true
datafusion-expr.workspace = true
futures.workspace = true
futures-util.workspace = true
+itertools = "0.10"
lazy_static = "1.4"
metrics.workspace = true
object-store = { path = "../object-store" }
@@ -41,6 +41,7 @@ pub mod write_batch;
pub use engine::EngineImpl;
mod file_purger;
mod metrics;
+mod window_infer;

pub use sst::parquet::ParquetWriter;
pub use sst::Source;
@@ -23,6 +23,7 @@ use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;

use common_time::range::TimestampRange;
+use common_time::Timestamp;
use datatypes::vectors::VectorRef;
use metrics::{decrement_gauge, increment_gauge};
use store_api::storage::{consts, OpType, SequenceNumber};

@@ -39,16 +40,16 @@ use crate::schema::{ProjectedSchemaRef, RegionSchemaRef};
/// Unique id for memtables under same region.
pub type MemtableId = u32;

-#[derive(Debug)]
+#[derive(Debug, Default)]
pub struct MemtableStats {
    /// The estimated bytes allocated by this memtable from the heap. The result
    /// may be larger than an estimate based on [`num_rows`] because
    /// of the implementor's pre-allocation behavior.
-   estimated_bytes: usize,
+   pub estimated_bytes: usize,
    /// The max timestamp that this memtable contains.
-   pub max_timestamp: i64,
+   pub max_timestamp: Timestamp,
    /// The min timestamp that this memtable contains.
-   pub min_timestamp: i64,
+   pub min_timestamp: Timestamp,
}

impl MemtableStats {
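Carrying `Timestamp` instead of a raw `i64` means the unit travels with the stats, so consumers such as the window inferer can convert without guessing the unit. A sketch grounded in the conversions used in `window_infer.rs` below:

```rust
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;

// A memtable whose max timestamp is 3001 ms, as in the window_infer tests.
let max_timestamp = Timestamp::new(3001, TimeUnit::Millisecond);

// The inferer rounds the inclusive upper bound up to whole seconds.
let end_sec = max_timestamp.convert_to_ceil(TimeUnit::Second).unwrap();
assert_eq!(4, end_sec.value()); // 3.001 s ceils to 4 s
```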
@@ -158,10 +158,19 @@ impl Memtable for BTreeMemtable {
    }

    fn stats(&self) -> MemtableStats {
+       let ts_meta = self.schema.column_metadata(self.schema.timestamp_index());
+
+       let Some(timestamp_type) = ts_meta.desc.data_type.as_timestamp() else {
+           // safety: timestamp column always has timestamp type, otherwise it's a bug.
+           panic!("Timestamp column is not a valid timestamp type: {:?}", self.schema);
+       };
+
        MemtableStats {
            estimated_bytes: self.alloc_tracker.bytes_allocated(),
-           max_timestamp: self.max_timestamp.load(AtomicOrdering::Relaxed),
-           min_timestamp: self.min_timestamp.load(AtomicOrdering::Relaxed),
+           max_timestamp: timestamp_type
+               .create_timestamp(self.max_timestamp.load(AtomicOrdering::Relaxed)),
+           min_timestamp: timestamp_type
+               .create_timestamp(self.min_timestamp.load(AtomicOrdering::Relaxed)),
        }
    }
@@ -177,8 +177,8 @@ mod tests {
        min_ts: i64,
    ) {
        let iter = mem.iter(&IterContext::default()).unwrap();
-       assert_eq!(min_ts, mem.stats().min_timestamp);
-       assert_eq!(max_ts, mem.stats().max_timestamp);
+       assert_eq!(min_ts, mem.stats().min_timestamp.value());
+       assert_eq!(max_ts, mem.stats().max_timestamp.value());

        let mut index = 0;
        for batch in iter {
@@ -33,6 +33,11 @@ pub struct WindowedReader<R> {
}

impl<R> WindowedReader<R> {
+   /// Creates a new [WindowedReader] from the given schema and a set of boxed readers.
+   ///
+   /// ### Note
+   /// [WindowedReader] always reads the readers in reverse order. The last reader in `readers`
+   /// gets polled first.
    pub fn new(schema: ProjectedSchemaRef, readers: Vec<R>) -> Self {
        Self { schema, readers }
    }
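A self-contained toy illustrating the documented contract that the last reader in `readers` gets polled first. This sketches the draining order only; GreptimeDB's actual reader is async and batch-based:

```rust
/// Stand-in for WindowedReader's draining order, over plain iterators.
struct Windowed<I> {
    readers: Vec<I>,
}

impl<I: Iterator<Item = i64>> Windowed<I> {
    fn next(&mut self) -> Option<i64> {
        while let Some(last) = self.readers.last_mut() {
            if let Some(v) = last.next() {
                return Some(v);
            }
            // The last reader is exhausted; fall back to the previous one.
            self.readers.pop();
        }
        None
    }
}

fn main() {
    let mut w = Windowed {
        readers: vec![vec![1, 2].into_iter(), vec![3, 4].into_iter()],
    };
    let mut got = Vec::new();
    while let Some(v) = w.next() {
        got.push(v);
    }
    // The last reader ([3, 4]) is drained before the first one.
    assert_eq!(vec![3, 4, 1, 2], got);
}
```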
@@ -14,6 +14,48 @@

//! Region tests.

+use std::collections::{HashMap, HashSet};
+
+use common_base::readable_size::ReadableSize;
+use common_datasource::compression::CompressionType;
+use common_telemetry::logging;
+use common_test_util::temp_dir::{create_temp_dir, TempDir};
+use common_time::range::TimestampRange;
+use common_time::timestamp::TimeUnit;
+use datatypes::prelude::{LogicalTypeId, ScalarVector, WrapperType};
+use datatypes::timestamp::TimestampMillisecond;
+use datatypes::vectors::{
+    BooleanVector, Int64Vector, StringVector, TimestampMillisecondVector, VectorRef,
+};
+use log_store::raft_engine::log_store::RaftEngineLogStore;
+use log_store::NoopLogStore;
+use object_store::services::Fs;
+use object_store::ObjectStore;
+use store_api::manifest::{Manifest, MAX_VERSION};
+use store_api::storage::{
+    Chunk, ChunkReader, FlushContext, FlushReason, ReadContext, Region, RegionMeta, ScanRequest,
+    SequenceNumber, Snapshot, WriteContext, WriteRequest,
+};
+
+use super::*;
+use crate::chunk::ChunkReaderImpl;
+use crate::compaction::noop::NoopCompactionScheduler;
+use crate::engine;
+use crate::engine::RegionMap;
+use crate::file_purger::noop::NoopFilePurgeHandler;
+use crate::flush::{FlushScheduler, PickerConfig, SizeBasedStrategy};
+use crate::manifest::action::{RegionChange, RegionMetaActionList};
+use crate::manifest::manifest_compress_type;
+use crate::manifest::region::RegionManifest;
+use crate::manifest::test_utils::*;
+use crate::memtable::DefaultMemtableBuilder;
+use crate::metadata::RegionMetadata;
+use crate::region::{RegionImpl, StoreConfig};
+use crate::scheduler::{LocalScheduler, SchedulerConfig};
+use crate::sst::{FileId, FsAccessLayer};
+use crate::test_util::descriptor_util::RegionDescBuilder;
+use crate::test_util::{self, config_util, schema_util, write_batch_util};
+
mod alter;
mod basic;
mod close;

@@ -21,35 +63,6 @@ mod compact;
mod flush;
mod projection;

-use std::collections::{HashMap, HashSet};
-
-use common_telemetry::logging;
-use common_test_util::temp_dir::create_temp_dir;
-use datatypes::prelude::{ScalarVector, WrapperType};
-use datatypes::timestamp::TimestampMillisecond;
-use datatypes::type_id::LogicalTypeId;
-use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef};
-use log_store::raft_engine::log_store::RaftEngineLogStore;
-use log_store::NoopLogStore;
-use object_store::services::Fs;
-use object_store::ObjectStore;
-use store_api::manifest::MAX_VERSION;
-use store_api::storage::{
-    Chunk, ChunkReader, RegionMeta, ScanRequest, SequenceNumber, Snapshot, WriteRequest,
-};
-
-use super::*;
-use crate::chunk::ChunkReaderImpl;
-use crate::file_purger::noop::NoopFilePurgeHandler;
-use crate::manifest::action::{RegionChange, RegionMetaActionList};
-use crate::manifest::manifest_compress_type;
-use crate::manifest::test_utils::*;
-use crate::memtable::DefaultMemtableBuilder;
-use crate::scheduler::{LocalScheduler, SchedulerConfig};
-use crate::sst::{FileId, FsAccessLayer};
-use crate::test_util::descriptor_util::RegionDescBuilder;
-use crate::test_util::{self, config_util, schema_util, write_batch_util};
-
/// Create metadata of a region with schema: (timestamp, v0).
pub fn new_metadata(region_name: &str) -> RegionMetadata {
    let desc = RegionDescBuilder::new(region_name)
@@ -464,3 +477,231 @@ fn assert_recovered_manifest(
        files
    );
}

fn create_region_meta(region_name: &str) -> RegionMetadata {
    let desc = RegionDescBuilder::new(region_name)
        .push_field_column(("v0", LogicalTypeId::Int64, true))
        .push_field_column(("v1", LogicalTypeId::String, true))
        .push_field_column(("v2", LogicalTypeId::Boolean, true))
        .build();
    desc.try_into().unwrap()
}

async fn create_store_config(region_name: &str, root: &str) -> StoreConfig<NoopLogStore> {
    let mut builder = Fs::default();
    builder.root(root);
    let object_store = ObjectStore::new(builder).unwrap().finish();
    let parent_dir = "";
    let sst_dir = engine::region_sst_dir(parent_dir, region_name);
    let manifest_dir = engine::region_manifest_dir(parent_dir, region_name);

    let sst_layer = Arc::new(FsAccessLayer::new(&sst_dir, object_store.clone()));
    let manifest = RegionManifest::with_checkpointer(
        &manifest_dir,
        object_store,
        CompressionType::Uncompressed,
        None,
        None,
    );
    manifest.start().await.unwrap();

    let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());

    let regions = Arc::new(RegionMap::new());

    let flush_scheduler = Arc::new(
        FlushScheduler::new(
            SchedulerConfig::default(),
            compaction_scheduler.clone(),
            regions,
            PickerConfig::default(),
        )
        .unwrap(),
    );

    let log_store = Arc::new(NoopLogStore::default());

    let file_purger = Arc::new(LocalScheduler::new(
        SchedulerConfig::default(),
        NoopFilePurgeHandler,
    ));
    StoreConfig {
        log_store,
        sst_layer,
        manifest,
        memtable_builder: Arc::new(DefaultMemtableBuilder::default()),
        flush_scheduler,
        flush_strategy: Arc::new(SizeBasedStrategy::default()),
        compaction_scheduler,
        engine_config: Default::default(),
        file_purger,
        ttl: None,
        compaction_time_window: None,
        write_buffer_size: ReadableSize::mb(32).0 as usize,
    }
}

struct WindowedReaderTester {
    data_written: Vec<Vec<(i64, i64, String, bool)>>,
    time_windows: Vec<(i64, i64)>,
    expected_ts: Vec<i64>,
    region: RegionImpl<NoopLogStore>,
    _temp_dir: TempDir,
}

impl WindowedReaderTester {
    async fn new(
        region_name: &'static str,
        data_written: Vec<Vec<(i64, i64, String, bool)>>,
        time_windows: Vec<(i64, i64)>,
        expected_ts: Vec<i64>,
    ) -> Self {
        let temp_dir = create_temp_dir(&format!("write_and_read_windowed_{}", region_name));
        let root = temp_dir.path().to_str().unwrap();
        let metadata = create_region_meta(region_name);
        let store_config = create_store_config(region_name, root).await;
        let region = RegionImpl::create(metadata, store_config).await.unwrap();

        let tester = Self {
            data_written,
            time_windows,
            expected_ts,
            region,
            _temp_dir: temp_dir,
        };
        tester.prepare().await;
        tester
    }

    async fn prepare(&self) {
        for batch in &self.data_written {
            let mut write_batch = self.region.write_request();
            let ts = TimestampMillisecondVector::from_iterator(
                batch
                    .iter()
                    .map(|(v, _, _, _)| TimestampMillisecond::new(*v)),
            );
            let v0 = Int64Vector::from_iterator(batch.iter().map(|(_, v, _, _)| *v));
            let v1 = StringVector::from_iterator(batch.iter().map(|(_, _, v, _)| v.as_str()));
            let v2 = BooleanVector::from_iterator(batch.iter().map(|(_, _, _, v)| *v));

            let columns = [
                ("timestamp".to_string(), Arc::new(ts) as VectorRef),
                ("v0".to_string(), Arc::new(v0) as VectorRef),
                ("v1".to_string(), Arc::new(v1) as VectorRef),
                ("v2".to_string(), Arc::new(v2) as VectorRef),
            ]
            .into_iter()
            .collect::<HashMap<String, VectorRef>>();
            write_batch.put(columns).unwrap();

            self.region
                .write(&WriteContext {}, write_batch)
                .await
                .unwrap();

            // flush the region to ensure data resides across SST files.
            self.region
                .flush(&FlushContext {
                    wait: true,
                    reason: FlushReason::Others,
                })
                .await
                .unwrap();
        }
    }

    async fn check(&self) {
        let windows = self
            .time_windows
            .iter()
            .map(|(start, end)| {
                TimestampRange::with_unit(*start, *end, TimeUnit::Millisecond).unwrap()
            })
            .collect::<Vec<_>>();

        let read_context = ReadContext::default();
        let snapshot = self.region.snapshot(&read_context).unwrap();
        let response = snapshot
            .windowed_scan(
                &read_context,
                ScanRequest {
                    sequence: None,
                    projection: None,
                    filters: vec![],
                },
                windows,
            )
            .await
            .unwrap();

        let mut timestamps = Vec::with_capacity(self.expected_ts.len());

        let mut reader = response.reader;
        let ts_index = reader.user_schema().timestamp_index().unwrap();
        while let Some(chunk) = reader.next_chunk().await.unwrap() {
            let ts_col = &chunk.columns[ts_index];
            let ts_col = ts_col
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap();
            for ts in ts_col.iter_data() {
                timestamps.push(ts.unwrap().0.value());
            }
        }
        assert_eq!(timestamps, self.expected_ts);
    }
}

#[tokio::test]
async fn test_read_by_chunk_reader() {
    common_telemetry::init_default_ut_logging();

    WindowedReaderTester::new(
        "test_region",
        vec![vec![(1, 1, "1".to_string(), false)]],
        vec![(1, 2)],
        vec![1],
    )
    .await
    .check()
    .await;

    WindowedReaderTester::new(
        "test_region",
        vec![
            vec![
                (1, 1, "1".to_string(), false),
                (2, 2, "2".to_string(), false),
            ],
            vec![
                (3, 3, "3".to_string(), false),
                (4, 4, "4".to_string(), false),
            ],
        ],
        vec![(1, 2), (2, 3), (3, 4), (4, 5)],
        vec![4, 3, 2, 1],
    )
    .await
    .check()
    .await;

    WindowedReaderTester::new(
        "test_region",
        vec![
            vec![
                (1, 1, "1".to_string(), false),
                (2, 2, "2".to_string(), false),
            ],
            vec![
                (3, 3, "3".to_string(), false),
                (4, 4, "4".to_string(), false),
            ],
        ],
        vec![(1, 2), (2, 3), (4, 5), (3, 4)],
        vec![3, 4, 2, 1],
    )
    .await
    .check()
    .await;
}
src/storage/src/window_infer.rs (new file, 329 lines)
@@ -0,0 +1,329 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use itertools::Itertools;

use crate::memtable::MemtableStats;
use crate::sst::FileMeta;

/// A set of predefined time windows.
const TIME_WINDOW_SIZE: [i64; 9] = [
    60,               // 1 minute
    60 * 10,          // 10 minutes
    60 * 30,          // 30 minutes
    60 * 60,          // 1 hour
    2 * 60 * 60,      // 2 hours
    6 * 60 * 60,      // 6 hours
    12 * 60 * 60,     // 12 hours
    24 * 60 * 60,     // 1 day
    7 * 24 * 60 * 60, // 1 week
];

/// [WindowInfer] infers the time windows that can be used to optimize table scans that are
/// ordered by the timestamp column or carry explicit time windows. By splitting the time spans
/// of tables into time windows, we can scan entries window by window.
pub(crate) trait WindowInfer {
    /// Infers time windows according to the SST files and memtables.
    ///
    /// ### Note
    /// The order of the returned vector defines how records are yielded.
    fn infer_window(&self, files: &[FileMeta], mem_tables: &[MemtableStats])
        -> Vec<TimestampRange>;
}

/// [PlainWindowInference] simply finds the minimum time span across all SST files in level 0
/// and all memtables, then matches that time span against a set of predefined time windows.
pub(crate) struct PlainWindowInference;

impl WindowInfer for PlainWindowInference {
    fn infer_window(
        &self,
        files: &[FileMeta],
        mem_tables: &[MemtableStats],
    ) -> Vec<TimestampRange> {
        let mut min_duration_sec = i64::MAX;
        let mut durations = Vec::with_capacity(files.len() + mem_tables.len());

        for meta in files {
            if let Some((start, end)) = &meta.time_range {
                // unwrap safety: converting timestamps with any unit to seconds won't overflow.
                let start_sec = start.convert_to(TimeUnit::Second).unwrap().value();
                // file timestamp range end is inclusive
                let end_sec = end.convert_to_ceil(TimeUnit::Second).unwrap().value();
                debug_assert!(end_sec >= start_sec);
                if meta.level == 0 {
                    // only level 0 is involved when calculating time windows.
                    min_duration_sec = min_duration_sec.min(end_sec - start_sec);
                }
                durations.push((start_sec, end_sec));
            }
        }

        for stats in mem_tables {
            // unwrap safety: converting timestamps with any unit to seconds won't overflow.
            let start_sec = stats
                .min_timestamp
                .convert_to(TimeUnit::Second)
                .unwrap()
                .value();
            let end_sec = stats
                .max_timestamp
                .convert_to_ceil(TimeUnit::Second)
                .unwrap()
                .value();
            min_duration_sec = min_duration_sec.min(end_sec - start_sec);
            durations.push((start_sec, end_sec));
        }

        let window_size = min_duration_to_window_size(min_duration_sec);
        align_time_spans_to_windows(&durations, window_size)
            .into_iter()
            // sort time windows in descending order
            .sorted_by(|(l_start, _), (r_start, _)| r_start.cmp(l_start))
            // unwrap safety: we ensure that end >= start so that TimestampRange::with_unit won't return None
            .map(|(start, end)| TimestampRange::with_unit(start, end, TimeUnit::Second).unwrap())
            .collect()
    }
}

/// Given a set of time spans and a min duration, this function aligns the time spans to windows
/// that collectively cover all the time spans.
///
/// For example, given time span `[1, 6)` and duration 5, the span can be aligned and split into
/// two windows of length 5, `[0, 5)` and `[5, 10)`, and these two windows cover the original
/// span `[1, 6)`.
fn align_time_spans_to_windows(durations: &[(i64, i64)], min_duration: i64) -> HashSet<(i64, i64)> {
    let mut res = HashSet::new();
    for (start, end) in durations {
        let mut next = *start;
        while next <= *end {
            let next_aligned = next.align_by_bucket(min_duration).unwrap_or(i64::MIN);
            if let Some(next_end_aligned) = next_aligned.checked_add(min_duration) {
                res.insert((next_aligned, next_end_aligned));
                next = next_end_aligned;
            } else {
                // arithmetic overflow, clamp to i64::MAX and break the loop.
                res.insert((next_aligned, i64::MAX));
                break;
            }
        }
    }
    res
}

/// Finds the most suitable time window size for the `min_duration` found across all
/// SST files and memtables, via binary search.
fn min_duration_to_window_size(min_duration: i64) -> i64 {
    match TIME_WINDOW_SIZE.binary_search(&min_duration) {
        Ok(idx) => TIME_WINDOW_SIZE[idx],
        Err(idx) => {
            if idx < TIME_WINDOW_SIZE.len() {
                TIME_WINDOW_SIZE[idx]
            } else {
                TIME_WINDOW_SIZE.last().copied().unwrap()
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use common_time::Timestamp;

    use super::*;

    #[test]
    fn test_get_time_window_size() {
        assert_eq!(60, min_duration_to_window_size(0));
        for window in TIME_WINDOW_SIZE {
            assert_eq!(window, min_duration_to_window_size(window));
        }
        assert_eq!(60, min_duration_to_window_size(1));
        assert_eq!(60 * 10, min_duration_to_window_size(100));
        assert_eq!(60 * 30, min_duration_to_window_size(1800));
        assert_eq!(60 * 60, min_duration_to_window_size(3000));
        assert_eq!(2 * 60 * 60, min_duration_to_window_size(4000));
        assert_eq!(6 * 60 * 60, min_duration_to_window_size(21599));
        assert_eq!(12 * 60 * 60, min_duration_to_window_size(21601));
        assert_eq!(24 * 60 * 60, min_duration_to_window_size(43201));
        assert_eq!(7 * 24 * 60 * 60, min_duration_to_window_size(604799));
        assert_eq!(7 * 24 * 60 * 60, min_duration_to_window_size(31535999));
        assert_eq!(7 * 24 * 60 * 60, min_duration_to_window_size(i64::MAX));
    }

    fn check_align_durations_to_windows(
        durations: &[(i64, i64)],
        min_duration: i64,
        expected: &[(i64, i64)],
    ) {
        let res = align_time_spans_to_windows(durations, min_duration);
        let expected = expected.iter().copied().collect::<HashSet<_>>();
        assert_eq!(res, expected);
    }

    #[test]
    fn test_duration_to_windows() {
        check_align_durations_to_windows(&[(0, 1)], 2, &[(0, 2)]);
        check_align_durations_to_windows(&[(-3, 1)], 2, &[(-4, -2), (-2, 0), (0, 2)]);
        check_align_durations_to_windows(&[(1, 3)], 2, &[(0, 2), (2, 4)]);
        check_align_durations_to_windows(
            &[(i64::MIN, i64::MIN + 3)],
            2,
            &[(i64::MIN, i64::MIN + 2), (i64::MIN + 2, i64::MIN + 4)],
        );

        check_align_durations_to_windows(
            &[(i64::MAX - 3, i64::MAX)],
            2,
            &[(i64::MAX - 3, i64::MAX - 1), (i64::MAX - 1, i64::MAX)],
        );

        check_align_durations_to_windows(&[(-3, 10)], 7, &[(-7, 0), (0, 7), (7, 14)]);
    }

    #[test]
    fn test_multiple_duration_to_windows() {
        check_align_durations_to_windows(&[(0, 1), (1, 3)], 3, &[(0, 3), (3, 6)]);
        check_align_durations_to_windows(&[(0, 1), (1, 2), (7, 11)], 3, &[(0, 3), (6, 9), (9, 12)]);

        check_align_durations_to_windows(
            &[(-2, 1), (i64::MAX - 2, i64::MAX)],
            3,
            &[
                (-3, 0),
                (0, 3),
                (i64::MAX - 4, i64::MAX - 1),
                (i64::MAX - 1, i64::MAX),
            ],
        );
    }

    #[test]
    fn test_plain_window_inference() {
        let window_inference = PlainWindowInference {};

        let res = window_inference.infer_window(
            &[FileMeta {
                time_range: Some((
                    Timestamp::new(1000, TimeUnit::Millisecond),
                    Timestamp::new(3000, TimeUnit::Millisecond),
                )),
                ..Default::default()
            }],
            &[MemtableStats {
                max_timestamp: Timestamp::new(3001, TimeUnit::Millisecond),
                min_timestamp: Timestamp::new(2001, TimeUnit::Millisecond),
                ..Default::default()
            }],
        );
        assert_eq!(
            vec![TimestampRange::with_unit(0, 60, TimeUnit::Second).unwrap()],
            res
        );

        let res = window_inference.infer_window(
            &[FileMeta {
                time_range: Some((
                    Timestamp::new(0, TimeUnit::Millisecond),
                    Timestamp::new(60 * 1000 + 1, TimeUnit::Millisecond),
                )),
                ..Default::default()
            }],
            &[MemtableStats {
                max_timestamp: Timestamp::new(3001, TimeUnit::Millisecond),
                min_timestamp: Timestamp::new(2001, TimeUnit::Millisecond),
                ..Default::default()
            }],
        );
        assert_eq!(
            vec![
                TimestampRange::with_unit(60, 120, TimeUnit::Second).unwrap(),
                TimestampRange::with_unit(0, 60, TimeUnit::Second).unwrap(),
            ],
            res
        );

        let res = window_inference.infer_window(
            &[
                FileMeta {
                    time_range: Some((
                        Timestamp::new(0, TimeUnit::Millisecond),
                        Timestamp::new(60 * 1000 + 1, TimeUnit::Millisecond),
                    )),
                    ..Default::default()
                },
                FileMeta {
                    time_range: Some((
                        Timestamp::new(60 * 60 * 1000, TimeUnit::Millisecond),
                        Timestamp::new(60 * 60 * 1000 + 1, TimeUnit::Millisecond),
                    )),
                    ..Default::default()
                },
            ],
            &[MemtableStats {
                max_timestamp: Timestamp::new(3001, TimeUnit::Millisecond),
                min_timestamp: Timestamp::new(2001, TimeUnit::Millisecond),
                ..Default::default()
            }],
        );
        assert_eq!(
            vec![
                TimestampRange::with_unit(60 * 60, 61 * 60, TimeUnit::Second).unwrap(),
                TimestampRange::with_unit(60, 120, TimeUnit::Second).unwrap(),
                TimestampRange::with_unit(0, 60, TimeUnit::Second).unwrap(),
            ],
            res
        );

        let res = window_inference.infer_window(
            &[
                FileMeta {
                    time_range: Some((
                        Timestamp::new(0, TimeUnit::Millisecond),
                        Timestamp::new(60 * 1000, TimeUnit::Millisecond),
                    )),
                    level: 1, // this SST will be ignored
                    ..Default::default()
                },
                FileMeta {
                    time_range: Some((
                        Timestamp::new(0, TimeUnit::Millisecond),
                        Timestamp::new(10 * 60 * 1000, TimeUnit::Millisecond),
                    )),
                    ..Default::default()
                },
            ],
            &[MemtableStats {
                max_timestamp: Timestamp::new(60 * 30 * 1000 + 1, TimeUnit::Millisecond),
                min_timestamp: Timestamp::new(0, TimeUnit::Millisecond),
                ..Default::default()
            }],
        );

        // the inferred window size should be 600 seconds
        assert_eq!(
            vec![
                TimestampRange::with_unit(1800, 2400, TimeUnit::Second).unwrap(),
                TimestampRange::with_unit(1200, 1800, TimeUnit::Second).unwrap(),
                TimestampRange::with_unit(600, 1200, TimeUnit::Second).unwrap(),
                TimestampRange::with_unit(0, 600, TimeUnit::Second).unwrap(),
            ],
            res
        );
    }
}
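To make the arithmetic of the first `test_plain_window_inference` case concrete, here is a hedged walk-through in plain integer math (mirroring, not calling, the conversions above):

```rust
fn main() {
    // SST level-0 file [1000ms, 3000ms]: the start floors to 1s and the
    // inclusive end ceils to 3s, so the span becomes [1s, 3s].
    let sst = (1i64, 3i64);
    // Memtable [2001ms, 3001ms] becomes [2s, 4s] after floor/ceil.
    let mem = (2i64, 4i64);

    // The minimum span across level-0 files and memtables is 2 seconds.
    let min_span = (sst.1 - sst.0).min(mem.1 - mem.0);
    assert_eq!(2, min_span);

    // 2s is below the smallest predefined window, so the size snaps to 60s
    // (what min_duration_to_window_size(2) returns), and every span aligns
    // into the single bucket [0, 60) that the first assertion expects.
    let window = 60i64;
    let bucket = (sst.0 / window * window, sst.0 / window * window + window);
    assert_eq!((0, 60), bucket);
}
```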