From b6cd91f446031b697fad0e1935073b7a8727b4a5 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sat, 4 Apr 2026 03:50:28 +0800 Subject: [PATCH] init impl Signed-off-by: Ruihang Xia --- src/query/src/optimizer.rs | 1 + .../src/optimizer/const_normalization.rs | 740 ++++++++++++++++++ src/query/src/query_engine/state.rs | 2 + .../common/tql-explain-analyze/explain.sql | 11 + .../time_index_filter_pushdown.result | 42 +- .../optimizer/time_index_filter_pushdown.sql | 31 +- 6 files changed, 806 insertions(+), 21 deletions(-) create mode 100644 src/query/src/optimizer/const_normalization.rs diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs index aaac1e3124..752cac1436 100644 --- a/src/query/src/optimizer.rs +++ b/src/query/src/optimizer.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod const_normalization; pub mod constant_term; pub mod count_nest_aggr; pub mod count_wildcard; diff --git a/src/query/src/optimizer/const_normalization.rs b/src/query/src/optimizer/const_normalization.rs new file mode 100644 index 0000000000..8c8dc60089 --- /dev/null +++ b/src/query/src/optimizer/const_normalization.rs @@ -0,0 +1,740 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use arrow_schema::{DataType, TimeUnit as ArrowTimeUnit}; +use datafusion::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; +use datafusion_common::{DFSchemaRef, Result, ScalarValue}; +use datafusion_expr::expr::{Cast, InList, TryCast}; +use datafusion_expr::{ + Between, BinaryExpr, Expr, ExprSchemable, Filter, LogicalPlan, Operator, TableScan, +}; +use datafusion_optimizer::analyzer::AnalyzerRule; + +/// ConstNormalizationRule removes casts on column operands in filters when the +/// constant side can be normalized to the source column type ahead of filter +/// pushdown. +#[derive(Debug)] +pub struct ConstNormalizationRule; + +impl AnalyzerRule for ConstNormalizationRule { + fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result { + plan.transform(|plan| match plan { + LogicalPlan::Filter(filter) => normalize_filter_plan(filter), + LogicalPlan::TableScan(scan) => normalize_scan_plan(scan), + _ => Ok(Transformed::no(plan)), + }) + .map(|x| x.data) + } + + fn name(&self) -> &str { + "ConstNormalizationRule" + } +} + +fn normalize_filter_plan(filter: Filter) -> Result> { + let predicate = normalize_filter_expr(filter.predicate.clone(), filter.input.schema().clone())?; + if predicate != filter.predicate { + Ok(Transformed::yes(LogicalPlan::Filter(Filter::try_new( + predicate, + filter.input, + )?))) + } else { + Ok(Transformed::no(LogicalPlan::Filter(filter))) + } +} + +fn normalize_scan_plan(scan: TableScan) -> Result> { + if scan.filters.is_empty() { + return Ok(Transformed::no(LogicalPlan::TableScan(scan))); + } + + let filters = scan + .filters + .iter() + .map(|expr| normalize_filter_expr(expr.clone(), scan.projected_schema.clone())) + .collect::>>()?; + + if filters != scan.filters { + Ok(Transformed::yes(LogicalPlan::TableScan(TableScan { + table_name: scan.table_name, + source: scan.source, + projection: scan.projection, + projected_schema: scan.projected_schema, + filters, + fetch: scan.fetch, + }))) + } else { + Ok(Transformed::no(LogicalPlan::TableScan(scan))) + } +} + +fn normalize_filter_expr(expr: Expr, schema: DFSchemaRef) -> Result { + expr.rewrite(&mut ConstNormalizer { schema }) + .map(|x| x.data) +} + +struct ConstNormalizer { + schema: DFSchemaRef, +} + +impl TreeNodeRewriter for ConstNormalizer { + type Node = Expr; + + fn f_up(&mut self, expr: Expr) -> Result> { + let original = expr.clone(); + let new_expr = match expr { + Expr::BinaryExpr(BinaryExpr { left, op, right }) => { + normalize_binary_expr(*left, op, *right, &self.schema)? + } + Expr::Between(Between { + expr, + negated, + low, + high, + }) => normalize_between_expr(*expr, negated, *low, *high, &self.schema)?, + Expr::InList(InList { + expr, + list, + negated, + }) => normalize_in_list_expr(*expr, list, negated, &self.schema)?, + expr => expr, + }; + + if new_expr != original { + Ok(Transformed::yes(new_expr)) + } else { + Ok(Transformed::no(new_expr)) + } + } +} + +fn normalize_binary_expr( + left: Expr, + op: Operator, + right: Expr, + schema: &DFSchemaRef, +) -> Result { + if let Some(expr) = + normalize_binary_with_casted_column(left.clone(), op, right.clone(), schema)? + { + return Ok(expr); + } + + if let Some(swapped_op) = op.swap() + && let Some(expr) = + normalize_binary_with_casted_column(right.clone(), swapped_op, left.clone(), schema)? + { + return Ok(expr); + } + + Ok(Expr::BinaryExpr(BinaryExpr { + left: Box::new(left), + op, + right: Box::new(right), + })) +} + +fn normalize_binary_with_casted_column( + casted_column: Expr, + op: Operator, + constant: Expr, + schema: &DFSchemaRef, +) -> Result> { + let Some(casted_column) = extract_casted_column(&casted_column, schema)? else { + return Ok(None); + }; + let Some(constant) = extract_constant_scalar(&constant)? else { + return Ok(None); + }; + + if let Some(expr) = normalize_lossless_binary(&casted_column, op, &constant) { + return Ok(Some(expr)); + } + + Ok(normalize_timestamp_downcast_binary( + &casted_column, + op, + &constant, + )) +} + +fn normalize_between_expr( + expr: Expr, + negated: bool, + low: Expr, + high: Expr, + schema: &DFSchemaRef, +) -> Result { + if negated { + return Ok(Expr::Between(Between { + expr: Box::new(expr), + negated, + low: Box::new(low), + high: Box::new(high), + })); + } + + let Some(casted_column) = extract_casted_column(&expr, schema)? else { + return Ok(Expr::Between(Between { + expr: Box::new(expr), + negated, + low: Box::new(low), + high: Box::new(high), + })); + }; + let Some(low_value) = extract_constant_scalar(&low)? else { + return Ok(Expr::Between(Between { + expr: Box::new(expr), + negated, + low: Box::new(low), + high: Box::new(high), + })); + }; + let Some(high_value) = extract_constant_scalar(&high)? else { + return Ok(Expr::Between(Between { + expr: Box::new(expr), + negated, + low: Box::new(low), + high: Box::new(high), + })); + }; + + if let (Some(low), Some(high)) = ( + normalize_lossless_literal(&casted_column, &low_value), + normalize_lossless_literal(&casted_column, &high_value), + ) { + return Ok(Expr::Between(Between { + expr: Box::new(casted_column.source_expr.clone()), + negated, + low: Box::new(Expr::Literal(low, None)), + high: Box::new(Expr::Literal(high, None)), + })); + } + + if let Some(expr) = + normalize_timestamp_downcast_between(&casted_column, &low_value, &high_value) + { + return Ok(expr); + } + + Ok(Expr::Between(Between { + expr: Box::new(expr), + negated, + low: Box::new(low), + high: Box::new(high), + })) +} + +fn normalize_in_list_expr( + expr: Expr, + list: Vec, + negated: bool, + schema: &DFSchemaRef, +) -> Result { + let Some(casted_column) = extract_casted_column(&expr, schema)? else { + return Ok(Expr::InList(InList { + expr: Box::new(expr), + list, + negated, + })); + }; + + let mut new_list = Vec::with_capacity(list.len()); + for item in &list { + let Some(value) = extract_constant_scalar(item)? else { + return Ok(Expr::InList(InList { + expr: Box::new(expr), + list, + negated, + })); + }; + let Some(normalized) = normalize_lossless_literal(&casted_column, &value) else { + return Ok(Expr::InList(InList { + expr: Box::new(expr), + list, + negated, + })); + }; + new_list.push(Expr::Literal(normalized, None)); + } + + Ok(Expr::InList(InList { + expr: Box::new(casted_column.source_expr.clone()), + list: new_list, + negated, + })) +} + +#[derive(Clone)] +struct CastedColumn { + source_expr: Expr, + source_type: DataType, + kind: CastKind, +} + +#[derive(Clone)] +enum CastKind { + Lossless, + TimestampDowncast { + source_unit: ArrowTimeUnit, + target_unit: ArrowTimeUnit, + timezone: Option>, + }, +} + +fn extract_casted_column(expr: &Expr, schema: &DFSchemaRef) -> Result> { + let (source_expr, target_type) = match expr { + Expr::Cast(Cast { expr, data_type }) | Expr::TryCast(TryCast { expr, data_type }) => { + (expr.as_ref(), data_type) + } + _ => return Ok(None), + }; + + if !matches!(source_expr, Expr::Column(_)) { + return Ok(None); + } + + let source_type = source_expr.get_type(schema)?; + let Some(kind) = classify_cast_kind(&source_type, target_type) else { + return Ok(None); + }; + + Ok(Some(CastedColumn { + source_expr: source_expr.clone(), + source_type, + kind, + })) +} + +fn classify_cast_kind(source_type: &DataType, target_type: &DataType) -> Option { + if is_lossless_column_cast(source_type, target_type) { + return Some(CastKind::Lossless); + } + + match (source_type, target_type) { + ( + DataType::Timestamp(source_unit, source_tz), + DataType::Timestamp(target_unit, target_tz), + ) if source_tz == target_tz + && time_unit_rank(*source_unit) > time_unit_rank(*target_unit) => + { + Some(CastKind::TimestampDowncast { + source_unit: *source_unit, + target_unit: *target_unit, + timezone: source_tz.clone(), + }) + } + _ => None, + } +} + +fn is_lossless_column_cast(source_type: &DataType, target_type: &DataType) -> bool { + match (source_type, target_type) { + (DataType::Int8, DataType::Int16 | DataType::Int32 | DataType::Int64) + | (DataType::Int16, DataType::Int32 | DataType::Int64) + | (DataType::Int32, DataType::Int64) + | (DataType::UInt8, DataType::UInt16 | DataType::UInt32 | DataType::UInt64) + | (DataType::UInt16, DataType::UInt32 | DataType::UInt64) + | (DataType::UInt32, DataType::UInt64) => true, + ( + DataType::Timestamp(source_unit, source_tz), + DataType::Timestamp(target_unit, target_tz), + ) => source_tz == target_tz && time_unit_rank(*source_unit) <= time_unit_rank(*target_unit), + _ => false, + } +} + +fn normalize_lossless_binary( + casted_column: &CastedColumn, + op: Operator, + constant: &ScalarValue, +) -> Option { + let normalized = normalize_lossless_literal(casted_column, constant)?; + Some(Expr::BinaryExpr(BinaryExpr { + left: Box::new(casted_column.source_expr.clone()), + op, + right: Box::new(Expr::Literal(normalized, None)), + })) +} + +fn normalize_lossless_literal( + casted_column: &CastedColumn, + constant: &ScalarValue, +) -> Option { + matches!(casted_column.kind, CastKind::Lossless) + .then_some(()) + .and_then(|_| cast_literal_losslessly(constant, &casted_column.source_type)) +} + +fn normalize_timestamp_downcast_binary( + casted_column: &CastedColumn, + op: Operator, + constant: &ScalarValue, +) -> Option { + let CastKind::TimestampDowncast { + source_unit, + target_unit, + timezone, + } = &casted_column.kind + else { + return None; + }; + + let constant = constant + .cast_to(&DataType::Timestamp(*target_unit, timezone.clone())) + .ok()?; + let value = timestamp_scalar_value(&constant)?; + let bound = match op { + Operator::GtEq => lower_bound_for_ge(value, *source_unit, *target_unit)?, + Operator::Gt => lower_bound_for_ge(value.checked_add(1)?, *source_unit, *target_unit)?, + Operator::Lt => lower_bound_for_ge(value, *source_unit, *target_unit)?, + Operator::LtEq => lower_bound_for_ge(value.checked_add(1)?, *source_unit, *target_unit)?, + _ => return None, + }; + + let normalized_op = match op { + Operator::GtEq | Operator::Gt => Operator::GtEq, + Operator::Lt | Operator::LtEq => Operator::Lt, + _ => return None, + }; + + Some(Expr::BinaryExpr(BinaryExpr { + left: Box::new(casted_column.source_expr.clone()), + op: normalized_op, + right: Box::new(Expr::Literal( + timestamp_scalar(*source_unit, timezone.clone(), bound), + None, + )), + })) +} + +fn normalize_timestamp_downcast_between( + casted_column: &CastedColumn, + low: &ScalarValue, + high: &ScalarValue, +) -> Option { + let CastKind::TimestampDowncast { + source_unit, + target_unit, + timezone, + } = &casted_column.kind + else { + return None; + }; + + let target_type = DataType::Timestamp(*target_unit, timezone.clone()); + let low = low.cast_to(&target_type).ok()?; + let high = high.cast_to(&target_type).ok()?; + let low = timestamp_scalar_value(&low)?; + let high = timestamp_scalar_value(&high)?; + + let lower = lower_bound_for_ge(low, *source_unit, *target_unit)?; + let upper = lower_bound_for_ge(high.checked_add(1)?, *source_unit, *target_unit)?; + + Some( + Expr::BinaryExpr(BinaryExpr { + left: Box::new(casted_column.source_expr.clone()), + op: Operator::GtEq, + right: Box::new(Expr::Literal( + timestamp_scalar(*source_unit, timezone.clone(), lower), + None, + )), + }) + .and(Expr::BinaryExpr(BinaryExpr { + left: Box::new(casted_column.source_expr.clone()), + op: Operator::Lt, + right: Box::new(Expr::Literal( + timestamp_scalar(*source_unit, timezone.clone(), upper), + None, + )), + })), + ) +} + +fn extract_constant_scalar(expr: &Expr) -> Result> { + match expr { + Expr::Literal(value, _) => Ok(Some(value.clone())), + Expr::Cast(Cast { expr, data_type }) => extract_constant_scalar(expr)? + .map(|value| value.cast_to(data_type)) + .transpose(), + Expr::TryCast(TryCast { expr, data_type }) => { + Ok(extract_constant_scalar(expr)?.and_then(|value| value.cast_to(data_type).ok())) + } + _ => Ok(None), + } +} + +fn cast_literal_losslessly(value: &ScalarValue, target_type: &DataType) -> Option { + let casted = value.cast_to(target_type).ok()?; + let round_trip = casted.cast_to(&value.data_type()).ok()?; + (round_trip == *value).then_some(casted) +} + +fn time_unit_rank(unit: ArrowTimeUnit) -> usize { + match unit { + ArrowTimeUnit::Second => 0, + ArrowTimeUnit::Millisecond => 1, + ArrowTimeUnit::Microsecond => 2, + ArrowTimeUnit::Nanosecond => 3, + } +} + +fn time_unit_scale(unit: ArrowTimeUnit) -> i64 { + match unit { + ArrowTimeUnit::Second => 1, + ArrowTimeUnit::Millisecond => 1_000, + ArrowTimeUnit::Microsecond => 1_000_000, + ArrowTimeUnit::Nanosecond => 1_000_000_000, + } +} + +fn finer_to_coarser_ratio(source_unit: ArrowTimeUnit, target_unit: ArrowTimeUnit) -> Option { + let source_scale = time_unit_scale(source_unit); + let target_scale = time_unit_scale(target_unit); + (source_scale >= target_scale).then_some(source_scale / target_scale) +} + +fn lower_bound_for_ge( + target_value: i64, + source_unit: ArrowTimeUnit, + target_unit: ArrowTimeUnit, +) -> Option { + let ratio = finer_to_coarser_ratio(source_unit, target_unit)?; + let base = target_value.checked_mul(ratio)?; + if target_value < 0 { + base.checked_sub(ratio - 1) + } else { + Some(base) + } +} + +fn timestamp_scalar_value(value: &ScalarValue) -> Option { + match value { + ScalarValue::TimestampSecond(Some(value), _) + | ScalarValue::TimestampMillisecond(Some(value), _) + | ScalarValue::TimestampMicrosecond(Some(value), _) + | ScalarValue::TimestampNanosecond(Some(value), _) => Some(*value), + _ => None, + } +} + +fn timestamp_scalar(unit: ArrowTimeUnit, timezone: Option>, value: i64) -> ScalarValue { + match unit { + ArrowTimeUnit::Second => ScalarValue::TimestampSecond(Some(value), timezone), + ArrowTimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(value), timezone), + ArrowTimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(value), timezone), + ArrowTimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(value), timezone), + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow_schema::{DataType, TimeUnit as ArrowTimeUnit}; + use async_trait::async_trait; + use common_time::Timestamp; + use common_time::range::TimestampRange; + use common_time::timestamp::TimeUnit; + use datafusion::catalog::Session; + use datafusion::config::ConfigOptions; + use datafusion::datasource::{TableProvider, provider_as_source}; + use datafusion::physical_plan::ExecutionPlan; + use datafusion_common::arrow::datatypes::Field; + use datafusion_common::{DFSchema, ScalarValue, ToDFSchema}; + use datafusion_expr::expr_fn::{cast, col}; + use datafusion_expr::{ + Expr, LogicalPlan, LogicalPlanBuilder, TableProviderFilterPushDown, TableSource, TableType, + lit, + }; + use datafusion_optimizer::analyzer::AnalyzerRule; + use datafusion_optimizer::optimizer::{Optimizer, OptimizerContext}; + use datafusion_optimizer::push_down_filter::PushDownFilter; + use table::predicate::build_time_range_predicate; + + use super::ConstNormalizationRule; + + #[test] + fn test_normalize_direct_integer_cast_comparison() { + let schema = test_schema(vec![Field::new("v", DataType::Int32, false)]); + let plan = LogicalPlanBuilder::scan("t", test_source(schema.clone()), None) + .unwrap() + .filter(cast(col("v"), DataType::Int64).gt_eq(lit(42_i64))) + .unwrap() + .build() + .unwrap(); + + let result = ConstNormalizationRule + .analyze(plan, &ConfigOptions::default()) + .unwrap(); + + assert_eq!( + "Filter: t.v >= Int32(42)\n TableScan: t", + result.to_string() + ); + } + + #[test] + fn test_normalize_in_list_and_between() { + let schema = test_schema(vec![Field::new("v", DataType::Int16, false)]); + let in_list_plan = LogicalPlanBuilder::scan("t", test_source(schema.clone()), None) + .unwrap() + .filter(cast(col("v"), DataType::Int64).in_list(vec![lit(1_i64), lit(2_i64)], false)) + .unwrap() + .build() + .unwrap(); + + let between_plan = LogicalPlanBuilder::scan("t", test_source(schema), None) + .unwrap() + .filter(cast(col("v"), DataType::Int64).between(lit(3_i64), lit(5_i64))) + .unwrap() + .build() + .unwrap(); + + let in_list = ConstNormalizationRule + .analyze(in_list_plan, &ConfigOptions::default()) + .unwrap(); + let between = ConstNormalizationRule + .analyze(between_plan, &ConfigOptions::default()) + .unwrap(); + + assert_eq!( + "Filter: t.v IN ([Int16(1), Int16(2)])\n TableScan: t", + in_list.to_string() + ); + assert_eq!( + "Filter: t.v BETWEEN Int16(3) AND Int16(5)\n TableScan: t", + between.to_string() + ); + } + + #[test] + fn test_normalize_direct_timestamp_filter() { + let schema = test_schema(vec![ + Field::new( + "ts", + DataType::Timestamp(ArrowTimeUnit::Nanosecond, None), + false, + ), + Field::new("tag", DataType::Utf8, true), + ]); + let plan = LogicalPlanBuilder::scan("t", test_source(schema), None) + .unwrap() + .filter( + cast( + col("ts"), + DataType::Timestamp(ArrowTimeUnit::Millisecond, None), + ) + .gt_eq(lit(ScalarValue::TimestampMillisecond(Some(-299_999), None))) + .and( + cast( + col("ts"), + DataType::Timestamp(ArrowTimeUnit::Millisecond, None), + ) + .lt_eq(lit(ScalarValue::TimestampMillisecond(Some(10_000), None))), + ) + .and(col("tag").eq(lit("api"))), + ) + .unwrap() + .build() + .unwrap(); + + let analyzed = ConstNormalizationRule + .analyze(plan, &ConfigOptions::default()) + .unwrap(); + let expected = "\ +\nFilter: t.ts >= TimestampNanosecond(-299999999999, None) AND t.ts < TimestampNanosecond(10001000000, None) AND t.tag = Utf8(\"api\")\ +\n TableScan: t"; + assert_eq!(expected.trim_start(), analyzed.to_string()); + + let pushed = Optimizer::with_rules(vec![Arc::new(PushDownFilter::new())]) + .optimize(analyzed, &OptimizerContext::new(), |_, _| {}) + .unwrap(); + let expected = "\ +\nTableScan: t, full_filters=[t.ts >= TimestampNanosecond(-299999999999, None), t.ts < TimestampNanosecond(10001000000, None), t.tag = Utf8(\"api\")]"; + assert_eq!(expected.trim_start(), pushed.to_string()); + + let filters = extract_scan_filters(&pushed); + let range = build_time_range_predicate("ts", TimeUnit::Nanosecond, &filters); + assert_eq!( + TimestampRange::new_inclusive( + Some(Timestamp::new_nanosecond(-299_999_999_999)), + Some(Timestamp::new_nanosecond(10_000_999_999)) + ), + range + ); + } + + fn extract_scan_filters(plan: &LogicalPlan) -> Vec { + match plan { + LogicalPlan::TableScan(scan) => scan.filters.clone(), + _ => plan + .inputs() + .into_iter() + .flat_map(extract_scan_filters) + .collect(), + } + } + + fn test_schema(fields: Vec) -> Arc { + arrow_schema::Schema::new(fields).to_dfschema_ref().unwrap() + } + + fn test_source(schema: Arc) -> Arc { + let table = ExactPushdownProvider { + schema: Arc::new(schema.as_ref().as_arrow().clone()), + }; + provider_as_source(Arc::new(table)) + } + + #[derive(Debug)] + struct ExactPushdownProvider { + schema: arrow_schema::SchemaRef, + } + + #[async_trait] + impl TableProvider for ExactPushdownProvider { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> arrow_schema::SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + _projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> datafusion::error::Result> { + unreachable!("scan should not be called in const_normalization tests") + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> datafusion::error::Result> { + Ok(vec![TableProviderFilterPushDown::Exact; filters.len()]) + } + } +} diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index f696c8b53e..3646f468f4 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -59,6 +59,7 @@ use crate::dist_plan::{ }; use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES}; use crate::optimizer::ExtensionAnalyzerRule; +use crate::optimizer::const_normalization::ConstNormalizationRule; use crate::optimizer::constant_term::MatchesConstantTermOptimizer; use crate::optimizer::count_nest_aggr::CountNestAggrRule; use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule; @@ -156,6 +157,7 @@ impl QueryEngineState { analyzer .rules .insert(0, Arc::new(CountWildcardToTimeIndexRule)); + analyzer.rules.push(Arc::new(ConstNormalizationRule)); // Add ApplyFunctionRewrites rule, // Note we cannot use `analyzer.add_function_rewrite` diff --git a/tests/cases/standalone/common/tql-explain-analyze/explain.sql b/tests/cases/standalone/common/tql-explain-analyze/explain.sql index 5e296d4b6b..ecd438f960 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/explain.sql +++ b/tests/cases/standalone/common/tql-explain-analyze/explain.sql @@ -35,4 +35,15 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test; -- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED TQL EXPLAIN VERBOSE (0, 10, '5s') test AS series; +CREATE TABLE test_nano(i DOUBLE, j TIMESTAMP(9) TIME INDEX, k STRING PRIMARY KEY); + +INSERT INTO test_nano VALUES (1, 1000000, "a"), (1, 1000000, "b"), (2, 2000000, "a"); + +-- explain at 0s, 5s and 10s for a nanosecond time index. +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +TQL EXPLAIN (0, 10, '5s') test_nano; + +DROP TABLE test_nano; + DROP TABLE test; diff --git a/tests/cases/standalone/optimizer/time_index_filter_pushdown.result b/tests/cases/standalone/optimizer/time_index_filter_pushdown.result index ea10cfc416..40948080a1 100644 --- a/tests/cases/standalone/optimizer/time_index_filter_pushdown.result +++ b/tests/cases/standalone/optimizer/time_index_filter_pushdown.result @@ -4,6 +4,7 @@ CREATE TABLE IF NOT EXISTS `cpu` ( `rack` STRING NULL, `os` STRING NULL, + `usage_small` SMALLINT NULL, `usage_user` BIGINT NULL, `greptime_timestamp` TIMESTAMP(9) NOT NULL, TIME INDEX (`greptime_timestamp`), @@ -25,11 +26,11 @@ Affected Rows: 0 INSERT INTO cpu VALUES - ("1", "linux", 10, "2023-06-12 01:04:49"), - ("1", "linux", 15, "2023-06-12 01:04:50"), - ("3", "windows", 25, "2023-06-12 01:05:00"), - ("5", "mac", 30, "2023-06-12 01:03:00"), - ("7", "linux", 45, "2023-06-12 02:00:00"); + ("1", "linux", 10, 10, "2023-06-12 01:04:49"), + ("1", "linux", 15, 15, "2023-06-12 01:04:50"), + ("3", "windows", 25, 25, "2023-06-12 01:05:00"), + ("5", "mac", 30, 30, "2023-06-12 01:03:00"), + ("7", "linux", 45, 45, "2023-06-12 02:00:00"); Affected Rows: 5 @@ -44,14 +45,35 @@ ADMIN FLUSH_TABLE ('cpu'); INSERT INTO cpu VALUES - ("2", "linux", 20, "2023-06-12 01:04:51"), - ("2", "windows", 22, "2023-06-12 01:06:00"), - ("4", "mac", 12, "2023-06-12 00:59:00"), - ("6", "linux", 35, "2023-06-12 01:04:55"), - ("8", "windows", 50, "2023-06-12 02:10:00"); + ("2", "linux", 20, 20, "2023-06-12 01:04:51"), + ("2", "windows", 22, 22, "2023-06-12 01:06:00"), + ("4", "mac", 12, 12, "2023-06-12 00:59:00"), + ("6", "linux", 35, 35, "2023-06-12 01:04:55"), + ("8", "windows", 50, 50, "2023-06-12 02:10:00"); Affected Rows: 5 +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT + rack +FROM + cpu +WHERE + usage_small IN (CAST(10 AS BIGINT), CAST(20 AS BIGINT)); + ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | Projection: cpu.rack | +| | Filter: cpu.usage_small = Int16(10) OR cpu.usage_small = Int16(20) | +| | TableScan: cpu | +| | ]] | +| physical_plan | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+ + -- SQLNESS SORT_RESULT 3 1 select count(*) diff --git a/tests/cases/standalone/optimizer/time_index_filter_pushdown.sql b/tests/cases/standalone/optimizer/time_index_filter_pushdown.sql index 28f4180a2c..de16d8e607 100644 --- a/tests/cases/standalone/optimizer/time_index_filter_pushdown.sql +++ b/tests/cases/standalone/optimizer/time_index_filter_pushdown.sql @@ -4,6 +4,7 @@ CREATE TABLE IF NOT EXISTS `cpu` ( `rack` STRING NULL, `os` STRING NULL, + `usage_small` SMALLINT NULL, `usage_user` BIGINT NULL, `greptime_timestamp` TIMESTAMP(9) NOT NULL, TIME INDEX (`greptime_timestamp`), @@ -23,22 +24,30 @@ WITH INSERT INTO cpu VALUES - ("1", "linux", 10, "2023-06-12 01:04:49"), - ("1", "linux", 15, "2023-06-12 01:04:50"), - ("3", "windows", 25, "2023-06-12 01:05:00"), - ("5", "mac", 30, "2023-06-12 01:03:00"), - ("7", "linux", 45, "2023-06-12 02:00:00"); + ("1", "linux", 10, 10, "2023-06-12 01:04:49"), + ("1", "linux", 15, 15, "2023-06-12 01:04:50"), + ("3", "windows", 25, 25, "2023-06-12 01:05:00"), + ("5", "mac", 30, 30, "2023-06-12 01:03:00"), + ("7", "linux", 45, 45, "2023-06-12 02:00:00"); ADMIN FLUSH_TABLE ('cpu'); INSERT INTO cpu VALUES - ("2", "linux", 20, "2023-06-12 01:04:51"), - ("2", "windows", 22, "2023-06-12 01:06:00"), - ("4", "mac", 12, "2023-06-12 00:59:00"), - ("6", "linux", 35, "2023-06-12 01:04:55"), - ("8", "windows", 50, "2023-06-12 02:10:00"); + ("2", "linux", 20, 20, "2023-06-12 01:04:51"), + ("2", "windows", 22, 22, "2023-06-12 01:06:00"), + ("4", "mac", 12, 12, "2023-06-12 00:59:00"), + ("6", "linux", 35, 35, "2023-06-12 01:04:55"), + ("8", "windows", 50, 50, "2023-06-12 02:10:00"); + +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT + rack +FROM + cpu +WHERE + usage_small IN (CAST(10 AS BIGINT), CAST(20 AS BIGINT)); -- SQLNESS SORT_RESULT 3 1 select @@ -70,4 +79,4 @@ where group by os; -drop table cpu; \ No newline at end of file +drop table cpu;