mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-17 02:32:56 +00:00
feat: respect time range when building parquet reader (#3947)
* feat: convert timestamp range filters to predicates * chore: rebase main * fix: remove predicates once they have been added to timestamp filters to avoid duplicate filtering * fix: some comments * fix: resolve conflicts
This commit is contained in:
@@ -14,7 +14,7 @@
|
||||
|
||||
//! Util record batch stream wrapper that can perform precise filter.
|
||||
|
||||
use datafusion::logical_expr::{Expr, Operator};
|
||||
use datafusion::logical_expr::{Expr, Literal, Operator};
|
||||
use datafusion_common::arrow::array::{ArrayRef, Datum, Scalar};
|
||||
use datafusion_common::arrow::buffer::BooleanBuffer;
|
||||
use datafusion_common::arrow::compute::kernels::cmp;
|
||||
@@ -43,6 +43,28 @@ pub struct SimpleFilterEvaluator {
|
||||
}
|
||||
|
||||
impl SimpleFilterEvaluator {
|
||||
pub fn new<T: Literal>(column_name: String, lit: T, op: Operator) -> Option<Self> {
|
||||
match op {
|
||||
Operator::Eq
|
||||
| Operator::NotEq
|
||||
| Operator::Lt
|
||||
| Operator::LtEq
|
||||
| Operator::Gt
|
||||
| Operator::GtEq => {}
|
||||
_ => return None,
|
||||
}
|
||||
|
||||
let Expr::Literal(val) = lit.lit() else {
|
||||
return None;
|
||||
};
|
||||
|
||||
Some(Self {
|
||||
column_name,
|
||||
literal: val.to_scalar().ok()?,
|
||||
op,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn try_new(predicate: &Expr) -> Option<Self> {
|
||||
match predicate {
|
||||
Expr::BinaryExpr(binary) => {
|
||||
|
||||
@@ -26,7 +26,7 @@ use crate::test_util::{
|
||||
build_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv,
|
||||
};
|
||||
|
||||
async fn check_prune_row_groups(expr: Expr, expected: &str) {
|
||||
async fn check_prune_row_groups(exprs: Vec<Expr>, expected: &str) {
|
||||
let mut env = TestEnv::new();
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
@@ -55,7 +55,7 @@ async fn check_prune_row_groups(expr: Expr, expected: &str) {
|
||||
.scan_to_stream(
|
||||
region_id,
|
||||
ScanRequest {
|
||||
filters: vec![expr],
|
||||
filters: exprs,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
@@ -70,7 +70,9 @@ async fn test_read_parquet_stats() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
check_prune_row_groups(
|
||||
datafusion_expr::col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(4000), None))),
|
||||
vec![
|
||||
datafusion_expr::col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(4000), None)))
|
||||
],
|
||||
"\
|
||||
+-------+---------+---------------------+
|
||||
| tag_0 | field_0 | ts |
|
||||
@@ -94,7 +96,7 @@ async fn test_read_parquet_stats() {
|
||||
async fn test_prune_tag() {
|
||||
// prune result: only row group 1&2
|
||||
check_prune_row_groups(
|
||||
datafusion_expr::col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string())))),
|
||||
vec![datafusion_expr::col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string()))))],
|
||||
"\
|
||||
+-------+---------+---------------------+
|
||||
| tag_0 | field_0 | ts |
|
||||
@@ -114,9 +116,10 @@ async fn test_prune_tag_and_field() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
// prune result: only row group 1
|
||||
check_prune_row_groups(
|
||||
col("tag_0")
|
||||
.gt(lit(ScalarValue::Utf8(Some("4".to_string()))))
|
||||
.and(col("field_0").lt(lit(8.0))),
|
||||
vec![
|
||||
col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string())))),
|
||||
col("field_0").lt(lit(8.0)),
|
||||
],
|
||||
"\
|
||||
+-------+---------+---------------------+
|
||||
| tag_0 | field_0 | ts |
|
||||
@@ -124,8 +127,6 @@ async fn test_prune_tag_and_field() {
|
||||
| 5 | 5.0 | 1970-01-01T00:00:05 |
|
||||
| 6 | 6.0 | 1970-01-01T00:00:06 |
|
||||
| 7 | 7.0 | 1970-01-01T00:00:07 |
|
||||
| 8 | 8.0 | 1970-01-01T00:00:08 |
|
||||
| 9 | 9.0 | 1970-01-01T00:00:09 |
|
||||
+-------+---------+---------------------+",
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -20,6 +20,7 @@ use common_error::ext::{BoxedError, ErrorExt};
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_macro::stack_trace_debug;
|
||||
use common_runtime::JoinError;
|
||||
use common_time::Timestamp;
|
||||
use datatypes::arrow::error::ArrowError;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use object_store::ErrorKind;
|
||||
@@ -693,6 +694,13 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to build time range filters for value: {:?}", timestamp))]
|
||||
BuildTimeRangeFilter {
|
||||
timestamp: Timestamp,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
@@ -802,6 +810,7 @@ impl ErrorExt for Error {
|
||||
EncodeMemtable { .. } | ReadDataPart { .. } => StatusCode::Internal,
|
||||
ChecksumMismatch { .. } => StatusCode::Unexpected,
|
||||
RegionStopped { .. } => StatusCode::RegionNotReady,
|
||||
BuildTimeRangeFilter { .. } => StatusCode::Unexpected,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ use common_telemetry::{debug, error, warn};
|
||||
use common_time::range::TimestampRange;
|
||||
use store_api::region_engine::{RegionScannerRef, SinglePartitionScanner};
|
||||
use store_api::storage::ScanRequest;
|
||||
use table::predicate::{Predicate, TimeRangePredicateBuilder};
|
||||
use table::predicate::{build_time_range_predicate, Predicate};
|
||||
use tokio::sync::{mpsc, Semaphore};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
@@ -235,7 +235,7 @@ impl ScanRegion {
|
||||
}
|
||||
|
||||
/// Creates a scan input.
|
||||
fn scan_input(self, filter_deleted: bool) -> Result<ScanInput> {
|
||||
fn scan_input(mut self, filter_deleted: bool) -> Result<ScanInput> {
|
||||
let time_range = self.build_time_range_predicate();
|
||||
|
||||
let ssts = &self.version.ssts;
|
||||
@@ -300,7 +300,7 @@ impl ScanRegion {
|
||||
}
|
||||
|
||||
/// Build time range predicate from filters.
|
||||
fn build_time_range_predicate(&self) -> TimestampRange {
|
||||
fn build_time_range_predicate(&mut self) -> TimestampRange {
|
||||
let time_index = self.version.metadata.time_index_column();
|
||||
let unit = time_index
|
||||
.column_schema
|
||||
@@ -308,8 +308,11 @@ impl ScanRegion {
|
||||
.as_timestamp()
|
||||
.expect("Time index must have timestamp-compatible type")
|
||||
.unit();
|
||||
TimeRangePredicateBuilder::new(&time_index.column_schema.name, unit, &self.request.filters)
|
||||
.build()
|
||||
build_time_range_predicate(
|
||||
&time_index.column_schema.name,
|
||||
unit,
|
||||
&mut self.request.filters,
|
||||
)
|
||||
}
|
||||
|
||||
/// Use the latest schema to build the index applier.
|
||||
|
||||
@@ -23,7 +23,10 @@ use async_trait::async_trait;
|
||||
use common_recordbatch::filter::SimpleFilterEvaluator;
|
||||
use common_telemetry::{debug, warn};
|
||||
use common_time::range::TimestampRange;
|
||||
use datafusion_expr::Expr;
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use common_time::Timestamp;
|
||||
use datafusion_common::ScalarValue;
|
||||
use datafusion_expr::{Expr, Operator};
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use itertools::Itertools;
|
||||
@@ -38,6 +41,7 @@ use store_api::storage::ColumnId;
|
||||
use table::predicate::Predicate;
|
||||
|
||||
use crate::cache::CacheManagerRef;
|
||||
use crate::error;
|
||||
use crate::error::{
|
||||
ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadParquetSnafu, Result,
|
||||
};
|
||||
@@ -225,7 +229,7 @@ impl ParquetReaderBuilder {
|
||||
|
||||
metrics.build_cost = start.elapsed();
|
||||
|
||||
let filters = if let Some(predicate) = &self.predicate {
|
||||
let mut filters = if let Some(predicate) = &self.predicate {
|
||||
predicate
|
||||
.exprs()
|
||||
.iter()
|
||||
@@ -240,6 +244,11 @@ impl ParquetReaderBuilder {
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
|
||||
if let Some(time_range) = &self.time_range {
|
||||
filters.extend(time_range_to_predicate(*time_range, &region_meta)?);
|
||||
}
|
||||
|
||||
let codec = McmpRowCodec::new(
|
||||
read_format
|
||||
.metadata()
|
||||
@@ -449,6 +458,59 @@ impl ParquetReaderBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
/// Transforms time range into [SimpleFilterEvaluator].
|
||||
fn time_range_to_predicate(
|
||||
time_range: TimestampRange,
|
||||
metadata: &RegionMetadataRef,
|
||||
) -> Result<Vec<SimpleFilterContext>> {
|
||||
let ts_col = metadata.time_index_column();
|
||||
let ts_col_id = ts_col.column_id;
|
||||
|
||||
let ts_to_filter = |op: Operator, timestamp: &Timestamp| {
|
||||
let value = match timestamp.unit() {
|
||||
TimeUnit::Second => ScalarValue::TimestampSecond(Some(timestamp.value()), None),
|
||||
TimeUnit::Millisecond => {
|
||||
ScalarValue::TimestampMillisecond(Some(timestamp.value()), None)
|
||||
}
|
||||
TimeUnit::Microsecond => {
|
||||
ScalarValue::TimestampMicrosecond(Some(timestamp.value()), None)
|
||||
}
|
||||
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(timestamp.value()), None),
|
||||
};
|
||||
let evaluator = SimpleFilterEvaluator::new(ts_col.column_schema.name.clone(), value, op)
|
||||
.context(error::BuildTimeRangeFilterSnafu {
|
||||
timestamp: *timestamp,
|
||||
})?;
|
||||
Ok(SimpleFilterContext::new(
|
||||
evaluator,
|
||||
ts_col_id,
|
||||
SemanticType::Timestamp,
|
||||
ts_col.column_schema.data_type.clone(),
|
||||
))
|
||||
};
|
||||
|
||||
let predicates = match (time_range.start(), time_range.end()) {
|
||||
(Some(start), Some(end)) => {
|
||||
vec![
|
||||
ts_to_filter(Operator::GtEq, start)?,
|
||||
ts_to_filter(Operator::Lt, end)?,
|
||||
]
|
||||
}
|
||||
|
||||
(Some(start), None) => {
|
||||
vec![ts_to_filter(Operator::GtEq, start)?]
|
||||
}
|
||||
|
||||
(None, Some(end)) => {
|
||||
vec![ts_to_filter(Operator::Lt, end)?]
|
||||
}
|
||||
(None, None) => {
|
||||
vec![]
|
||||
}
|
||||
};
|
||||
Ok(predicates)
|
||||
}
|
||||
|
||||
/// Parquet reader metrics.
|
||||
#[derive(Debug, Default)]
|
||||
struct Metrics {
|
||||
@@ -570,6 +632,20 @@ pub(crate) struct SimpleFilterContext {
|
||||
}
|
||||
|
||||
impl SimpleFilterContext {
|
||||
fn new(
|
||||
filter: SimpleFilterEvaluator,
|
||||
column_id: ColumnId,
|
||||
semantic_type: SemanticType,
|
||||
data_type: ConcreteDataType,
|
||||
) -> Self {
|
||||
Self {
|
||||
filter,
|
||||
column_id,
|
||||
semantic_type,
|
||||
data_type,
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a context for the `expr`.
|
||||
///
|
||||
/// Returns None if the column to filter doesn't exist in the SST metadata or the
|
||||
|
||||
@@ -29,7 +29,7 @@ use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};
|
||||
use store_api::data_source::{DataSource, DataSourceRef};
|
||||
use store_api::storage::ScanRequest;
|
||||
use table::metadata::FilterPushDownType;
|
||||
use table::predicate::TimeRangePredicateBuilder;
|
||||
use table::predicate::build_time_range_predicate;
|
||||
use table::test_util::MemTable;
|
||||
use table::{Table, TableRef};
|
||||
|
||||
@@ -114,14 +114,14 @@ struct TimeRangeTester {
|
||||
impl TimeRangeTester {
|
||||
async fn check(&self, sql: &str, expect: TimestampRange) {
|
||||
let _ = exec_selection(self.engine.clone(), sql).await;
|
||||
let filters = self.get_filters();
|
||||
let mut filters = self.take_filters();
|
||||
|
||||
let range = TimeRangePredicateBuilder::new("ts", TimeUnit::Millisecond, &filters).build();
|
||||
let range = build_time_range_predicate("ts", TimeUnit::Millisecond, &mut filters);
|
||||
assert_eq!(expect, range);
|
||||
}
|
||||
|
||||
fn get_filters(&self) -> Vec<Expr> {
|
||||
self.filter.write().unwrap().drain(..).collect()
|
||||
fn take_filters(&self) -> Vec<Expr> {
|
||||
std::mem::take(&mut self.filter.write().unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -128,250 +128,244 @@ impl Predicate {
|
||||
}
|
||||
}
|
||||
|
||||
// tests for `TimeRangePredicateBuilder` locates in src/query/tests/time_range_filter_test.rs
|
||||
// Tests for `build_time_range_predicate` are located in src/query/tests/time_range_filter_test.rs
|
||||
// since it requires query engine to convert sql to filters.
|
||||
/// `TimeRangePredicateBuilder` extracts time range from logical exprs to facilitate fast
|
||||
/// `build_time_range_predicate` extracts time range from logical exprs to facilitate fast
|
||||
/// time range pruning.
|
||||
pub struct TimeRangePredicateBuilder<'a> {
|
||||
pub fn build_time_range_predicate<'a>(
|
||||
ts_col_name: &'a str,
|
||||
ts_col_unit: TimeUnit,
|
||||
filters: &'a [Expr],
|
||||
filters: &'a mut Vec<Expr>,
|
||||
) -> TimestampRange {
|
||||
let mut res = TimestampRange::min_to_max();
|
||||
let mut filters_remain = vec![];
|
||||
for expr in std::mem::take(filters) {
|
||||
if let Some(range) = extract_time_range_from_expr(ts_col_name, ts_col_unit, &expr) {
|
||||
res = res.and(&range);
|
||||
} else {
|
||||
filters_remain.push(expr);
|
||||
}
|
||||
}
|
||||
*filters = filters_remain;
|
||||
res
|
||||
}
|
||||
|
||||
impl<'a> TimeRangePredicateBuilder<'a> {
|
||||
pub fn new(ts_col_name: &'a str, ts_col_unit: TimeUnit, filters: &'a [Expr]) -> Self {
|
||||
Self {
|
||||
ts_col_name,
|
||||
ts_col_unit,
|
||||
filters,
|
||||
/// Extract time range filter from `WHERE`/`IN (...)`/`BETWEEN` clauses.
|
||||
/// Return None if no time range can be found in expr.
|
||||
fn extract_time_range_from_expr(
|
||||
ts_col_name: &str,
|
||||
ts_col_unit: TimeUnit,
|
||||
expr: &Expr,
|
||||
) -> Option<TimestampRange> {
|
||||
match expr {
|
||||
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
|
||||
extract_from_binary_expr(ts_col_name, ts_col_unit, left, op, right)
|
||||
}
|
||||
Expr::Between(Between {
|
||||
expr,
|
||||
negated,
|
||||
low,
|
||||
high,
|
||||
}) => extract_from_between_expr(ts_col_name, ts_col_unit, expr, negated, low, high),
|
||||
Expr::InList(InList {
|
||||
expr,
|
||||
list,
|
||||
negated,
|
||||
}) => extract_from_in_list_expr(ts_col_name, expr, *negated, list),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build(&self) -> TimestampRange {
|
||||
let mut res = TimestampRange::min_to_max();
|
||||
for expr in self.filters {
|
||||
let range = self
|
||||
.extract_time_range_from_expr(expr)
|
||||
fn extract_from_binary_expr(
|
||||
ts_col_name: &str,
|
||||
ts_col_unit: TimeUnit,
|
||||
left: &Expr,
|
||||
op: &Operator,
|
||||
right: &Expr,
|
||||
) -> Option<TimestampRange> {
|
||||
match op {
|
||||
Operator::Eq => get_timestamp_filter(ts_col_name, left, right)
|
||||
.and_then(|(ts, _)| ts.convert_to(ts_col_unit))
|
||||
.map(TimestampRange::single),
|
||||
Operator::Lt => {
|
||||
let (ts, reverse) = get_timestamp_filter(ts_col_name, left, right)?;
|
||||
if reverse {
|
||||
// [lit] < ts_col
|
||||
let ts_val = ts.convert_to(ts_col_unit)?.value();
|
||||
Some(TimestampRange::from_start(Timestamp::new(
|
||||
ts_val + 1,
|
||||
ts_col_unit,
|
||||
)))
|
||||
} else {
|
||||
// ts_col < [lit]
|
||||
ts.convert_to_ceil(ts_col_unit)
|
||||
.map(|ts| TimestampRange::until_end(ts, false))
|
||||
}
|
||||
}
|
||||
Operator::LtEq => {
|
||||
let (ts, reverse) = get_timestamp_filter(ts_col_name, left, right)?;
|
||||
if reverse {
|
||||
// [lit] <= ts_col
|
||||
ts.convert_to_ceil(ts_col_unit)
|
||||
.map(TimestampRange::from_start)
|
||||
} else {
|
||||
// ts_col <= [lit]
|
||||
ts.convert_to(ts_col_unit)
|
||||
.map(|ts| TimestampRange::until_end(ts, true))
|
||||
}
|
||||
}
|
||||
Operator::Gt => {
|
||||
let (ts, reverse) = get_timestamp_filter(ts_col_name, left, right)?;
|
||||
if reverse {
|
||||
// [lit] > ts_col
|
||||
ts.convert_to_ceil(ts_col_unit)
|
||||
.map(|t| TimestampRange::until_end(t, false))
|
||||
} else {
|
||||
// ts_col > [lit]
|
||||
let ts_val = ts.convert_to(ts_col_unit)?.value();
|
||||
Some(TimestampRange::from_start(Timestamp::new(
|
||||
ts_val + 1,
|
||||
ts_col_unit,
|
||||
)))
|
||||
}
|
||||
}
|
||||
Operator::GtEq => {
|
||||
let (ts, reverse) = get_timestamp_filter(ts_col_name, left, right)?;
|
||||
if reverse {
|
||||
// [lit] >= ts_col
|
||||
ts.convert_to(ts_col_unit)
|
||||
.map(|t| TimestampRange::until_end(t, true))
|
||||
} else {
|
||||
// ts_col >= [lit]
|
||||
ts.convert_to_ceil(ts_col_unit)
|
||||
.map(TimestampRange::from_start)
|
||||
}
|
||||
}
|
||||
Operator::And => {
|
||||
// instead of return none when failed to extract time range from left/right, we unwrap the none into
|
||||
// `TimestampRange::min_to_max`.
|
||||
let left = extract_time_range_from_expr(ts_col_name, ts_col_unit, left)
|
||||
.unwrap_or_else(TimestampRange::min_to_max);
|
||||
res = res.and(&range);
|
||||
let right = extract_time_range_from_expr(ts_col_name, ts_col_unit, right)
|
||||
.unwrap_or_else(TimestampRange::min_to_max);
|
||||
Some(left.and(&right))
|
||||
}
|
||||
res
|
||||
Operator::Or => {
|
||||
let left = extract_time_range_from_expr(ts_col_name, ts_col_unit, left)?;
|
||||
let right = extract_time_range_from_expr(ts_col_name, ts_col_unit, right)?;
|
||||
Some(left.or(&right))
|
||||
}
|
||||
Operator::NotEq
|
||||
| Operator::Plus
|
||||
| Operator::Minus
|
||||
| Operator::Multiply
|
||||
| Operator::Divide
|
||||
| Operator::Modulo
|
||||
| Operator::IsDistinctFrom
|
||||
| Operator::IsNotDistinctFrom
|
||||
| Operator::RegexMatch
|
||||
| Operator::RegexIMatch
|
||||
| Operator::RegexNotMatch
|
||||
| Operator::RegexNotIMatch
|
||||
| Operator::BitwiseAnd
|
||||
| Operator::BitwiseOr
|
||||
| Operator::BitwiseXor
|
||||
| Operator::BitwiseShiftRight
|
||||
| Operator::BitwiseShiftLeft
|
||||
| Operator::StringConcat
|
||||
| Operator::ArrowAt
|
||||
| Operator::AtArrow
|
||||
| Operator::LikeMatch
|
||||
| Operator::ILikeMatch
|
||||
| Operator::NotLikeMatch
|
||||
| Operator::NotILikeMatch => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn get_timestamp_filter(ts_col_name: &str, left: &Expr, right: &Expr) -> Option<(Timestamp, bool)> {
|
||||
let (col, lit, reverse) = match (left, right) {
|
||||
(Expr::Column(column), Expr::Literal(scalar)) => (column, scalar, false),
|
||||
(Expr::Literal(scalar), Expr::Column(column)) => (column, scalar, true),
|
||||
_ => {
|
||||
return None;
|
||||
}
|
||||
};
|
||||
if col.name != ts_col_name {
|
||||
return None;
|
||||
}
|
||||
|
||||
/// Extract time range filter from `WHERE`/`IN (...)`/`BETWEEN` clauses.
|
||||
/// Return None if no time range can be found in expr.
|
||||
fn extract_time_range_from_expr(&self, expr: &Expr) -> Option<TimestampRange> {
|
||||
match expr {
|
||||
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
|
||||
self.extract_from_binary_expr(left, op, right)
|
||||
}
|
||||
Expr::Between(Between {
|
||||
expr,
|
||||
negated,
|
||||
low,
|
||||
high,
|
||||
}) => self.extract_from_between_expr(expr, negated, low, high),
|
||||
Expr::InList(InList {
|
||||
expr,
|
||||
list,
|
||||
negated,
|
||||
}) => self.extract_from_in_list_expr(expr, *negated, list),
|
||||
_ => None,
|
||||
}
|
||||
return_none_if_utf8!(lit);
|
||||
scalar_value_to_timestamp(lit, None).map(|t| (t, reverse))
|
||||
}
|
||||
|
||||
fn extract_from_between_expr(
|
||||
ts_col_name: &str,
|
||||
ts_col_unit: TimeUnit,
|
||||
expr: &Expr,
|
||||
negated: &bool,
|
||||
low: &Expr,
|
||||
high: &Expr,
|
||||
) -> Option<TimestampRange> {
|
||||
let Expr::Column(col) = expr else {
|
||||
return None;
|
||||
};
|
||||
if col.name != ts_col_name {
|
||||
return None;
|
||||
}
|
||||
|
||||
fn extract_from_binary_expr(
|
||||
&self,
|
||||
left: &Expr,
|
||||
op: &Operator,
|
||||
right: &Expr,
|
||||
) -> Option<TimestampRange> {
|
||||
match op {
|
||||
Operator::Eq => self
|
||||
.get_timestamp_filter(left, right)
|
||||
.and_then(|(ts, _)| ts.convert_to(self.ts_col_unit))
|
||||
.map(TimestampRange::single),
|
||||
Operator::Lt => {
|
||||
let (ts, reverse) = self.get_timestamp_filter(left, right)?;
|
||||
if reverse {
|
||||
// [lit] < ts_col
|
||||
let ts_val = ts.convert_to(self.ts_col_unit)?.value();
|
||||
Some(TimestampRange::from_start(Timestamp::new(
|
||||
ts_val + 1,
|
||||
self.ts_col_unit,
|
||||
)))
|
||||
} else {
|
||||
// ts_col < [lit]
|
||||
ts.convert_to_ceil(self.ts_col_unit)
|
||||
.map(|ts| TimestampRange::until_end(ts, false))
|
||||
}
|
||||
}
|
||||
Operator::LtEq => {
|
||||
let (ts, reverse) = self.get_timestamp_filter(left, right)?;
|
||||
if reverse {
|
||||
// [lit] <= ts_col
|
||||
ts.convert_to_ceil(self.ts_col_unit)
|
||||
.map(TimestampRange::from_start)
|
||||
} else {
|
||||
// ts_col <= [lit]
|
||||
ts.convert_to(self.ts_col_unit)
|
||||
.map(|ts| TimestampRange::until_end(ts, true))
|
||||
}
|
||||
}
|
||||
Operator::Gt => {
|
||||
let (ts, reverse) = self.get_timestamp_filter(left, right)?;
|
||||
if reverse {
|
||||
// [lit] > ts_col
|
||||
ts.convert_to_ceil(self.ts_col_unit)
|
||||
.map(|t| TimestampRange::until_end(t, false))
|
||||
} else {
|
||||
// ts_col > [lit]
|
||||
let ts_val = ts.convert_to(self.ts_col_unit)?.value();
|
||||
Some(TimestampRange::from_start(Timestamp::new(
|
||||
ts_val + 1,
|
||||
self.ts_col_unit,
|
||||
)))
|
||||
}
|
||||
}
|
||||
Operator::GtEq => {
|
||||
let (ts, reverse) = self.get_timestamp_filter(left, right)?;
|
||||
if reverse {
|
||||
// [lit] >= ts_col
|
||||
ts.convert_to(self.ts_col_unit)
|
||||
.map(|t| TimestampRange::until_end(t, true))
|
||||
} else {
|
||||
// ts_col >= [lit]
|
||||
ts.convert_to_ceil(self.ts_col_unit)
|
||||
.map(TimestampRange::from_start)
|
||||
}
|
||||
}
|
||||
Operator::And => {
|
||||
// instead of return none when failed to extract time range from left/right, we unwrap the none into
|
||||
// `TimestampRange::min_to_max`.
|
||||
let left = self
|
||||
.extract_time_range_from_expr(left)
|
||||
.unwrap_or_else(TimestampRange::min_to_max);
|
||||
let right = self
|
||||
.extract_time_range_from_expr(right)
|
||||
.unwrap_or_else(TimestampRange::min_to_max);
|
||||
Some(left.and(&right))
|
||||
}
|
||||
Operator::Or => {
|
||||
let left = self.extract_time_range_from_expr(left)?;
|
||||
let right = self.extract_time_range_from_expr(right)?;
|
||||
Some(left.or(&right))
|
||||
}
|
||||
Operator::NotEq
|
||||
| Operator::Plus
|
||||
| Operator::Minus
|
||||
| Operator::Multiply
|
||||
| Operator::Divide
|
||||
| Operator::Modulo
|
||||
| Operator::IsDistinctFrom
|
||||
| Operator::IsNotDistinctFrom
|
||||
| Operator::RegexMatch
|
||||
| Operator::RegexIMatch
|
||||
| Operator::RegexNotMatch
|
||||
| Operator::RegexNotIMatch
|
||||
| Operator::BitwiseAnd
|
||||
| Operator::BitwiseOr
|
||||
| Operator::BitwiseXor
|
||||
| Operator::BitwiseShiftRight
|
||||
| Operator::BitwiseShiftLeft
|
||||
| Operator::StringConcat
|
||||
| Operator::ArrowAt
|
||||
| Operator::AtArrow
|
||||
| Operator::LikeMatch
|
||||
| Operator::ILikeMatch
|
||||
| Operator::NotLikeMatch
|
||||
| Operator::NotILikeMatch => None,
|
||||
}
|
||||
if *negated {
|
||||
return None;
|
||||
}
|
||||
|
||||
fn get_timestamp_filter(&self, left: &Expr, right: &Expr) -> Option<(Timestamp, bool)> {
|
||||
let (col, lit, reverse) = match (left, right) {
|
||||
(Expr::Column(column), Expr::Literal(scalar)) => (column, scalar, false),
|
||||
(Expr::Literal(scalar), Expr::Column(column)) => (column, scalar, true),
|
||||
_ => {
|
||||
match (low, high) {
|
||||
(Expr::Literal(low), Expr::Literal(high)) => {
|
||||
return_none_if_utf8!(low);
|
||||
return_none_if_utf8!(high);
|
||||
|
||||
let low_opt =
|
||||
scalar_value_to_timestamp(low, None).and_then(|ts| ts.convert_to(ts_col_unit));
|
||||
let high_opt = scalar_value_to_timestamp(high, None)
|
||||
.and_then(|ts| ts.convert_to_ceil(ts_col_unit));
|
||||
Some(TimestampRange::new_inclusive(low_opt, high_opt))
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract time range filter from `IN (...)` expr.
|
||||
fn extract_from_in_list_expr(
|
||||
ts_col_name: &str,
|
||||
expr: &Expr,
|
||||
negated: bool,
|
||||
list: &[Expr],
|
||||
) -> Option<TimestampRange> {
|
||||
if negated {
|
||||
return None;
|
||||
}
|
||||
let Expr::Column(col) = expr else {
|
||||
return None;
|
||||
};
|
||||
if col.name != ts_col_name {
|
||||
return None;
|
||||
}
|
||||
|
||||
if list.is_empty() {
|
||||
return Some(TimestampRange::empty());
|
||||
}
|
||||
let mut init_range = TimestampRange::empty();
|
||||
for expr in list {
|
||||
if let Expr::Literal(scalar) = expr {
|
||||
return_none_if_utf8!(scalar);
|
||||
if let Some(timestamp) = scalar_value_to_timestamp(scalar, None) {
|
||||
init_range = init_range.or(&TimestampRange::single(timestamp))
|
||||
} else {
|
||||
// TODO(hl): maybe we should raise an error here since cannot parse
|
||||
// timestamp value from in list expr
|
||||
return None;
|
||||
}
|
||||
};
|
||||
if col.name != self.ts_col_name {
|
||||
return None;
|
||||
}
|
||||
|
||||
return_none_if_utf8!(lit);
|
||||
scalar_value_to_timestamp(lit, None).map(|t| (t, reverse))
|
||||
}
|
||||
|
||||
fn extract_from_between_expr(
|
||||
&self,
|
||||
expr: &Expr,
|
||||
negated: &bool,
|
||||
low: &Expr,
|
||||
high: &Expr,
|
||||
) -> Option<TimestampRange> {
|
||||
let Expr::Column(col) = expr else {
|
||||
return None;
|
||||
};
|
||||
if col.name != self.ts_col_name {
|
||||
return None;
|
||||
}
|
||||
|
||||
if *negated {
|
||||
return None;
|
||||
}
|
||||
|
||||
match (low, high) {
|
||||
(Expr::Literal(low), Expr::Literal(high)) => {
|
||||
return_none_if_utf8!(low);
|
||||
return_none_if_utf8!(high);
|
||||
|
||||
let low_opt = scalar_value_to_timestamp(low, None)
|
||||
.and_then(|ts| ts.convert_to(self.ts_col_unit));
|
||||
let high_opt = scalar_value_to_timestamp(high, None)
|
||||
.and_then(|ts| ts.convert_to_ceil(self.ts_col_unit));
|
||||
Some(TimestampRange::new_inclusive(low_opt, high_opt))
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract time range filter from `IN (...)` expr.
|
||||
fn extract_from_in_list_expr(
|
||||
&self,
|
||||
expr: &Expr,
|
||||
negated: bool,
|
||||
list: &[Expr],
|
||||
) -> Option<TimestampRange> {
|
||||
if negated {
|
||||
return None;
|
||||
}
|
||||
let Expr::Column(col) = expr else {
|
||||
return None;
|
||||
};
|
||||
if col.name != self.ts_col_name {
|
||||
return None;
|
||||
}
|
||||
|
||||
if list.is_empty() {
|
||||
return Some(TimestampRange::empty());
|
||||
}
|
||||
let mut init_range = TimestampRange::empty();
|
||||
for expr in list {
|
||||
if let Expr::Literal(scalar) = expr {
|
||||
return_none_if_utf8!(scalar);
|
||||
if let Some(timestamp) = scalar_value_to_timestamp(scalar, None) {
|
||||
init_range = init_range.or(&TimestampRange::single(timestamp))
|
||||
} else {
|
||||
// TODO(hl): maybe we should raise an error here since cannot parse
|
||||
// timestamp value from in list expr
|
||||
return None;
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(init_range)
|
||||
}
|
||||
Some(init_range)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -395,7 +389,7 @@ mod tests {
|
||||
fn check_build_predicate(expr: Expr, expect: TimestampRange) {
|
||||
assert_eq!(
|
||||
expect,
|
||||
TimeRangePredicateBuilder::new("ts", TimeUnit::Millisecond, &[expr]).build()
|
||||
build_time_range_predicate("ts", TimeUnit::Millisecond, &mut vec![expr])
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user