From 43aefc5d74dfa73b7819cae77b7eb546d8534a41 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Sat, 28 Jan 2023 15:16:41 +0800 Subject: [PATCH] feat: prunine sst files according to time range in filters (#887) * 1. Reimplement Eq for Timestamp 2. Add and/or for GenericRange * feat: extract time range from filters * feat: select sst files according to time range * fix: clippy * fix: empty value in range * fix: some cr comments * fix: return optional timestamp range * fix: cr comments --- Cargo.lock | 2 + src/common/time/Cargo.toml | 1 + src/common/time/src/error.rs | 24 ++- src/common/time/src/range.rs | 164 ++++++++++++--- src/common/time/src/timestamp.rs | 2 +- src/common/time/src/timestamp_millis.rs | 2 +- src/datatypes/Cargo.toml | 1 + src/datatypes/src/value.rs | 25 ++- src/query/tests/time_range_filter_test.rs | 241 ++++++++++++++++++++++ src/storage/src/chunk.rs | 30 ++- src/storage/src/sst.rs | 12 ++ src/table/src/predicate.rs | 195 ++++++++++++++++- 12 files changed, 667 insertions(+), 32 deletions(-) create mode 100644 src/query/tests/time_range_filter_test.rs diff --git a/Cargo.lock b/Cargo.lock index df25fd5a87..8d180502f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1592,6 +1592,7 @@ name = "common-time" version = "0.1.0" dependencies = [ "chrono", + "common-error", "rand 0.8.5", "serde", "serde_json", @@ -2210,6 +2211,7 @@ dependencies = [ "arrow-schema", "common-base", "common-error", + "common-telemetry", "common-time", "datafusion-common", "enum_dispatch", diff --git a/src/common/time/Cargo.toml b/src/common/time/Cargo.toml index 799be63531..a55ab53eb1 100644 --- a/src/common/time/Cargo.toml +++ b/src/common/time/Cargo.toml @@ -6,6 +6,7 @@ license.workspace = true [dependencies] chrono = "0.4" +common-error = { path = "../error" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } diff --git a/src/common/time/src/error.rs b/src/common/time/src/error.rs index a6d722c480..b99af96803 100644 --- a/src/common/time/src/error.rs +++ b/src/common/time/src/error.rs @@ -12,8 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::Any; + use chrono::ParseError; -use snafu::{Backtrace, Snafu}; +use common_error::ext::ErrorExt; +use common_error::prelude::StatusCode; +use snafu::{Backtrace, ErrorCompat, Snafu}; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] @@ -24,6 +28,24 @@ pub enum Error { ParseTimestamp { raw: String, backtrace: Backtrace }, } +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::ParseDateStr { .. } | Error::ParseTimestamp { .. } => { + StatusCode::InvalidArguments + } + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + pub type Result = std::result::Result; #[cfg(test)] diff --git a/src/common/time/src/range.rs b/src/common/time/src/range.rs index 42971aa489..10720998e5 100644 --- a/src/common/time/src/range.rs +++ b/src/common/time/src/range.rs @@ -18,8 +18,9 @@ use crate::Timestamp; /// A half-open time range. /// -/// The time range contains all timestamp `ts` that `ts >= start` and `ts < end`. It is -/// empty if `start >= end`. +/// The range contains values that `value >= start` and `val < end`. +/// +/// The range is empty iff `start == end == "the default value of T"` #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct GenericRange { start: Option, @@ -28,8 +29,9 @@ pub struct GenericRange { impl GenericRange where - T: Copy + PartialOrd, + T: Copy + PartialOrd + Default, { + /// Computes the AND'ed range with other. pub fn and(&self, other: &GenericRange) -> GenericRange { let start = match (self.start(), other.start()) { (Some(l), Some(r)) => { @@ -57,7 +59,7 @@ where (None, None) => None, }; - Self { start, end } + Self::from_optional(start, end) } /// Compute the OR'ed range of two ranges. @@ -98,12 +100,44 @@ where (None, None) => None, }; - Self { start, end } + Self::from_optional(start, end) + } + + /// Checks if current range intersect with target. + pub fn intersects(&self, target: &GenericRange) -> bool { + !self.and(target).is_empty() + } + + /// Create an empty range. + pub fn empty() -> GenericRange { + GenericRange { + start: Some(T::default()), + end: Some(T::default()), + } + } + + /// Create GenericRange from optional start and end. + /// If the present value of start >= the present value of end, it will return an empty range + /// with the default value of `T`. + fn from_optional(start: Option, end: Option) -> GenericRange { + match (start, end) { + (Some(start_val), Some(end_val)) => { + if start_val < end_val { + Self { + start: Some(start_val), + end: Some(end_val), + } + } else { + Self::empty() + } + } + (s, e) => Self { start: s, end: e }, + } } } impl GenericRange { - /// Creates a new range that contains timestamp in `[start, end)`. + /// Creates a new range that contains values in `[start, end)`. /// /// Returns `None` if `start` > `end`. pub fn new>(start: U, end: U) -> Option> { @@ -115,14 +149,7 @@ impl GenericRange { } } - /// Given a value, creates an empty time range that `start == end == value`. - pub fn empty_with_value>(value: U) -> GenericRange { - GenericRange { - start: Some(value.clone().into()), - end: Some(value.into()), - } - } - + /// Return a range containing all possible values. pub fn min_to_max() -> GenericRange { Self { start: None, @@ -143,11 +170,11 @@ impl GenericRange { } /// Returns true if `timestamp` is contained in the range. - pub fn contains>(&self, timestamp: &U) -> bool { + pub fn contains>(&self, target: &U) -> bool { match (&self.start, &self.end) { - (Some(start), Some(end)) => *timestamp >= *start && *timestamp < *end, - (Some(start), None) => *timestamp >= *start, - (None, Some(end)) => *timestamp < *end, + (Some(start), Some(end)) => *target >= *start && *target < *end, + (Some(start), None) => *target >= *start, + (None, Some(end)) => *target < *end, (None, None) => true, } } @@ -158,19 +185,70 @@ impl GenericRange { #[inline] pub fn is_empty(&self) -> bool { match (&self.start, &self.end) { - (Some(start), Some(end)) => start >= end, + (Some(start), Some(end)) => start == end, _ => false, } } } pub type TimestampRange = GenericRange; + impl TimestampRange { + pub fn new_inclusive(start: Option, end: Option) -> Self { + let end = if let Some(end) = end { + end.value() + .checked_add(1) + .map(|v| Timestamp::new(v, end.unit())) + } else { + None + }; + Self::from_optional(start, end) + } + + /// Shortcut method to create a timestamp range with given start/end value and time unit. pub fn with_unit(start: i64, end: i64, unit: TimeUnit) -> Option { let start = Timestamp::new(start, unit); let end = Timestamp::new(end, unit); Self::new(start, end) } + + /// Create a range that containing only given `ts`. + /// ### Notice: + /// Left-close right-open range cannot properly represent range with a single value. + /// For simplicity, this implementation returns an approximate range `[ts, ts+1)` instead. + pub fn single(ts: Timestamp) -> Self { + let unit = ts.unit(); + let start = Some(ts); + let end = ts.value().checked_add(1).map(|v| Timestamp::new(v, unit)); + + Self::from_optional(start, end) + } + + /// Create a range `[start, INF)`. + /// ### Notice + /// Left-close right-open range cannot properly represent range with exclusive start like: `(start, ...)`. + /// You may resort to `[start-1, ...)` instead. + pub fn from_start(start: Timestamp) -> Self { + Self { + start: Some(start), + end: None, + } + } + + /// Create a range `[-INF, end)`. + /// ### Notice + /// Left-close right-open range cannot properly represent range with inclusive end like: `[..., END]`. + /// If `inclusive` is true, this method returns `[-INF, end+1)` instead. + pub fn until_end(end: Timestamp, inclusive: bool) -> Self { + let end = if inclusive { + end.value() + .checked_add(1) + .map(|v| Timestamp::new(v, end.unit())) + } else { + Some(end) + }; + Self { start: None, end } + } } /// Time range in milliseconds. @@ -197,9 +275,9 @@ mod tests { assert_eq!(None, RangeMillis::new(1, 0)); - let range = RangeMillis::empty_with_value(1024); + let range = RangeMillis::empty(); assert_eq!(range.start(), range.end()); - assert_eq!(Some(TimestampMillis::new(1024)), *range.start()); + assert_eq!(Some(TimestampMillis::new(0)), *range.start()); } #[test] @@ -295,9 +373,7 @@ mod tests { TimestampRange::min_to_max().or(&TimestampRange::min_to_max()) ); - let empty = TimestampRange::empty_with_value(Timestamp::new_millisecond(1)).or( - &TimestampRange::empty_with_value(Timestamp::new_millisecond(2)), - ); + let empty = TimestampRange::empty().or(&TimestampRange::empty()); assert!(empty.is_empty()); let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap(); @@ -310,4 +386,44 @@ mod tests { let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap(); assert_eq!(t1, t1.or(&t1)); } + + #[test] + fn test_intersect() { + let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap(); + let t2 = TimestampRange::with_unit(-30, -20, TimeUnit::Second).unwrap(); + assert!(!t1.intersects(&t2)); + + let t1 = TimestampRange::with_unit(10, 20, TimeUnit::Second).unwrap(); + let t2 = TimestampRange::with_unit(0, 30, TimeUnit::Second).unwrap(); + assert!(t1.intersects(&t2)); + + let t1 = TimestampRange::with_unit(-20, -10, TimeUnit::Second).unwrap(); + let t2 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap(); + assert!(!t1.intersects(&t2)); + + let t1 = TimestampRange::with_unit(0, 1, TimeUnit::Second).unwrap(); + let t2 = TimestampRange::with_unit(999, 1000, TimeUnit::Millisecond).unwrap(); + assert!(t1.intersects(&t2)); + + let t1 = TimestampRange::with_unit(1, 2, TimeUnit::Second).unwrap(); + let t2 = TimestampRange::with_unit(1000, 2000, TimeUnit::Millisecond).unwrap(); + assert!(t1.intersects(&t2)); + + let t1 = TimestampRange::with_unit(0, 1, TimeUnit::Second).unwrap(); + assert!(t1.intersects(&t1)); + + let t1 = TimestampRange::with_unit(0, 1, TimeUnit::Second).unwrap(); + let t2 = TimestampRange::empty(); + assert!(!t1.intersects(&t2)); + + // empty range does not intersect with empty range + let empty = TimestampRange::empty(); + assert!(!empty.intersects(&empty)); + + // full range intersects with full range + let full = TimestampRange::min_to_max(); + assert!(full.intersects(&full)); + + assert!(!full.intersects(&empty)); + } } diff --git a/src/common/time/src/timestamp.rs b/src/common/time/src/timestamp.rs index 200f420c29..b326810a36 100644 --- a/src/common/time/src/timestamp.rs +++ b/src/common/time/src/timestamp.rs @@ -352,7 +352,7 @@ mod tests { } } - /// Generate timestamp less than or equal to `threshold` + /// Generate timestamp less than or equal to `threshold` fn gen_ts_le(threshold: &Timestamp) -> Timestamp { let mut rng = rand::thread_rng(); let timestamp = rng.gen_range(i64::MIN..=threshold.value); diff --git a/src/common/time/src/timestamp_millis.rs b/src/common/time/src/timestamp_millis.rs index 1968e21384..c06e10a13a 100644 --- a/src/common/time/src/timestamp_millis.rs +++ b/src/common/time/src/timestamp_millis.rs @@ -17,7 +17,7 @@ use std::cmp::Ordering; /// Unix timestamp in millisecond resolution. /// /// Negative timestamp is allowed, which represents timestamp before '1970-01-01T00:00:00'. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] pub struct TimestampMillis(i64); impl TimestampMillis { diff --git a/src/datatypes/Cargo.toml b/src/datatypes/Cargo.toml index 94994ebb6c..0d0158a3b0 100644 --- a/src/datatypes/Cargo.toml +++ b/src/datatypes/Cargo.toml @@ -14,6 +14,7 @@ arrow-schema.workspace = true common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-time = { path = "../common/time" } +common-telemetry = { path = "../common/telemetry" } datafusion-common.workspace = true enum_dispatch = "0.3" num = "0.4" diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index 30364ab7c5..c1a3414f3c 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -14,9 +14,11 @@ use std::cmp::Ordering; use std::fmt::{Display, Formatter}; +use std::str::FromStr; use arrow::datatypes::{DataType as ArrowDataType, Field}; use common_base::bytes::{Bytes, StringBytes}; +use common_telemetry::logging; use common_time::date::Date; use common_time::datetime::DateTime; use common_time::timestamp::{TimeUnit, Timestamp}; @@ -25,7 +27,8 @@ pub use ordered_float::OrderedFloat; use serde::{Deserialize, Serialize}; use snafu::ensure; -use crate::error::{self, Result}; +use crate::error; +use crate::error::Result; use crate::prelude::*; use crate::type_id::LogicalTypeId; use crate::types::ListType; @@ -286,6 +289,26 @@ fn timestamp_to_scalar_value(unit: TimeUnit, val: Option) -> ScalarValue { } } +/// Convert [ScalarValue] to [Timestamp]. +/// Return `None` if given scalar value cannot be converted to a valid timestamp. +pub fn scalar_value_to_timestamp(scalar: &ScalarValue) -> Option { + match scalar { + ScalarValue::Int64(val) => val.map(Timestamp::new_millisecond), + ScalarValue::Utf8(Some(s)) => match Timestamp::from_str(s) { + Ok(t) => Some(t), + Err(e) => { + logging::error!(e;"Failed to convert string literal {s} to timestamp"); + None + } + }, + ScalarValue::TimestampSecond(v, _) => v.map(Timestamp::new_second), + ScalarValue::TimestampMillisecond(v, _) => v.map(Timestamp::new_millisecond), + ScalarValue::TimestampMicrosecond(v, _) => v.map(Timestamp::new_microsecond), + ScalarValue::TimestampNanosecond(v, _) => v.map(Timestamp::new_nanosecond), + _ => None, + } +} + macro_rules! impl_ord_for_value_like { ($Type: ident, $left: ident, $right: ident) => { if $left.is_null() && !$right.is_null() { diff --git a/src/query/tests/time_range_filter_test.rs b/src/query/tests/time_range_filter_test.rs new file mode 100644 index 0000000000..aed523c340 --- /dev/null +++ b/src/query/tests/time_range_filter_test.rs @@ -0,0 +1,241 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::sync::Arc; + +use catalog::local::{new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider}; +use catalog::{CatalogList, CatalogProvider, SchemaProvider}; +use common_query::physical_plan::PhysicalPlanRef; +use common_query::prelude::Expr; +use common_recordbatch::RecordBatch; +use common_time::range::TimestampRange; +use common_time::timestamp::TimeUnit; +use common_time::Timestamp; +use datatypes::data_type::ConcreteDataType; +use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::vectors::{Int64Vector, TimestampMillisecondVector}; +use query::QueryEngineRef; +use session::context::QueryContext; +use table::metadata::{FilterPushDownType, TableInfoRef}; +use table::predicate::TimeRangePredicateBuilder; +use table::test_util::MemTable; +use table::Table; +use tokio::sync::RwLock; + +struct MemTableWrapper { + inner: MemTable, + filter: RwLock>, +} + +impl MemTableWrapper { + pub async fn get_filters(&self) -> Vec { + self.filter.write().await.drain(..).collect() + } +} + +#[async_trait::async_trait] +impl Table for MemTableWrapper { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.inner.schema() + } + + fn table_info(&self) -> TableInfoRef { + self.inner.table_info() + } + + async fn scan( + &self, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> table::Result { + *self.filter.write().await = filters.to_vec(); + self.inner.scan(projection, filters, limit).await + } + + fn supports_filter_pushdown(&self, _filter: &Expr) -> table::Result { + Ok(FilterPushDownType::Exact) + } +} + +fn create_test_engine() -> TimeRangeTester { + let schema = Schema::try_new(vec![ + ColumnSchema::new("v".to_string(), ConcreteDataType::int64_datatype(), false), + ColumnSchema::new( + "ts".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + ]) + .unwrap(); + + let table = Arc::new(MemTableWrapper { + inner: MemTable::new( + "m", + RecordBatch::new( + Arc::new(schema), + vec![ + Arc::new(Int64Vector::from_slice((0..1000).collect::>())) as Arc<_>, + Arc::new(TimestampMillisecondVector::from_slice( + (0..1000).collect::>(), + )) as Arc<_>, + ], + ) + .unwrap(), + ), + filter: Default::default(), + }); + + let catalog_list = new_memory_catalog_list().unwrap(); + + let default_schema = Arc::new(MemorySchemaProvider::new()); + MemorySchemaProvider::register_table(&default_schema, "m".to_string(), table.clone()).unwrap(); + + let default_catalog = Arc::new(MemoryCatalogProvider::new()); + default_catalog + .register_schema("public".to_string(), default_schema) + .unwrap(); + catalog_list + .register_catalog("greptime".to_string(), default_catalog) + .unwrap(); + + let engine = query::QueryEngineFactory::new(catalog_list).query_engine(); + TimeRangeTester { engine, table } +} + +struct TimeRangeTester { + engine: QueryEngineRef, + table: Arc, +} + +impl TimeRangeTester { + async fn check(&self, sql: &str, expect: TimestampRange) { + let stmt = query::parser::QueryLanguageParser::parse_sql(sql).unwrap(); + let _ = self + .engine + .execute( + &self + .engine + .statement_to_plan(stmt, Arc::new(QueryContext::new())) + .unwrap(), + ) + .await + .unwrap(); + let filters = self.table.get_filters().await; + + let range = TimeRangePredicateBuilder::new("ts", &filters).build(); + assert_eq!(expect, range); + } +} + +#[tokio::test] +async fn test_range_filter() { + let tester = create_test_engine(); + tester + .check( + "select * from m where ts >= 990;", + TimestampRange::from_start(Timestamp::new(990, TimeUnit::Millisecond)), + ) + .await; + + tester + .check( + "select * from m where ts <=1000;", + TimestampRange::until_end(Timestamp::new(1000, TimeUnit::Millisecond), true), + ) + .await; + + tester + .check( + "select * from m where ts > 1000;", + TimestampRange::from_start(Timestamp::new(1000, TimeUnit::Millisecond)), + ) + .await; + + tester + .check( + "select * from m where ts < 1000;", + TimestampRange::until_end(Timestamp::new(1000, TimeUnit::Millisecond), false), + ) + .await; + + tester + .check( + "select * from m where ts > 1000 and ts < 2000;", + TimestampRange::with_unit(1000, 2000, TimeUnit::Millisecond).unwrap(), + ) + .await; + + tester + .check( + "select * from m where ts >= 1000 or ts < 0;", + TimestampRange::min_to_max(), + ) + .await; + + tester + .check( + "select * from m where (ts >= 1000 and ts < 2000) or (ts>=3000 and ts<4000);", + TimestampRange::with_unit(1000, 4000, TimeUnit::Millisecond).unwrap(), + ) + .await; + + // sql's between is inclusive in both ends + tester + .check( + "select * from m where ts between 1000 and 2000", + TimestampRange::with_unit(1000, 2001, TimeUnit::Millisecond).unwrap(), + ) + .await; + + tester + .check( + "select * from m where ts in (10, 20, 30, 40)", + TimestampRange::with_unit(10, 41, TimeUnit::Millisecond).unwrap(), + ) + .await; + + tester + .check( + "select * from m where ts=1000", + TimestampRange::with_unit(1000, 1001, TimeUnit::Millisecond).unwrap(), + ) + .await; + + tester + .check( + "select * from m where ts=1000 or ts=2000", + TimestampRange::with_unit(1000, 2001, TimeUnit::Millisecond).unwrap(), + ) + .await; + + tester + .check( + "select * from m where ts>='2023-01-16 17:01:57+08:00'", + TimestampRange::from_start(Timestamp::new(1673859717000, TimeUnit::Millisecond)), + ) + .await; + + tester + .check( + "select * from m where ts > 10 and ts < 9", + TimestampRange::empty(), + ) + .await; +} diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs index 06b6ab4025..62d5e8250e 100644 --- a/src/storage/src/chunk.rs +++ b/src/storage/src/chunk.rs @@ -16,9 +16,11 @@ use std::sync::Arc; use async_trait::async_trait; use common_query::logical_plan::Expr; +use common_telemetry::debug; +use common_time::range::TimestampRange; use snafu::ResultExt; use store_api::storage::{Chunk, ChunkReader, SchemaRef, SequenceNumber}; -use table::predicate::Predicate; +use table::predicate::{Predicate, TimeRangePredicateBuilder}; use crate::error::{self, Error, Result}; use crate::memtable::{IterContext, MemtableRef}; @@ -126,6 +128,7 @@ impl ChunkReaderBuilder { } pub async fn build(mut self) -> Result { + let time_range_predicate = self.build_time_range_predicate(); let schema = Arc::new( ProjectedSchema::new(self.schema, self.projection) .context(error::InvalidProjectionSnafu)?, @@ -147,6 +150,13 @@ impl ChunkReaderBuilder { predicate: Predicate::new(self.filters), }; for file in &self.files_to_read { + if !Self::file_in_range(file, time_range_predicate) { + debug!( + "Skip file {:?}, predicate: {:?}", + file, time_range_predicate + ); + continue; + } let reader = self .sst_layer .read_sst(file.file_name(), &read_opts) @@ -160,6 +170,24 @@ impl ChunkReaderBuilder { Ok(ChunkReaderImpl::new(schema, Box::new(reader))) } + + /// Build time range predicate from schema and filters. + pub fn build_time_range_predicate(&self) -> TimestampRange { + let Some(ts_col) = self.schema.user_schema().timestamp_column() else { return TimestampRange::min_to_max() }; + TimeRangePredicateBuilder::new(&ts_col.name, &self.filters).build() + } + + /// Check if SST file's time range matches predicate. + #[inline] + fn file_in_range(file: &FileHandle, predicate: TimestampRange) -> bool { + if predicate == TimestampRange::min_to_max() { + return true; + } + // end_timestamp of sst file is inclusive. + let file_ts_range = + TimestampRange::new_inclusive(file.start_timestamp(), file.end_timestamp()); + file_ts_range.intersects(&predicate) + } } impl Visitor for ChunkReaderBuilder { diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index c7fd8bceb2..4d60853cb3 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -157,6 +157,18 @@ impl FileHandle { pub fn file_name(&self) -> &str { &self.inner.meta.file_name } + + /// Return the start timestamp of current SST file. + #[inline] + pub fn start_timestamp(&self) -> Option { + self.inner.meta.start_timestamp + } + + /// Return the end timestamp of current SST file. + #[inline] + pub fn end_timestamp(&self) -> Option { + self.inner.meta.end_timestamp + } } /// Actually data of [FileHandle]. diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs index d4537e775b..778a81824e 100644 --- a/src/table/src/predicate.rs +++ b/src/table/src/predicate.rs @@ -12,16 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod stats; - -use common_query::logical_plan::Expr; +use common_query::logical_plan::{DfExpr, Expr}; use common_telemetry::{error, warn}; +use common_time::range::TimestampRange; +use common_time::Timestamp; use datafusion::parquet::file::metadata::RowGroupMetaData; use datafusion::physical_optimizer::pruning::PruningPredicate; +use datafusion_expr::{Between, BinaryExpr, Operator}; use datatypes::schema::SchemaRef; +use datatypes::value::scalar_value_to_timestamp; use crate::predicate::stats::RowGroupPruningStatistics; +mod stats; + #[derive(Default, Clone)] pub struct Predicate { exprs: Vec, @@ -66,6 +70,191 @@ impl Predicate { } } +// tests for `TimeRangePredicateBuilder` locates in src/query/tests/time_range_filter_test.rs +// since it requires query engine to convert sql to filters. +pub struct TimeRangePredicateBuilder<'a> { + ts_col_name: &'a str, + filters: &'a [Expr], +} + +impl<'a> TimeRangePredicateBuilder<'a> { + pub fn new(ts_col_name: &'a str, filters: &'a [Expr]) -> Self { + Self { + ts_col_name, + filters, + } + } + + pub fn build(&self) -> TimestampRange { + let mut res = TimestampRange::min_to_max(); + for expr in self.filters { + let range = self + .extract_time_range_from_expr(expr.df_expr()) + .unwrap_or_else(TimestampRange::min_to_max); + res = res.and(&range); + } + res + } + + /// Extract time range filter from `WHERE`/`IN (...)`/`BETWEEN` clauses. + /// Return None if no time range can be found in expr. + fn extract_time_range_from_expr(&self, expr: &DfExpr) -> Option { + match expr { + DfExpr::BinaryExpr(BinaryExpr { left, op, right }) => { + self.extract_from_binary_expr(left, op, right) + } + DfExpr::Between(Between { + expr, + negated, + low, + high, + }) => self.extract_from_between_expr(expr, negated, low, high), + DfExpr::InList { + expr, + list, + negated, + } => self.extract_from_in_list_expr(expr, *negated, list), + _ => None, + } + } + + fn extract_from_binary_expr( + &self, + left: &DfExpr, + op: &Operator, + right: &DfExpr, + ) -> Option { + match op { + Operator::Eq => self + .get_timestamp_filter(left, right) + .map(TimestampRange::single), + Operator::Lt => self + .get_timestamp_filter(left, right) + .map(|ts| TimestampRange::until_end(ts, false)), + Operator::LtEq => self + .get_timestamp_filter(left, right) + .map(|ts| TimestampRange::until_end(ts, true)), + Operator::Gt => self + .get_timestamp_filter(left, right) + .map(TimestampRange::from_start), + Operator::GtEq => self + .get_timestamp_filter(left, right) + .map(TimestampRange::from_start), + Operator::And => { + // instead of return none when failed to extract time range from left/right, we unwrap the none into + // `TimestampRange::min_to_max`. + let left = self + .extract_time_range_from_expr(left) + .unwrap_or_else(TimestampRange::min_to_max); + let right = self + .extract_time_range_from_expr(right) + .unwrap_or_else(TimestampRange::min_to_max); + Some(left.and(&right)) + } + Operator::Or => { + let left = self.extract_time_range_from_expr(left)?; + let right = self.extract_time_range_from_expr(right)?; + Some(left.or(&right)) + } + Operator::NotEq + | Operator::Plus + | Operator::Minus + | Operator::Multiply + | Operator::Divide + | Operator::Modulo + | Operator::Like + | Operator::NotLike + | Operator::ILike + | Operator::NotILike + | Operator::IsDistinctFrom + | Operator::IsNotDistinctFrom + | Operator::RegexMatch + | Operator::RegexIMatch + | Operator::RegexNotMatch + | Operator::RegexNotIMatch + | Operator::BitwiseAnd + | Operator::BitwiseOr + | Operator::BitwiseXor + | Operator::BitwiseShiftRight + | Operator::BitwiseShiftLeft + | Operator::StringConcat => None, + } + } + + fn get_timestamp_filter(&self, left: &DfExpr, right: &DfExpr) -> Option { + let (col, lit) = match (left, right) { + (DfExpr::Column(column), DfExpr::Literal(scalar)) => (column, scalar), + (DfExpr::Literal(scalar), DfExpr::Column(column)) => (column, scalar), + _ => { + return None; + } + }; + if col.name != self.ts_col_name { + return None; + } + scalar_value_to_timestamp(lit) + } + + fn extract_from_between_expr( + &self, + expr: &DfExpr, + negated: &bool, + low: &DfExpr, + high: &DfExpr, + ) -> Option { + let DfExpr::Column(col) = expr else { return None; }; + if col.name != self.ts_col_name { + return None; + } + + if *negated { + return None; + } + + match (low, high) { + (DfExpr::Literal(low), DfExpr::Literal(high)) => { + let low_opt = scalar_value_to_timestamp(low); + let high_opt = scalar_value_to_timestamp(high); + Some(TimestampRange::new_inclusive(low_opt, high_opt)) + } + _ => None, + } + } + + /// Extract time range filter from `IN (...)` expr. + fn extract_from_in_list_expr( + &self, + expr: &DfExpr, + negated: bool, + list: &[DfExpr], + ) -> Option { + if negated { + return None; + } + let DfExpr::Column(col) = expr else { return None; }; + if col.name != self.ts_col_name { + return None; + } + + if list.is_empty() { + return Some(TimestampRange::empty()); + } + let mut init_range = TimestampRange::empty(); + for expr in list { + if let DfExpr::Literal(scalar) = expr { + if let Some(timestamp) = scalar_value_to_timestamp(scalar) { + init_range = init_range.or(&TimestampRange::single(timestamp)) + } else { + // TODO(hl): maybe we should raise an error here since cannot parse + // timestamp value from in list expr + return None; + } + } + } + Some(init_range) + } +} + #[cfg(test)] mod tests { use std::sync::Arc;