From e463942a5bfef7a7b7f052d3bbb8016719d906eb Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 6 Mar 2025 16:29:55 -0800 Subject: [PATCH] fix: recover plan schema after dist analyzer (#5665) Signed-off-by: Ruihang Xia --- src/query/src/dist_plan/analyzer.rs | 36 ++++++++++++++----- .../standalone/common/order/order_by.result | 31 ++++++++-------- .../cases/standalone/common/range/nest.result | 1 + .../common/tql-explain-analyze/explain.result | 27 +++++++++++--- 4 files changed, 67 insertions(+), 28 deletions(-) diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs index 7b07870dcb..0ec083c1a6 100644 --- a/src/query/src/dist_plan/analyzer.rs +++ b/src/query/src/dist_plan/analyzer.rs @@ -19,6 +19,7 @@ use datafusion::datasource::DefaultTableSource; use datafusion::error::Result as DfResult; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; +use datafusion_common::Column; use datafusion_expr::expr::{Exists, InSubquery}; use datafusion_expr::utils::expr_to_columns; use datafusion_expr::{col as col_fn, Expr, LogicalPlan, LogicalPlanBuilder, Subquery}; @@ -215,7 +216,7 @@ impl PlanRewriter { } for col in container { - self.column_requirements.insert(col.flat_name()); + self.column_requirements.insert(col.quoted_flat_name()); } } @@ -270,6 +271,8 @@ impl PlanRewriter { } fn expand(&mut self, mut on_node: LogicalPlan) -> DfResult { + // store schema before expand + let schema = on_node.schema().clone(); let mut rewriter = EnforceDistRequirementRewriter { column_requirements: std::mem::take(&mut self.column_requirements), }; @@ -285,6 +288,13 @@ impl PlanRewriter { } self.set_expanded(); + // recover the schema + let node = LogicalPlanBuilder::from(node) + .project(schema.iter().map(|(qualifier, field)| { + Expr::Column(Column::new(qualifier.cloned(), field.name())) + }))? + .build()?; + Ok(node) } } @@ -447,7 +457,8 @@ mod test { let config = ConfigOptions::default(); let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap(); - let expected = "MergeScan [is_placeholder=false]"; + let expected = "Projection: avg(t.number)\ + \n MergeScan [is_placeholder=false]"; assert_eq!(expected, result.to_string()); } @@ -472,7 +483,8 @@ mod test { let expected = [ "Sort: t.number ASC NULLS LAST", " Distinct:", - " MergeScan [is_placeholder=false]", + " Projection: t.number", + " MergeScan [is_placeholder=false]", ] .join("\n"); assert_eq!(expected, result.to_string()); @@ -494,7 +506,8 @@ mod test { let config = ConfigOptions::default(); let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap(); - let expected = "MergeScan [is_placeholder=false]"; + let expected = "Projection: t.number\ + \n MergeScan [is_placeholder=false]"; assert_eq!(expected, result.to_string()); } @@ -531,11 +544,16 @@ mod test { let config = ConfigOptions::default(); let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap(); - let expected = "Limit: skip=0, fetch=1\ - \n LeftSemi Join: Filter: t.number = right.number\ - \n MergeScan [is_placeholder=false]\ - \n SubqueryAlias: right\ - \n MergeScan [is_placeholder=false]"; + let expected = [ + "Limit: skip=0, fetch=1", + " LeftSemi Join: Filter: t.number = right.number", + " Projection: t.number", + " MergeScan [is_placeholder=false]", + " SubqueryAlias: right", + " Projection: t.number", + " MergeScan [is_placeholder=false]", + ] + .join("\n"); assert_eq!(expected, result.to_string()); } } diff --git a/tests/cases/standalone/common/order/order_by.result b/tests/cases/standalone/common/order/order_by.result index ad185e642f..5535d40d6d 100644 --- a/tests/cases/standalone/common/order/order_by.result +++ b/tests/cases/standalone/common/order/order_by.result @@ -263,23 +263,23 @@ select * from t where num > 3 order by ts desc limit 2; select tag from t where num > 6 order by ts desc limit 2; -+-----+---------------------+ -| tag | ts | -+-----+---------------------+ -| zzz | 1970-01-01T00:00:09 | -| zzz | 1970-01-01T00:00:06 | -+-----+---------------------+ ++-----+ +| tag | ++-----+ +| zzz | +| zzz | ++-----+ select tag from t where num > 6 order by ts; -+-----+---------------------+ -| tag | ts | -+-----+---------------------+ -| zzz | 1970-01-01T00:00:00 | -| zzz | 1970-01-01T00:00:03 | -| zzz | 1970-01-01T00:00:06 | -| zzz | 1970-01-01T00:00:09 | -+-----+---------------------+ ++-----+ +| tag | ++-----+ +| zzz | +| zzz | +| zzz | +| zzz | ++-----+ -- SQLNESS REPLACE (-+) - -- SQLNESS REPLACE (\s\s+) _ @@ -292,7 +292,8 @@ explain analyze select tag from t where num > 6 order by ts desc limit 2; +-+-+-+ | stage | node | plan_| +-+-+-+ -| 0_| 0_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED +| 0_| 0_|_ProjectionExec: expr=[tag@0 as tag] REDACTED +|_|_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED |_|_|_SortExec: TopK(fetch=2), expr=[ts@1 DESC], preserve_partitioning=[true] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| diff --git a/tests/cases/standalone/common/range/nest.result b/tests/cases/standalone/common/range/nest.result index da4fe15d92..e3333ff253 100644 --- a/tests/cases/standalone/common/range/nest.result +++ b/tests/cases/standalone/common/range/nest.result @@ -56,6 +56,7 @@ EXPLAIN SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s'; | plan_type_| plan_| +-+-+ | logical_plan_| RangeSelect: range_exprs=[min(host.val) RANGE 5s], align=5000ms, align_to=0ms, align_by=[host.host], time_index=ts | +|_|_Projection: host.ts, host.host, host.val_| |_|_MergeScan [is_placeholder=false]_| | physical_plan | RangeSelectExec: range_expr=[min(host.val) RANGE 5s], align=5000ms, align_to=0ms, align_by=[host@1], time_index=ts | |_|_CoalescePartitionsExec_| diff --git a/tests/cases/standalone/common/tql-explain-analyze/explain.result b/tests/cases/standalone/common/tql-explain-analyze/explain.result index 68be4f7ee2..5f2fa0ec6e 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/explain.result +++ b/tests/cases/standalone/common/tql-explain-analyze/explain.result @@ -18,7 +18,8 @@ TQL EXPLAIN (0, 10, '5s') test; | logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] | | | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] | | | PromSeriesDivide: tags=["k"] | -| | MergeScan [is_placeholder=false] | +| | Projection: test.i, test.j, test.k | +| | MergeScan [is_placeholder=false] | | physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] | | | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] | | | PromSeriesDivideExec: tags=["k"] | @@ -40,7 +41,8 @@ TQL EXPLAIN (0, 10, '1s', '2s') test; | logical_plan | PromInstantManipulate: range=[0..0], lookback=[2000], interval=[300000], time index=[j] | | | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] | | | PromSeriesDivide: tags=["k"] | -| | MergeScan [is_placeholder=false] | +| | Projection: test.i, test.j, test.k | +| | MergeScan [is_placeholder=false] | | physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[2000], interval=[300000], time index=[j] | | | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] | | | PromSeriesDivideExec: tags=["k"] | @@ -61,7 +63,8 @@ TQL EXPLAIN ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp | logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] | | | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] | | | PromSeriesDivide: tags=["k"] | -| | MergeScan [is_placeholder=false] | +| | Projection: test.i, test.j, test.k | +| | MergeScan [is_placeholder=false] | | physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] | | | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] | | | PromSeriesDivideExec: tags=["k"] | @@ -97,6 +100,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test; | logical_plan after DistPlannerAnalyzer_| PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_| |_|_PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false]_| |_|_PromSeriesDivide: tags=["k"]_| +|_|_Projection: test.i, test.j, test.k_| |_|_MergeScan [is_placeholder=false]_| | analyzed_logical_plan_| SAME TEXT AS ABOVE_| | logical_plan after eliminate_nested_union_| SAME TEXT AS ABOVE_| @@ -128,31 +132,37 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test; | logical_plan_| PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_| |_|_PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false]_| |_|_PromSeriesDivide: tags=["k"]_| +|_|_Projection: test.i, test.j, test.k_| |_|_MergeScan [is_placeholder=false]_| | initial_physical_plan_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_| |_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_| |_|_PromSeriesDivideExec: tags=["k"]_| +|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k]_| |_|_MergeScanExec: REDACTED |_|_| | initial_physical_plan_with_stats_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j], statistics=[Rows=Inexact(0), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]] | |_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| |_|_PromSeriesDivideExec: tags=["k"], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| +|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| |_|_MergeScanExec: REDACTED |_|_| | initial_physical_plan_with_schema_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_| |_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_| |_|_PromSeriesDivideExec: tags=["k"], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_| +|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_| |_|_MergeScanExec: REDACTED |_|_| | physical_plan after parallelize_scan_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_| |_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_| |_|_PromSeriesDivideExec: tags=["k"]_| +|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k]_| |_|_MergeScanExec: REDACTED |_|_| | physical_plan after OutputRequirements_| OutputRequirementExec_| |_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_| |_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_| |_|_PromSeriesDivideExec: tags=["k"]_| +|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k]_| |_|_MergeScanExec: REDACTED |_|_| | physical_plan after aggregate_statistics_| SAME TEXT AS ABOVE_| @@ -163,6 +173,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test; |_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_| |_|_PromSeriesDivideExec: tags=["k"]_| |_|_CoalescePartitionsExec_| +|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k]_| |_|_MergeScanExec: REDACTED |_|_| | physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_| @@ -172,10 +183,18 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test; |_|_PromSeriesDivideExec: tags=["k"]_| |_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST]_| |_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true]_| +|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k]_| |_|_MergeScanExec: REDACTED |_|_| | physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_| -| physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_| +| physical_plan after ProjectionPushdown_| OutputRequirementExec_| +|_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_| +|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_| +|_|_PromSeriesDivideExec: tags=["k"]_| +|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST]_| +|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true]_| +|_|_MergeScanExec: REDACTED +|_|_| | physical_plan after coalesce_batches_| SAME TEXT AS ABOVE_| | physical_plan after OutputRequirements_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_| |_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|