fix: recover plan schema after dist analyzer (#5665)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-03-06 16:29:55 -08:00
committed by GitHub
parent 0124a0d156
commit e463942a5b
4 changed files with 67 additions and 28 deletions

View File

@@ -19,6 +19,7 @@ use datafusion::datasource::DefaultTableSource;
use datafusion::error::Result as DfResult;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_common::Column;
use datafusion_expr::expr::{Exists, InSubquery};
use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{col as col_fn, Expr, LogicalPlan, LogicalPlanBuilder, Subquery};
@@ -215,7 +216,7 @@ impl PlanRewriter {
}
for col in container {
self.column_requirements.insert(col.flat_name());
self.column_requirements.insert(col.quoted_flat_name());
}
}
@@ -270,6 +271,8 @@ impl PlanRewriter {
}
fn expand(&mut self, mut on_node: LogicalPlan) -> DfResult<LogicalPlan> {
// store schema before expand
let schema = on_node.schema().clone();
let mut rewriter = EnforceDistRequirementRewriter {
column_requirements: std::mem::take(&mut self.column_requirements),
};
@@ -285,6 +288,13 @@ impl PlanRewriter {
}
self.set_expanded();
// recover the schema
let node = LogicalPlanBuilder::from(node)
.project(schema.iter().map(|(qualifier, field)| {
Expr::Column(Column::new(qualifier.cloned(), field.name()))
}))?
.build()?;
Ok(node)
}
}
@@ -447,7 +457,8 @@ mod test {
let config = ConfigOptions::default();
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
let expected = "MergeScan [is_placeholder=false]";
let expected = "Projection: avg(t.number)\
\n MergeScan [is_placeholder=false]";
assert_eq!(expected, result.to_string());
}
@@ -472,7 +483,8 @@ mod test {
let expected = [
"Sort: t.number ASC NULLS LAST",
" Distinct:",
" MergeScan [is_placeholder=false]",
" Projection: t.number",
" MergeScan [is_placeholder=false]",
]
.join("\n");
assert_eq!(expected, result.to_string());
@@ -494,7 +506,8 @@ mod test {
let config = ConfigOptions::default();
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
let expected = "MergeScan [is_placeholder=false]";
let expected = "Projection: t.number\
\n MergeScan [is_placeholder=false]";
assert_eq!(expected, result.to_string());
}
@@ -531,11 +544,16 @@ mod test {
let config = ConfigOptions::default();
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
let expected = "Limit: skip=0, fetch=1\
\n LeftSemi Join: Filter: t.number = right.number\
\n MergeScan [is_placeholder=false]\
\n SubqueryAlias: right\
\n MergeScan [is_placeholder=false]";
let expected = [
"Limit: skip=0, fetch=1",
" LeftSemi Join: Filter: t.number = right.number",
" Projection: t.number",
" MergeScan [is_placeholder=false]",
" SubqueryAlias: right",
" Projection: t.number",
" MergeScan [is_placeholder=false]",
]
.join("\n");
assert_eq!(expected, result.to_string());
}
}

View File

@@ -263,23 +263,23 @@ select * from t where num > 3 order by ts desc limit 2;
select tag from t where num > 6 order by ts desc limit 2;
+-----+---------------------+
| tag | ts |
+-----+---------------------+
| zzz | 1970-01-01T00:00:09 |
| zzz | 1970-01-01T00:00:06 |
+-----+---------------------+
+-----+
| tag |
+-----+
| zzz |
| zzz |
+-----+
select tag from t where num > 6 order by ts;
+-----+---------------------+
| tag | ts |
+-----+---------------------+
| zzz | 1970-01-01T00:00:00 |
| zzz | 1970-01-01T00:00:03 |
| zzz | 1970-01-01T00:00:06 |
| zzz | 1970-01-01T00:00:09 |
+-----+---------------------+
+-----+
| tag |
+-----+
| zzz |
| zzz |
| zzz |
| zzz |
+-----+
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
@@ -292,7 +292,8 @@ explain analyze select tag from t where num > 6 order by ts desc limit 2;
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED
| 0_| 0_|_ProjectionExec: expr=[tag@0 as tag] REDACTED
|_|_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED
|_|_|_SortExec: TopK(fetch=2), expr=[ts@1 DESC], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|

View File

@@ -56,6 +56,7 @@ EXPLAIN SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s';
| plan_type_| plan_|
+-+-+
| logical_plan_| RangeSelect: range_exprs=[min(host.val) RANGE 5s], align=5000ms, align_to=0ms, align_by=[host.host], time_index=ts |
|_|_Projection: host.ts, host.host, host.val_|
|_|_MergeScan [is_placeholder=false]_|
| physical_plan | RangeSelectExec: range_expr=[min(host.val) RANGE 5s], align=5000ms, align_to=0ms, align_by=[host@1], time_index=ts |
|_|_CoalescePartitionsExec_|

View File

@@ -18,7 +18,8 @@ TQL EXPLAIN (0, 10, '5s') test;
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivide: tags=["k"] |
| | MergeScan [is_placeholder=false] |
| | Projection: test.i, test.j, test.k |
| | MergeScan [is_placeholder=false] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
@@ -40,7 +41,8 @@ TQL EXPLAIN (0, 10, '1s', '2s') test;
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[2000], interval=[300000], time index=[j] |
| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivide: tags=["k"] |
| | MergeScan [is_placeholder=false] |
| | Projection: test.i, test.j, test.k |
| | MergeScan [is_placeholder=false] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[2000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
@@ -61,7 +63,8 @@ TQL EXPLAIN ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivide: tags=["k"] |
| | MergeScan [is_placeholder=false] |
| | Projection: test.i, test.j, test.k |
| | MergeScan [is_placeholder=false] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
@@ -97,6 +100,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
| logical_plan after DistPlannerAnalyzer_| PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivide: tags=["k"]_|
|_|_Projection: test.i, test.j, test.k_|
|_|_MergeScan [is_placeholder=false]_|
| analyzed_logical_plan_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_nested_union_| SAME TEXT AS ABOVE_|
@@ -128,31 +132,37 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
| logical_plan_| PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivide: tags=["k"]_|
|_|_Projection: test.i, test.j, test.k_|
|_|_MergeScan [is_placeholder=false]_|
| initial_physical_plan_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k]_|
|_|_MergeScanExec: REDACTED
|_|_|
| initial_physical_plan_with_stats_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j], statistics=[Rows=Inexact(0), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]] |
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_PromSeriesDivideExec: tags=["k"], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_MergeScanExec: REDACTED
|_|_|
| initial_physical_plan_with_schema_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_|
|_|_PromSeriesDivideExec: tags=["k"], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_|
|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k], schema=[i:Float64;N, j:Timestamp(Millisecond, None), k:Utf8;N]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after parallelize_scan_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after OutputRequirements_| OutputRequirementExec_|
|_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after aggregate_statistics_| SAME TEXT AS ABOVE_|
@@ -163,6 +173,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_CoalescePartitionsExec_|
|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_|
@@ -172,10 +183,18 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST]_|
|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true]_|
|_|_ProjectionExec: expr=[i@0 as i, j@1 as j, k@2 as k]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_|
| physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_|
| physical_plan after ProjectionPushdown_| OutputRequirementExec_|
|_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST]_|
|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after coalesce_batches_| SAME TEXT AS ABOVE_|
| physical_plan after OutputRequirements_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|