From ac6d68aa2dc5957da445e19f22b6ca447ccb94af Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Wed, 7 Jan 2026 17:22:26 +0800 Subject: [PATCH] fix: simp expr recursively (#7523) * fix: simp expr recursively Signed-off-by: discord9 * test: some simple constant folding case Signed-off-by: discord9 * fix: literal ts cast to UTC Signed-off-by: discord9 * fix: patch merge scan batch col tz instead Signed-off-by: discord9 * test: fix Signed-off-by: discord9 --------- Signed-off-by: discord9 --- src/query/src/dist_plan/analyzer.rs | 45 ++----- src/query/src/dist_plan/analyzer/test.rs | 115 ++++++++++++++++-- src/query/src/dist_plan/analyzer/utils.rs | 89 +++++++++++++- src/query/src/dist_plan/merge_scan.rs | 4 +- .../explain/multi_partitions.result | 2 +- .../distributed/explain/step_aggr.result | 2 +- .../explain/step_aggr_basic.result | 4 +- .../explain/step_aggr_massive.result | 80 ++++++------ .../distributed/explain/step_aggr_massive.sql | 8 +- .../optimizer/first_value_advance.result | 4 +- .../optimizer/last_value_advance.result | 4 +- .../standalone/common/select/tz_encode.result | 71 +++++++++++ .../standalone/common/select/tz_encode.sql | 38 ++++++ .../standalone/common/tql/tql-cte.result | 10 +- tests/cases/standalone/limit/limit.result | 2 +- .../optimizer/first_value_advance.result | 4 +- .../optimizer/last_value_advance.result | 4 +- 17 files changed, 377 insertions(+), 109 deletions(-) create mode 100644 tests/cases/standalone/common/select/tz_encode.result create mode 100644 tests/cases/standalone/common/select/tz_encode.sql diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs index c8e21347e0..6eb8b5eac6 100644 --- a/src/query/src/dist_plan/analyzer.rs +++ b/src/query/src/dist_plan/analyzer.rs @@ -15,27 +15,26 @@ use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::sync::Arc; -use chrono::{DateTime, Utc}; use common_telemetry::debug; use datafusion::config::{ConfigExtension, ExtensionOptions}; use datafusion::datasource::DefaultTableSource; use datafusion::error::Result as DfResult; use datafusion_common::Column; -use datafusion_common::alias::AliasGenerator; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; use datafusion_expr::expr::{Exists, InSubquery}; use datafusion_expr::utils::expr_to_columns; use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder, Subquery, col as col_fn}; use datafusion_optimizer::analyzer::AnalyzerRule; -use datafusion_optimizer::simplify_expressions::SimplifyExpressions; -use datafusion_optimizer::{OptimizerConfig, OptimizerRule}; use promql::extension_plan::SeriesDivide; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::metadata::TableType; use table::table::adapter::DfTableProviderAdapter; -use crate::dist_plan::analyzer::utils::{aliased_columns_for, rewrite_merge_sort_exprs}; +use crate::dist_plan::analyzer::utils::{ + PatchOptimizerContext, PlanTreeExpressionSimplifier, aliased_columns_for, + rewrite_merge_sort_exprs, +}; use crate::dist_plan::commutativity::{ Categorizer, Commutativity, partial_commutative_transformer, }; @@ -49,7 +48,7 @@ use crate::query_engine::DefaultSerializer; mod test; mod fallback; -mod utils; +pub(crate) mod utils; pub(crate) use utils::AliasMapping; @@ -111,41 +110,13 @@ impl AnalyzerRule for DistPlannerAnalyzer { config.optimizer.filter_null_join_keys = true; let config = Arc::new(config); - // The `ConstEvaluator` in `SimplifyExpressions` might evaluate some UDFs early in the - // planning stage, by executing them directly. For example, the `database()` function. - // So the `ConfigOptions` here (which is set from the session context) should be present - // in the UDF's `ScalarFunctionArgs`. However, the default implementation in DataFusion - // seems to lost track on it: the `ConfigOptions` is recreated with its default values again. - // So we create a custom `OptimizerConfig` with the desired `ConfigOptions` - // to walk around the issue. - // TODO(LFC): Maybe use DataFusion's `OptimizerContext` again - // once https://github.com/apache/datafusion/pull/17742 is merged. - struct OptimizerContext { - inner: datafusion_optimizer::OptimizerContext, - config: Arc, - } - - impl OptimizerConfig for OptimizerContext { - fn query_execution_start_time(&self) -> DateTime { - self.inner.query_execution_start_time() - } - - fn alias_generator(&self) -> &Arc { - self.inner.alias_generator() - } - - fn options(&self) -> Arc { - self.config.clone() - } - } - - let optimizer_context = OptimizerContext { + let optimizer_context = PatchOptimizerContext { inner: datafusion_optimizer::OptimizerContext::new(), config: config.clone(), }; - let plan = SimplifyExpressions::new() - .rewrite(plan, &optimizer_context)? + let plan = plan + .rewrite_with_subqueries(&mut PlanTreeExpressionSimplifier::new(optimizer_context))? .data; let opt = config.extensions.get::(); diff --git a/src/query/src/dist_plan/analyzer/test.rs b/src/query/src/dist_plan/analyzer/test.rs index f2b39f5a9c..166fb89fdd 100644 --- a/src/query/src/dist_plan/analyzer/test.rs +++ b/src/query/src/dist_plan/analyzer/test.rs @@ -15,7 +15,7 @@ use std::pin::Pin; use std::sync::Arc; -use arrow::datatypes::IntervalDayTime; +use arrow::datatypes::{DataType, IntervalDayTime, TimeUnit}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::ext::BoxedError; use common_function::aggrs::aggr_wrapper::{StateMergeHelper, StateWrapper}; @@ -28,16 +28,20 @@ use datafusion::execution::SessionState; use datafusion::functions_aggregate::expr_fn::avg; use datafusion::functions_aggregate::min_max::{max, min}; use datafusion::prelude::SessionContext; -use datafusion_common::JoinType; +use datafusion_common::{JoinType, ScalarValue}; use datafusion_expr::expr::ScalarFunction; -use datafusion_expr::{AggregateUDF, Expr, LogicalPlanBuilder, col, lit}; +use datafusion_expr::{ + AggregateUDF, Expr, ExprSchemable as _, LogicalPlanBuilder, Operator, binary_expr, col, lit, +}; use datafusion_functions::datetime::date_bin; +use datafusion_functions::datetime::expr_fn::now; use datafusion_sql::TableReference; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; use futures::Stream; use futures::task::{Context, Poll}; use pretty_assertions::assert_eq; +use regex::Regex; use store_api::data_source::DataSource; use store_api::storage::ScanRequest; use table::metadata::{ @@ -163,12 +167,13 @@ fn try_encode_decode_substrait(plan: &LogicalPlan, state: SessionState) { .encode(plan, crate::query_engine::DefaultSerializer) .unwrap(); let inner = sub_plan_bytes.clone(); + let inner_state = state.clone(); let decoded_plan = futures::executor::block_on(async move { substrait::DFLogicalSubstraitConvertor - .decode(inner, state) + .decode(inner, inner_state) .await }).inspect_err(|e|{ -use prost::Message; + use prost::Message; let sub_plan = substrait::substrait_proto_df::proto::Plan::decode(sub_plan_bytes).unwrap(); common_telemetry::error!("Failed to decode substrait plan: {e},substrait plan: {sub_plan:#?}\nlogical plan: {plan:#?}"); }) @@ -215,7 +220,7 @@ fn expand_proj_sort_proj() { "Projection: t.number, t.pk1 = t.pk2", " Projection: t.number, t.pk1 = t.pk2", // notice both projections added `t.pk1 = t.pk2` column requirement " Sort: t.pk1 = t.pk2 ASC NULLS FIRST", - " Projection: t.number, t.pk1, t.pk3, t.pk1 = t.pk2", + " Projection: t.number, t.pk1, t.pk3, t.pk2 = t.pk1 AS t.pk1 = t.pk2", " Projection: t.number, t.pk1, t.pk2, t.pk3", // notice this projection doesn't add `t.pk1 = t.pk2` column requirement " TableScan: t", "]]", @@ -262,7 +267,7 @@ fn expand_proj_sort_partial_proj() { "Projection: t.number, eq_sorted", // notice how `eq_sorted` is added not `t.pk1 = t.pk2` " Projection: t.number, t.pk1 = t.pk2 AS eq_sorted", " Sort: t.pk1 = t.pk2 ASC NULLS FIRST", - " Projection: t.number, t.pk1, t.pk3, t.pk1 = t.pk2", + " Projection: t.number, t.pk1, t.pk3, t.pk2 = t.pk1 AS t.pk1 = t.pk2", " Projection: t.number, t.pk1, t.pk2, t.pk3", // notice this projection doesn't add `t.pk1 = t.pk2` column requirement " TableScan: t", "]]", @@ -1133,6 +1138,102 @@ fn expand_proj_part_col_aggr_sort_limit() { assert_eq!(expected, result.to_string()); } +#[test] +fn test_simplify_select_now_expression() { + init_default_ut_logging(); + let test_table = TestTable::table_with_name(0, "t".to_string()); + let table_provider = Arc::new(DfTableProviderAdapter::new(test_table)); + let table_source = Arc::new(DefaultTableSource::new(table_provider.clone())); + let ctx = SessionContext::new(); + ctx.register_table(TableReference::bare("t"), table_provider.clone() as _) + .unwrap(); + + let plan = LogicalPlanBuilder::scan_with_filters("t", table_source.clone(), None, vec![]) + .unwrap() + .project(vec![now()]) + .unwrap() + .build() + .unwrap(); + + let config = ConfigOptions::default(); + let result = DistPlannerAnalyzer {} + .analyze(plan.clone(), &config) + .unwrap(); + + common_telemetry::info!("Analyzed plan: {}", result); + + let result_str = result.to_string(); + // Normalize timestamp values to make test deterministic + let re = Regex::new(r"TimestampNanosecond\(\d+,").unwrap(); + let normalized = re.replace_all(&result_str, "TimestampNanosecond(