mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-29 19:30:37 +00:00
fix: simp expr recursively (#7523)
* fix: simp expr recursively Signed-off-by: discord9 <discord9@163.com> * test: some simple constant folding case Signed-off-by: discord9 <discord9@163.com> * fix: literal ts cast to UTC Signed-off-by: discord9 <discord9@163.com> * fix: patch merge scan batch col tz instead Signed-off-by: discord9 <discord9@163.com> * test: fix Signed-off-by: discord9 <discord9@163.com> --------- Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
@@ -15,27 +15,26 @@
|
||||
use std::collections::{BTreeMap, BTreeSet, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use common_telemetry::debug;
|
||||
use datafusion::config::{ConfigExtension, ExtensionOptions};
|
||||
use datafusion::datasource::DefaultTableSource;
|
||||
use datafusion::error::Result as DfResult;
|
||||
use datafusion_common::Column;
|
||||
use datafusion_common::alias::AliasGenerator;
|
||||
use datafusion_common::config::ConfigOptions;
|
||||
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
|
||||
use datafusion_expr::expr::{Exists, InSubquery};
|
||||
use datafusion_expr::utils::expr_to_columns;
|
||||
use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder, Subquery, col as col_fn};
|
||||
use datafusion_optimizer::analyzer::AnalyzerRule;
|
||||
use datafusion_optimizer::simplify_expressions::SimplifyExpressions;
|
||||
use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
|
||||
use promql::extension_plan::SeriesDivide;
|
||||
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
|
||||
use table::metadata::TableType;
|
||||
use table::table::adapter::DfTableProviderAdapter;
|
||||
|
||||
use crate::dist_plan::analyzer::utils::{aliased_columns_for, rewrite_merge_sort_exprs};
|
||||
use crate::dist_plan::analyzer::utils::{
|
||||
PatchOptimizerContext, PlanTreeExpressionSimplifier, aliased_columns_for,
|
||||
rewrite_merge_sort_exprs,
|
||||
};
|
||||
use crate::dist_plan::commutativity::{
|
||||
Categorizer, Commutativity, partial_commutative_transformer,
|
||||
};
|
||||
@@ -49,7 +48,7 @@ use crate::query_engine::DefaultSerializer;
|
||||
mod test;
|
||||
|
||||
mod fallback;
|
||||
mod utils;
|
||||
pub(crate) mod utils;
|
||||
|
||||
pub(crate) use utils::AliasMapping;
|
||||
|
||||
@@ -111,41 +110,13 @@ impl AnalyzerRule for DistPlannerAnalyzer {
|
||||
config.optimizer.filter_null_join_keys = true;
|
||||
let config = Arc::new(config);
|
||||
|
||||
// The `ConstEvaluator` in `SimplifyExpressions` might evaluate some UDFs early in the
|
||||
// planning stage, by executing them directly. For example, the `database()` function.
|
||||
// So the `ConfigOptions` here (which is set from the session context) should be present
|
||||
// in the UDF's `ScalarFunctionArgs`. However, the default implementation in DataFusion
|
||||
// seems to lost track on it: the `ConfigOptions` is recreated with its default values again.
|
||||
// So we create a custom `OptimizerConfig` with the desired `ConfigOptions`
|
||||
// to walk around the issue.
|
||||
// TODO(LFC): Maybe use DataFusion's `OptimizerContext` again
|
||||
// once https://github.com/apache/datafusion/pull/17742 is merged.
|
||||
struct OptimizerContext {
|
||||
inner: datafusion_optimizer::OptimizerContext,
|
||||
config: Arc<ConfigOptions>,
|
||||
}
|
||||
|
||||
impl OptimizerConfig for OptimizerContext {
|
||||
fn query_execution_start_time(&self) -> DateTime<Utc> {
|
||||
self.inner.query_execution_start_time()
|
||||
}
|
||||
|
||||
fn alias_generator(&self) -> &Arc<AliasGenerator> {
|
||||
self.inner.alias_generator()
|
||||
}
|
||||
|
||||
fn options(&self) -> Arc<ConfigOptions> {
|
||||
self.config.clone()
|
||||
}
|
||||
}
|
||||
|
||||
let optimizer_context = OptimizerContext {
|
||||
let optimizer_context = PatchOptimizerContext {
|
||||
inner: datafusion_optimizer::OptimizerContext::new(),
|
||||
config: config.clone(),
|
||||
};
|
||||
|
||||
let plan = SimplifyExpressions::new()
|
||||
.rewrite(plan, &optimizer_context)?
|
||||
let plan = plan
|
||||
.rewrite_with_subqueries(&mut PlanTreeExpressionSimplifier::new(optimizer_context))?
|
||||
.data;
|
||||
|
||||
let opt = config.extensions.get::<DistPlannerOptions>();
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::datatypes::IntervalDayTime;
|
||||
use arrow::datatypes::{DataType, IntervalDayTime, TimeUnit};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_function::aggrs::aggr_wrapper::{StateMergeHelper, StateWrapper};
|
||||
@@ -28,16 +28,20 @@ use datafusion::execution::SessionState;
|
||||
use datafusion::functions_aggregate::expr_fn::avg;
|
||||
use datafusion::functions_aggregate::min_max::{max, min};
|
||||
use datafusion::prelude::SessionContext;
|
||||
use datafusion_common::JoinType;
|
||||
use datafusion_common::{JoinType, ScalarValue};
|
||||
use datafusion_expr::expr::ScalarFunction;
|
||||
use datafusion_expr::{AggregateUDF, Expr, LogicalPlanBuilder, col, lit};
|
||||
use datafusion_expr::{
|
||||
AggregateUDF, Expr, ExprSchemable as _, LogicalPlanBuilder, Operator, binary_expr, col, lit,
|
||||
};
|
||||
use datafusion_functions::datetime::date_bin;
|
||||
use datafusion_functions::datetime::expr_fn::now;
|
||||
use datafusion_sql::TableReference;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
|
||||
use futures::Stream;
|
||||
use futures::task::{Context, Poll};
|
||||
use pretty_assertions::assert_eq;
|
||||
use regex::Regex;
|
||||
use store_api::data_source::DataSource;
|
||||
use store_api::storage::ScanRequest;
|
||||
use table::metadata::{
|
||||
@@ -163,12 +167,13 @@ fn try_encode_decode_substrait(plan: &LogicalPlan, state: SessionState) {
|
||||
.encode(plan, crate::query_engine::DefaultSerializer)
|
||||
.unwrap();
|
||||
let inner = sub_plan_bytes.clone();
|
||||
let inner_state = state.clone();
|
||||
let decoded_plan = futures::executor::block_on(async move {
|
||||
substrait::DFLogicalSubstraitConvertor
|
||||
.decode(inner, state)
|
||||
.decode(inner, inner_state)
|
||||
.await
|
||||
}).inspect_err(|e|{
|
||||
use prost::Message;
|
||||
use prost::Message;
|
||||
let sub_plan = substrait::substrait_proto_df::proto::Plan::decode(sub_plan_bytes).unwrap();
|
||||
common_telemetry::error!("Failed to decode substrait plan: {e},substrait plan: {sub_plan:#?}\nlogical plan: {plan:#?}");
|
||||
})
|
||||
@@ -215,7 +220,7 @@ fn expand_proj_sort_proj() {
|
||||
"Projection: t.number, t.pk1 = t.pk2",
|
||||
" Projection: t.number, t.pk1 = t.pk2", // notice both projections added `t.pk1 = t.pk2` column requirement
|
||||
" Sort: t.pk1 = t.pk2 ASC NULLS FIRST",
|
||||
" Projection: t.number, t.pk1, t.pk3, t.pk1 = t.pk2",
|
||||
" Projection: t.number, t.pk1, t.pk3, t.pk2 = t.pk1 AS t.pk1 = t.pk2",
|
||||
" Projection: t.number, t.pk1, t.pk2, t.pk3", // notice this projection doesn't add `t.pk1 = t.pk2` column requirement
|
||||
" TableScan: t",
|
||||
"]]",
|
||||
@@ -262,7 +267,7 @@ fn expand_proj_sort_partial_proj() {
|
||||
"Projection: t.number, eq_sorted", // notice how `eq_sorted` is added not `t.pk1 = t.pk2`
|
||||
" Projection: t.number, t.pk1 = t.pk2 AS eq_sorted",
|
||||
" Sort: t.pk1 = t.pk2 ASC NULLS FIRST",
|
||||
" Projection: t.number, t.pk1, t.pk3, t.pk1 = t.pk2",
|
||||
" Projection: t.number, t.pk1, t.pk3, t.pk2 = t.pk1 AS t.pk1 = t.pk2",
|
||||
" Projection: t.number, t.pk1, t.pk2, t.pk3", // notice this projection doesn't add `t.pk1 = t.pk2` column requirement
|
||||
" TableScan: t",
|
||||
"]]",
|
||||
@@ -1133,6 +1138,102 @@ fn expand_proj_part_col_aggr_sort_limit() {
|
||||
assert_eq!(expected, result.to_string());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simplify_select_now_expression() {
|
||||
init_default_ut_logging();
|
||||
let test_table = TestTable::table_with_name(0, "t".to_string());
|
||||
let table_provider = Arc::new(DfTableProviderAdapter::new(test_table));
|
||||
let table_source = Arc::new(DefaultTableSource::new(table_provider.clone()));
|
||||
let ctx = SessionContext::new();
|
||||
ctx.register_table(TableReference::bare("t"), table_provider.clone() as _)
|
||||
.unwrap();
|
||||
|
||||
let plan = LogicalPlanBuilder::scan_with_filters("t", table_source.clone(), None, vec![])
|
||||
.unwrap()
|
||||
.project(vec![now()])
|
||||
.unwrap()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let config = ConfigOptions::default();
|
||||
let result = DistPlannerAnalyzer {}
|
||||
.analyze(plan.clone(), &config)
|
||||
.unwrap();
|
||||
|
||||
common_telemetry::info!("Analyzed plan: {}", result);
|
||||
|
||||
let result_str = result.to_string();
|
||||
// Normalize timestamp values to make test deterministic
|
||||
let re = Regex::new(r"TimestampNanosecond\(\d+,").unwrap();
|
||||
let normalized = re.replace_all(&result_str, "TimestampNanosecond(<TIME>,");
|
||||
|
||||
let expected = [
|
||||
"Projection: now()",
|
||||
" MergeScan [is_placeholder=false, remote_input=[",
|
||||
r#"Projection: TimestampNanosecond(<TIME>, Some("+00:00")) AS now()"#,
|
||||
" TableScan: t",
|
||||
"]]",
|
||||
]
|
||||
.join("\n");
|
||||
assert_eq!(expected, normalized);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simplify_now_expression() {
|
||||
init_default_ut_logging();
|
||||
let test_table = TestTable::table_with_name(0, "t".to_string());
|
||||
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
|
||||
DfTableProviderAdapter::new(test_table),
|
||||
)));
|
||||
let plan = LogicalPlanBuilder::scan_with_filters("t", table_source.clone(), None, vec![])
|
||||
.unwrap()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
// CAST(t.ts AS Timestamp(Millisecond, Some("+00:00")))
|
||||
let ts_cast_type = DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into()));
|
||||
|
||||
let ts_expr = col("ts").cast_to(&ts_cast_type, plan.schema()).unwrap();
|
||||
|
||||
// CAST(now() - interval AS Timestamp(Millisecond, Some("+00:00")))
|
||||
let interval = lit(ScalarValue::new_interval_mdn(0, 0, 2700000000000)); // 2700s = 45m
|
||||
let right_expr = binary_expr(now(), Operator::Minus, interval);
|
||||
let right_expr_cast = right_expr.cast_to(&ts_cast_type, plan.schema()).unwrap();
|
||||
|
||||
let filter_expr = ts_expr.lt_eq(right_expr_cast);
|
||||
|
||||
// Projection: t.b, count(Int64(1))
|
||||
// Aggregate: groupBy=[[my_table.b]], aggr=[[count(my_table.ts) AS count(Int64(1))]]
|
||||
// Filter: CAST(my_table.ts AS Timestamp(Millisecond, Some("+00:00"))) <= CAST(now() - IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 2700000000000 }") AS Timestamp(Millisecond, Some("+00:00")))
|
||||
// TableScan: my_table
|
||||
let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
|
||||
.unwrap()
|
||||
.filter(filter_expr)
|
||||
.unwrap()
|
||||
.aggregate(
|
||||
vec![col("pk1")],
|
||||
vec![
|
||||
datafusion::functions_aggregate::expr_fn::count(col("ts")).alias("count(Int64(1))"),
|
||||
],
|
||||
)
|
||||
.unwrap()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let config = ConfigOptions::default();
|
||||
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
|
||||
|
||||
let plan_str = result.to_string();
|
||||
common_telemetry::info!("Analyzed plan: {}", plan_str);
|
||||
|
||||
// If simplified, "now()" should be replaced by a literal.
|
||||
assert!(
|
||||
!plan_str.contains("now()"),
|
||||
"Plan should be simplified but contains now(): {}",
|
||||
plan_str
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn expand_proj_limit_part_col_aggr_sort() {
|
||||
// use logging for better debugging
|
||||
|
||||
@@ -15,15 +15,102 @@
|
||||
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::array::ArrayRef;
|
||||
use arrow_schema::{ArrowError, DataType};
|
||||
use chrono::{DateTime, Utc};
|
||||
use datafusion::common::alias::AliasGenerator;
|
||||
use datafusion::config::ConfigOptions;
|
||||
use datafusion::error::Result as DfResult;
|
||||
use datafusion_common::Column;
|
||||
use datafusion_common::tree_node::{Transformed, TreeNode as _};
|
||||
use datafusion_common::tree_node::{Transformed, TreeNode as _, TreeNodeRewriter};
|
||||
use datafusion_expr::expr::Alias;
|
||||
use datafusion_expr::{Expr, Extension, LogicalPlan};
|
||||
use datafusion_optimizer::simplify_expressions::SimplifyExpressions;
|
||||
use datafusion_optimizer::{OptimizerConfig, OptimizerRule as _};
|
||||
|
||||
use crate::dist_plan::merge_sort::MergeSortLogicalPlan;
|
||||
use crate::plan::ExtractExpr as _;
|
||||
|
||||
/// The `ConstEvaluator` in `SimplifyExpressions` might evaluate some UDFs early in the
|
||||
/// planning stage, by executing them directly. For example, the `database()` function.
|
||||
/// So the `ConfigOptions` here (which is set from the session context) should be present
|
||||
/// in the UDF's `ScalarFunctionArgs`. However, the default implementation in DataFusion
|
||||
/// seems to lost track on it: the `ConfigOptions` is recreated with its default values again.
|
||||
/// So we create a custom `OptimizerConfig` with the desired `ConfigOptions`
|
||||
/// to walk around the issue.
|
||||
/// TODO(LFC): Maybe use DataFusion's `OptimizerContext` again
|
||||
/// once https://github.com/apache/datafusion/pull/17742 is merged.
|
||||
pub(crate) struct PatchOptimizerContext {
|
||||
pub(crate) inner: datafusion_optimizer::OptimizerContext,
|
||||
pub(crate) config: Arc<ConfigOptions>,
|
||||
}
|
||||
|
||||
impl OptimizerConfig for PatchOptimizerContext {
|
||||
fn query_execution_start_time(&self) -> DateTime<Utc> {
|
||||
self.inner.query_execution_start_time()
|
||||
}
|
||||
|
||||
fn alias_generator(&self) -> &Arc<AliasGenerator> {
|
||||
self.inner.alias_generator()
|
||||
}
|
||||
|
||||
fn options(&self) -> Arc<ConfigOptions> {
|
||||
self.config.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Simplify all expressions recursively in the plan tree
|
||||
/// which keeping the output schema unchanged
|
||||
pub(crate) struct PlanTreeExpressionSimplifier {
|
||||
optimizer_context: PatchOptimizerContext,
|
||||
}
|
||||
|
||||
impl PlanTreeExpressionSimplifier {
|
||||
pub fn new(optimizer_context: PatchOptimizerContext) -> Self {
|
||||
Self { optimizer_context }
|
||||
}
|
||||
}
|
||||
|
||||
impl TreeNodeRewriter for PlanTreeExpressionSimplifier {
|
||||
type Node = LogicalPlan;
|
||||
fn f_down(&mut self, plan: Self::Node) -> DfResult<Transformed<Self::Node>> {
|
||||
let simp = SimplifyExpressions::new()
|
||||
.rewrite(plan, &self.optimizer_context)?
|
||||
.data;
|
||||
Ok(Transformed::yes(simp))
|
||||
}
|
||||
}
|
||||
|
||||
/// A patch for substrait simply throw timezone away, so when decoding, if columns have different timezone then expected schema, use expected schema's timezone
|
||||
pub fn patch_batch_timezone(
|
||||
expected_schema: arrow_schema::SchemaRef,
|
||||
columns: Vec<ArrayRef>,
|
||||
) -> Result<arrow::record_batch::RecordBatch, ArrowError> {
|
||||
let patched_columns: Vec<ArrayRef> = expected_schema
|
||||
.fields()
|
||||
.iter()
|
||||
.zip(columns.into_iter())
|
||||
.map(|(expected_field, column)| {
|
||||
let expected_type = expected_field.data_type();
|
||||
let actual_type = column.data_type();
|
||||
|
||||
// Check if both are timestamp types with different timezones
|
||||
match (expected_type, actual_type) {
|
||||
(
|
||||
DataType::Timestamp(expected_unit, expected_tz),
|
||||
DataType::Timestamp(actual_unit, actual_tz),
|
||||
) if expected_unit == actual_unit && expected_tz != actual_tz => {
|
||||
// Cast the column to the expected timezone
|
||||
arrow::compute::cast(&column, expected_type)
|
||||
}
|
||||
_ => Ok(column),
|
||||
}
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
arrow::record_batch::RecordBatch::try_new(expected_schema.clone(), patched_columns)
|
||||
}
|
||||
|
||||
fn rewrite_column(
|
||||
mapping: &BTreeMap<Column, BTreeSet<Column>>,
|
||||
original_node: &LogicalPlan,
|
||||
|
||||
@@ -24,7 +24,6 @@ use common_plugins::GREPTIME_EXEC_READ_COST;
|
||||
use common_query::request::QueryRequest;
|
||||
use common_recordbatch::adapter::RecordBatchMetrics;
|
||||
use common_telemetry::tracing_context::TracingContext;
|
||||
use datafusion::arrow::record_batch::RecordBatch;
|
||||
use datafusion::execution::{SessionState, TaskContext};
|
||||
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
|
||||
use datafusion::physical_plan::metrics::{
|
||||
@@ -50,6 +49,7 @@ use tokio::time::Instant;
|
||||
use tracing::{Instrument, Span};
|
||||
|
||||
use crate::dist_plan::analyzer::AliasMapping;
|
||||
use crate::dist_plan::analyzer::utils::patch_batch_timezone;
|
||||
use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
|
||||
use crate::region_query::RegionQueryHandlerRef;
|
||||
|
||||
@@ -330,7 +330,7 @@ impl MergeScanExec {
|
||||
poll_duration += poll_elapsed;
|
||||
|
||||
let batch = batch.map_err(|e| DataFusionError::External(Box::new(e)))?;
|
||||
let batch = RecordBatch::try_new(
|
||||
let batch = patch_batch_timezone(
|
||||
arrow_schema.clone(),
|
||||
batch.into_df_record_batch().columns().to_vec(),
|
||||
)?;
|
||||
|
||||
Reference in New Issue
Block a user