feat: prunine sst files according to time range in filters (#887)

* 1. Reimplement Eq for Timestamp
2. Add and/or for GenericRange

* feat: extract time range from filters

* feat: select sst files according to time range

* fix: clippy

* fix: empty value in range

* fix: some cr comments

* fix: return optional timestamp range

* fix: cr comments
This commit is contained in:
Lei, HUANG
2023-01-28 15:16:41 +08:00
committed by GitHub
parent b33937f48e
commit 43aefc5d74
12 changed files with 667 additions and 32 deletions

2
Cargo.lock generated
View File

@@ -1592,6 +1592,7 @@ name = "common-time"
version = "0.1.0"
dependencies = [
"chrono",
"common-error",
"rand 0.8.5",
"serde",
"serde_json",
@@ -2210,6 +2211,7 @@ dependencies = [
"arrow-schema",
"common-base",
"common-error",
"common-telemetry",
"common-time",
"datafusion-common",
"enum_dispatch",

View File

@@ -6,6 +6,7 @@ license.workspace = true
[dependencies]
chrono = "0.4"
common-error = { path = "../error" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }

View File

@@ -12,8 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use chrono::ParseError;
use snafu::{Backtrace, Snafu};
use common_error::ext::ErrorExt;
use common_error::prelude::StatusCode;
use snafu::{Backtrace, ErrorCompat, Snafu};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
@@ -24,6 +28,24 @@ pub enum Error {
ParseTimestamp { raw: String, backtrace: Backtrace },
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::ParseDateStr { .. } | Error::ParseTimestamp { .. } => {
StatusCode::InvalidArguments
}
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
fn as_any(&self) -> &dyn Any {
self
}
}
pub type Result<T> = std::result::Result<T, Error>;
#[cfg(test)]

View File

@@ -18,8 +18,9 @@ use crate::Timestamp;
/// A half-open time range.
///
/// The time range contains all timestamp `ts` that `ts >= start` and `ts < end`. It is
/// empty if `start >= end`.
/// The range contains values that `value >= start` and `val < end`.
///
/// The range is empty iff `start == end == "the default value of T"`
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct GenericRange<T> {
start: Option<T>,
@@ -28,8 +29,9 @@ pub struct GenericRange<T> {
impl<T> GenericRange<T>
where
T: Copy + PartialOrd,
T: Copy + PartialOrd + Default,
{
/// Computes the AND'ed range with other.
pub fn and(&self, other: &GenericRange<T>) -> GenericRange<T> {
let start = match (self.start(), other.start()) {
(Some(l), Some(r)) => {
@@ -57,7 +59,7 @@ where
(None, None) => None,
};
Self { start, end }
Self::from_optional(start, end)
}
/// Compute the OR'ed range of two ranges.
@@ -98,12 +100,44 @@ where
(None, None) => None,
};
Self { start, end }
Self::from_optional(start, end)
}
/// Checks if current range intersect with target.
pub fn intersects(&self, target: &GenericRange<T>) -> bool {
!self.and(target).is_empty()
}
/// Create an empty range.
pub fn empty() -> GenericRange<T> {
GenericRange {
start: Some(T::default()),
end: Some(T::default()),
}
}
/// Create GenericRange from optional start and end.
/// If the present value of start >= the present value of end, it will return an empty range
/// with the default value of `T`.
fn from_optional(start: Option<T>, end: Option<T>) -> GenericRange<T> {
match (start, end) {
(Some(start_val), Some(end_val)) => {
if start_val < end_val {
Self {
start: Some(start_val),
end: Some(end_val),
}
} else {
Self::empty()
}
}
(s, e) => Self { start: s, end: e },
}
}
}
impl<T> GenericRange<T> {
/// Creates a new range that contains timestamp in `[start, end)`.
/// Creates a new range that contains values in `[start, end)`.
///
/// Returns `None` if `start` > `end`.
pub fn new<U: PartialOrd + Into<T>>(start: U, end: U) -> Option<GenericRange<T>> {
@@ -115,14 +149,7 @@ impl<T> GenericRange<T> {
}
}
/// Given a value, creates an empty time range that `start == end == value`.
pub fn empty_with_value<U: Clone + Into<T>>(value: U) -> GenericRange<T> {
GenericRange {
start: Some(value.clone().into()),
end: Some(value.into()),
}
}
/// Return a range containing all possible values.
pub fn min_to_max() -> GenericRange<T> {
Self {
start: None,
@@ -143,11 +170,11 @@ impl<T> GenericRange<T> {
}
/// Returns true if `timestamp` is contained in the range.
pub fn contains<U: PartialOrd<T>>(&self, timestamp: &U) -> bool {
pub fn contains<U: PartialOrd<T>>(&self, target: &U) -> bool {
match (&self.start, &self.end) {
(Some(start), Some(end)) => *timestamp >= *start && *timestamp < *end,
(Some(start), None) => *timestamp >= *start,
(None, Some(end)) => *timestamp < *end,
(Some(start), Some(end)) => *target >= *start && *target < *end,
(Some(start), None) => *target >= *start,
(None, Some(end)) => *target < *end,
(None, None) => true,
}
}
@@ -158,19 +185,70 @@ impl<T: PartialOrd> GenericRange<T> {
#[inline]
pub fn is_empty(&self) -> bool {
match (&self.start, &self.end) {
(Some(start), Some(end)) => start >= end,
(Some(start), Some(end)) => start == end,
_ => false,
}
}
}
pub type TimestampRange = GenericRange<Timestamp>;
impl TimestampRange {
pub fn new_inclusive(start: Option<Timestamp>, end: Option<Timestamp>) -> Self {
let end = if let Some(end) = end {
end.value()
.checked_add(1)
.map(|v| Timestamp::new(v, end.unit()))
} else {
None
};
Self::from_optional(start, end)
}
/// Shortcut method to create a timestamp range with given start/end value and time unit.
pub fn with_unit(start: i64, end: i64, unit: TimeUnit) -> Option<Self> {
let start = Timestamp::new(start, unit);
let end = Timestamp::new(end, unit);
Self::new(start, end)
}
/// Create a range that containing only given `ts`.
/// ### Notice:
/// Left-close right-open range cannot properly represent range with a single value.
/// For simplicity, this implementation returns an approximate range `[ts, ts+1)` instead.
pub fn single(ts: Timestamp) -> Self {
let unit = ts.unit();
let start = Some(ts);
let end = ts.value().checked_add(1).map(|v| Timestamp::new(v, unit));
Self::from_optional(start, end)
}
/// Create a range `[start, INF)`.
/// ### Notice
/// Left-close right-open range cannot properly represent range with exclusive start like: `(start, ...)`.
/// You may resort to `[start-1, ...)` instead.
pub fn from_start(start: Timestamp) -> Self {
Self {
start: Some(start),
end: None,
}
}
/// Create a range `[-INF, end)`.
/// ### Notice
/// Left-close right-open range cannot properly represent range with inclusive end like: `[..., END]`.
/// If `inclusive` is true, this method returns `[-INF, end+1)` instead.
pub fn until_end(end: Timestamp, inclusive: bool) -> Self {
let end = if inclusive {
end.value()
.checked_add(1)
.map(|v| Timestamp::new(v, end.unit()))
} else {
Some(end)
};
Self { start: None, end }
}
}
/// Time range in milliseconds.
@@ -197,9 +275,9 @@ mod tests {
assert_eq!(None, RangeMillis::new(1, 0));
let range = RangeMillis::empty_with_value(1024);
let range = RangeMillis::empty();
assert_eq!(range.start(), range.end());
assert_eq!(Some(TimestampMillis::new(1024)), *range.start());
assert_eq!(Some(TimestampMillis::new(0)), *range.start());
}
#[test]
@@ -295,9 +373,7 @@ mod tests {
TimestampRange::min_to_max().or(&TimestampRange::min_to_max())
);
let empty = TimestampRange::empty_with_value(Timestamp::new_millisecond(1)).or(
&TimestampRange::empty_with_value(Timestamp::new_millisecond(2)),
);
let empty = TimestampRange::empty().or(&TimestampRange::empty());
assert!(empty.is_empty());
let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
@@ -310,4 +386,44 @@ mod tests {
let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
assert_eq!(t1, t1.or(&t1));
}
#[test]
fn test_intersect() {
let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
let t2 = TimestampRange::with_unit(-30, -20, TimeUnit::Second).unwrap();
assert!(!t1.intersects(&t2));
let t1 = TimestampRange::with_unit(10, 20, TimeUnit::Second).unwrap();
let t2 = TimestampRange::with_unit(0, 30, TimeUnit::Second).unwrap();
assert!(t1.intersects(&t2));
let t1 = TimestampRange::with_unit(-20, -10, TimeUnit::Second).unwrap();
let t2 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
assert!(!t1.intersects(&t2));
let t1 = TimestampRange::with_unit(0, 1, TimeUnit::Second).unwrap();
let t2 = TimestampRange::with_unit(999, 1000, TimeUnit::Millisecond).unwrap();
assert!(t1.intersects(&t2));
let t1 = TimestampRange::with_unit(1, 2, TimeUnit::Second).unwrap();
let t2 = TimestampRange::with_unit(1000, 2000, TimeUnit::Millisecond).unwrap();
assert!(t1.intersects(&t2));
let t1 = TimestampRange::with_unit(0, 1, TimeUnit::Second).unwrap();
assert!(t1.intersects(&t1));
let t1 = TimestampRange::with_unit(0, 1, TimeUnit::Second).unwrap();
let t2 = TimestampRange::empty();
assert!(!t1.intersects(&t2));
// empty range does not intersect with empty range
let empty = TimestampRange::empty();
assert!(!empty.intersects(&empty));
// full range intersects with full range
let full = TimestampRange::min_to_max();
assert!(full.intersects(&full));
assert!(!full.intersects(&empty));
}
}

View File

@@ -352,7 +352,7 @@ mod tests {
}
}
/// Generate timestamp less than or equal to `threshold`
/// Generate timestamp less than or equal to `threshold`
fn gen_ts_le(threshold: &Timestamp) -> Timestamp {
let mut rng = rand::thread_rng();
let timestamp = rng.gen_range(i64::MIN..=threshold.value);

View File

@@ -17,7 +17,7 @@ use std::cmp::Ordering;
/// Unix timestamp in millisecond resolution.
///
/// Negative timestamp is allowed, which represents timestamp before '1970-01-01T00:00:00'.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct TimestampMillis(i64);
impl TimestampMillis {

View File

@@ -14,6 +14,7 @@ arrow-schema.workspace = true
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
common-telemetry = { path = "../common/telemetry" }
datafusion-common.workspace = true
enum_dispatch = "0.3"
num = "0.4"

View File

@@ -14,9 +14,11 @@
use std::cmp::Ordering;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use arrow::datatypes::{DataType as ArrowDataType, Field};
use common_base::bytes::{Bytes, StringBytes};
use common_telemetry::logging;
use common_time::date::Date;
use common_time::datetime::DateTime;
use common_time::timestamp::{TimeUnit, Timestamp};
@@ -25,7 +27,8 @@ pub use ordered_float::OrderedFloat;
use serde::{Deserialize, Serialize};
use snafu::ensure;
use crate::error::{self, Result};
use crate::error;
use crate::error::Result;
use crate::prelude::*;
use crate::type_id::LogicalTypeId;
use crate::types::ListType;
@@ -286,6 +289,26 @@ fn timestamp_to_scalar_value(unit: TimeUnit, val: Option<i64>) -> ScalarValue {
}
}
/// Convert [ScalarValue] to [Timestamp].
/// Return `None` if given scalar value cannot be converted to a valid timestamp.
pub fn scalar_value_to_timestamp(scalar: &ScalarValue) -> Option<Timestamp> {
match scalar {
ScalarValue::Int64(val) => val.map(Timestamp::new_millisecond),
ScalarValue::Utf8(Some(s)) => match Timestamp::from_str(s) {
Ok(t) => Some(t),
Err(e) => {
logging::error!(e;"Failed to convert string literal {s} to timestamp");
None
}
},
ScalarValue::TimestampSecond(v, _) => v.map(Timestamp::new_second),
ScalarValue::TimestampMillisecond(v, _) => v.map(Timestamp::new_millisecond),
ScalarValue::TimestampMicrosecond(v, _) => v.map(Timestamp::new_microsecond),
ScalarValue::TimestampNanosecond(v, _) => v.map(Timestamp::new_nanosecond),
_ => None,
}
}
macro_rules! impl_ord_for_value_like {
($Type: ident, $left: ident, $right: ident) => {
if $left.is_null() && !$right.is_null() {

View File

@@ -0,0 +1,241 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::sync::Arc;
use catalog::local::{new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
use common_query::physical_plan::PhysicalPlanRef;
use common_query::prelude::Expr;
use common_recordbatch::RecordBatch;
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};
use query::QueryEngineRef;
use session::context::QueryContext;
use table::metadata::{FilterPushDownType, TableInfoRef};
use table::predicate::TimeRangePredicateBuilder;
use table::test_util::MemTable;
use table::Table;
use tokio::sync::RwLock;
struct MemTableWrapper {
inner: MemTable,
filter: RwLock<Vec<Expr>>,
}
impl MemTableWrapper {
pub async fn get_filters(&self) -> Vec<Expr> {
self.filter.write().await.drain(..).collect()
}
}
#[async_trait::async_trait]
impl Table for MemTableWrapper {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.inner.schema()
}
fn table_info(&self) -> TableInfoRef {
self.inner.table_info()
}
async fn scan(
&self,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> table::Result<PhysicalPlanRef> {
*self.filter.write().await = filters.to_vec();
self.inner.scan(projection, filters, limit).await
}
fn supports_filter_pushdown(&self, _filter: &Expr) -> table::Result<FilterPushDownType> {
Ok(FilterPushDownType::Exact)
}
}
fn create_test_engine() -> TimeRangeTester {
let schema = Schema::try_new(vec![
ColumnSchema::new("v".to_string(), ConcreteDataType::int64_datatype(), false),
ColumnSchema::new(
"ts".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
])
.unwrap();
let table = Arc::new(MemTableWrapper {
inner: MemTable::new(
"m",
RecordBatch::new(
Arc::new(schema),
vec![
Arc::new(Int64Vector::from_slice((0..1000).collect::<Vec<i64>>())) as Arc<_>,
Arc::new(TimestampMillisecondVector::from_slice(
(0..1000).collect::<Vec<i64>>(),
)) as Arc<_>,
],
)
.unwrap(),
),
filter: Default::default(),
});
let catalog_list = new_memory_catalog_list().unwrap();
let default_schema = Arc::new(MemorySchemaProvider::new());
MemorySchemaProvider::register_table(&default_schema, "m".to_string(), table.clone()).unwrap();
let default_catalog = Arc::new(MemoryCatalogProvider::new());
default_catalog
.register_schema("public".to_string(), default_schema)
.unwrap();
catalog_list
.register_catalog("greptime".to_string(), default_catalog)
.unwrap();
let engine = query::QueryEngineFactory::new(catalog_list).query_engine();
TimeRangeTester { engine, table }
}
struct TimeRangeTester {
engine: QueryEngineRef,
table: Arc<MemTableWrapper>,
}
impl TimeRangeTester {
async fn check(&self, sql: &str, expect: TimestampRange) {
let stmt = query::parser::QueryLanguageParser::parse_sql(sql).unwrap();
let _ = self
.engine
.execute(
&self
.engine
.statement_to_plan(stmt, Arc::new(QueryContext::new()))
.unwrap(),
)
.await
.unwrap();
let filters = self.table.get_filters().await;
let range = TimeRangePredicateBuilder::new("ts", &filters).build();
assert_eq!(expect, range);
}
}
#[tokio::test]
async fn test_range_filter() {
let tester = create_test_engine();
tester
.check(
"select * from m where ts >= 990;",
TimestampRange::from_start(Timestamp::new(990, TimeUnit::Millisecond)),
)
.await;
tester
.check(
"select * from m where ts <=1000;",
TimestampRange::until_end(Timestamp::new(1000, TimeUnit::Millisecond), true),
)
.await;
tester
.check(
"select * from m where ts > 1000;",
TimestampRange::from_start(Timestamp::new(1000, TimeUnit::Millisecond)),
)
.await;
tester
.check(
"select * from m where ts < 1000;",
TimestampRange::until_end(Timestamp::new(1000, TimeUnit::Millisecond), false),
)
.await;
tester
.check(
"select * from m where ts > 1000 and ts < 2000;",
TimestampRange::with_unit(1000, 2000, TimeUnit::Millisecond).unwrap(),
)
.await;
tester
.check(
"select * from m where ts >= 1000 or ts < 0;",
TimestampRange::min_to_max(),
)
.await;
tester
.check(
"select * from m where (ts >= 1000 and ts < 2000) or (ts>=3000 and ts<4000);",
TimestampRange::with_unit(1000, 4000, TimeUnit::Millisecond).unwrap(),
)
.await;
// sql's between is inclusive in both ends
tester
.check(
"select * from m where ts between 1000 and 2000",
TimestampRange::with_unit(1000, 2001, TimeUnit::Millisecond).unwrap(),
)
.await;
tester
.check(
"select * from m where ts in (10, 20, 30, 40)",
TimestampRange::with_unit(10, 41, TimeUnit::Millisecond).unwrap(),
)
.await;
tester
.check(
"select * from m where ts=1000",
TimestampRange::with_unit(1000, 1001, TimeUnit::Millisecond).unwrap(),
)
.await;
tester
.check(
"select * from m where ts=1000 or ts=2000",
TimestampRange::with_unit(1000, 2001, TimeUnit::Millisecond).unwrap(),
)
.await;
tester
.check(
"select * from m where ts>='2023-01-16 17:01:57+08:00'",
TimestampRange::from_start(Timestamp::new(1673859717000, TimeUnit::Millisecond)),
)
.await;
tester
.check(
"select * from m where ts > 10 and ts < 9",
TimestampRange::empty(),
)
.await;
}

View File

@@ -16,9 +16,11 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_query::logical_plan::Expr;
use common_telemetry::debug;
use common_time::range::TimestampRange;
use snafu::ResultExt;
use store_api::storage::{Chunk, ChunkReader, SchemaRef, SequenceNumber};
use table::predicate::Predicate;
use table::predicate::{Predicate, TimeRangePredicateBuilder};
use crate::error::{self, Error, Result};
use crate::memtable::{IterContext, MemtableRef};
@@ -126,6 +128,7 @@ impl ChunkReaderBuilder {
}
pub async fn build(mut self) -> Result<ChunkReaderImpl> {
let time_range_predicate = self.build_time_range_predicate();
let schema = Arc::new(
ProjectedSchema::new(self.schema, self.projection)
.context(error::InvalidProjectionSnafu)?,
@@ -147,6 +150,13 @@ impl ChunkReaderBuilder {
predicate: Predicate::new(self.filters),
};
for file in &self.files_to_read {
if !Self::file_in_range(file, time_range_predicate) {
debug!(
"Skip file {:?}, predicate: {:?}",
file, time_range_predicate
);
continue;
}
let reader = self
.sst_layer
.read_sst(file.file_name(), &read_opts)
@@ -160,6 +170,24 @@ impl ChunkReaderBuilder {
Ok(ChunkReaderImpl::new(schema, Box::new(reader)))
}
/// Build time range predicate from schema and filters.
pub fn build_time_range_predicate(&self) -> TimestampRange {
let Some(ts_col) = self.schema.user_schema().timestamp_column() else { return TimestampRange::min_to_max() };
TimeRangePredicateBuilder::new(&ts_col.name, &self.filters).build()
}
/// Check if SST file's time range matches predicate.
#[inline]
fn file_in_range(file: &FileHandle, predicate: TimestampRange) -> bool {
if predicate == TimestampRange::min_to_max() {
return true;
}
// end_timestamp of sst file is inclusive.
let file_ts_range =
TimestampRange::new_inclusive(file.start_timestamp(), file.end_timestamp());
file_ts_range.intersects(&predicate)
}
}
impl Visitor for ChunkReaderBuilder {

View File

@@ -157,6 +157,18 @@ impl FileHandle {
pub fn file_name(&self) -> &str {
&self.inner.meta.file_name
}
/// Return the start timestamp of current SST file.
#[inline]
pub fn start_timestamp(&self) -> Option<Timestamp> {
self.inner.meta.start_timestamp
}
/// Return the end timestamp of current SST file.
#[inline]
pub fn end_timestamp(&self) -> Option<Timestamp> {
self.inner.meta.end_timestamp
}
}
/// Actually data of [FileHandle].

View File

@@ -12,16 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod stats;
use common_query::logical_plan::Expr;
use common_query::logical_plan::{DfExpr, Expr};
use common_telemetry::{error, warn};
use common_time::range::TimestampRange;
use common_time::Timestamp;
use datafusion::parquet::file::metadata::RowGroupMetaData;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion_expr::{Between, BinaryExpr, Operator};
use datatypes::schema::SchemaRef;
use datatypes::value::scalar_value_to_timestamp;
use crate::predicate::stats::RowGroupPruningStatistics;
mod stats;
#[derive(Default, Clone)]
pub struct Predicate {
exprs: Vec<Expr>,
@@ -66,6 +70,191 @@ impl Predicate {
}
}
// tests for `TimeRangePredicateBuilder` locates in src/query/tests/time_range_filter_test.rs
// since it requires query engine to convert sql to filters.
pub struct TimeRangePredicateBuilder<'a> {
ts_col_name: &'a str,
filters: &'a [Expr],
}
impl<'a> TimeRangePredicateBuilder<'a> {
pub fn new(ts_col_name: &'a str, filters: &'a [Expr]) -> Self {
Self {
ts_col_name,
filters,
}
}
pub fn build(&self) -> TimestampRange {
let mut res = TimestampRange::min_to_max();
for expr in self.filters {
let range = self
.extract_time_range_from_expr(expr.df_expr())
.unwrap_or_else(TimestampRange::min_to_max);
res = res.and(&range);
}
res
}
/// Extract time range filter from `WHERE`/`IN (...)`/`BETWEEN` clauses.
/// Return None if no time range can be found in expr.
fn extract_time_range_from_expr(&self, expr: &DfExpr) -> Option<TimestampRange> {
match expr {
DfExpr::BinaryExpr(BinaryExpr { left, op, right }) => {
self.extract_from_binary_expr(left, op, right)
}
DfExpr::Between(Between {
expr,
negated,
low,
high,
}) => self.extract_from_between_expr(expr, negated, low, high),
DfExpr::InList {
expr,
list,
negated,
} => self.extract_from_in_list_expr(expr, *negated, list),
_ => None,
}
}
fn extract_from_binary_expr(
&self,
left: &DfExpr,
op: &Operator,
right: &DfExpr,
) -> Option<TimestampRange> {
match op {
Operator::Eq => self
.get_timestamp_filter(left, right)
.map(TimestampRange::single),
Operator::Lt => self
.get_timestamp_filter(left, right)
.map(|ts| TimestampRange::until_end(ts, false)),
Operator::LtEq => self
.get_timestamp_filter(left, right)
.map(|ts| TimestampRange::until_end(ts, true)),
Operator::Gt => self
.get_timestamp_filter(left, right)
.map(TimestampRange::from_start),
Operator::GtEq => self
.get_timestamp_filter(left, right)
.map(TimestampRange::from_start),
Operator::And => {
// instead of return none when failed to extract time range from left/right, we unwrap the none into
// `TimestampRange::min_to_max`.
let left = self
.extract_time_range_from_expr(left)
.unwrap_or_else(TimestampRange::min_to_max);
let right = self
.extract_time_range_from_expr(right)
.unwrap_or_else(TimestampRange::min_to_max);
Some(left.and(&right))
}
Operator::Or => {
let left = self.extract_time_range_from_expr(left)?;
let right = self.extract_time_range_from_expr(right)?;
Some(left.or(&right))
}
Operator::NotEq
| Operator::Plus
| Operator::Minus
| Operator::Multiply
| Operator::Divide
| Operator::Modulo
| Operator::Like
| Operator::NotLike
| Operator::ILike
| Operator::NotILike
| Operator::IsDistinctFrom
| Operator::IsNotDistinctFrom
| Operator::RegexMatch
| Operator::RegexIMatch
| Operator::RegexNotMatch
| Operator::RegexNotIMatch
| Operator::BitwiseAnd
| Operator::BitwiseOr
| Operator::BitwiseXor
| Operator::BitwiseShiftRight
| Operator::BitwiseShiftLeft
| Operator::StringConcat => None,
}
}
fn get_timestamp_filter(&self, left: &DfExpr, right: &DfExpr) -> Option<Timestamp> {
let (col, lit) = match (left, right) {
(DfExpr::Column(column), DfExpr::Literal(scalar)) => (column, scalar),
(DfExpr::Literal(scalar), DfExpr::Column(column)) => (column, scalar),
_ => {
return None;
}
};
if col.name != self.ts_col_name {
return None;
}
scalar_value_to_timestamp(lit)
}
fn extract_from_between_expr(
&self,
expr: &DfExpr,
negated: &bool,
low: &DfExpr,
high: &DfExpr,
) -> Option<TimestampRange> {
let DfExpr::Column(col) = expr else { return None; };
if col.name != self.ts_col_name {
return None;
}
if *negated {
return None;
}
match (low, high) {
(DfExpr::Literal(low), DfExpr::Literal(high)) => {
let low_opt = scalar_value_to_timestamp(low);
let high_opt = scalar_value_to_timestamp(high);
Some(TimestampRange::new_inclusive(low_opt, high_opt))
}
_ => None,
}
}
/// Extract time range filter from `IN (...)` expr.
fn extract_from_in_list_expr(
&self,
expr: &DfExpr,
negated: bool,
list: &[DfExpr],
) -> Option<TimestampRange> {
if negated {
return None;
}
let DfExpr::Column(col) = expr else { return None; };
if col.name != self.ts_col_name {
return None;
}
if list.is_empty() {
return Some(TimestampRange::empty());
}
let mut init_range = TimestampRange::empty();
for expr in list {
if let DfExpr::Literal(scalar) = expr {
if let Some(timestamp) = scalar_value_to_timestamp(scalar) {
init_range = init_range.or(&TimestampRange::single(timestamp))
} else {
// TODO(hl): maybe we should raise an error here since cannot parse
// timestamp value from in list expr
return None;
}
}
}
Some(init_range)
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;