diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs
index b3c5a0024d..d755050653 100644
--- a/src/query/src/dist_plan/analyzer.rs
+++ b/src/query/src/dist_plan/analyzer.rs
@@ -155,7 +155,23 @@ struct PlanRewriter {
     /// Partition columns of the table in current pass
     partition_cols: Option<Vec<String>>,
     column_requirements: HashSet<Column>,
+    /// Whether to expand on the next call.
+    /// This is used to handle the case where a plan is transformed but needs to be expanded from its
+    /// parent node. For example, an Aggregate plan is split into two parts, in the frontend and the datanode,
+    /// and needs to be expanded from the parent node of the Aggregate plan.
     expand_on_next_call: bool,
+    /// Expand on the next partial/conditional/transformed commutative plan.
+    /// This is used to handle the case where a plan is transformed but we still
+    /// need to push down as many nodes as possible before the next partial/conditional/transformed
+    /// commutative plan, e.g.:
+    /// ```
+    /// Limit:
+    ///   Sort:
+    /// ```
+    /// where `Limit` is partial commutative and `Sort` is conditional commutative.
+    /// In this case, we need to expand the `Limit` plan,
+    /// so that the `Sort` plan can be pushed down as far as possible.
+    expand_on_next_part_cond_trans_commutative: bool,
     new_child_plan: Option<LogicalPlan>,
 }
 
@@ -177,15 +193,38 @@ impl PlanRewriter {
         {
             return true;
         }
+
         if self.expand_on_next_call {
             self.expand_on_next_call = false;
             return true;
         }
+
+        if self.expand_on_next_part_cond_trans_commutative {
+            let comm = Categorizer::check_plan(plan, self.partition_cols.clone());
+            match comm {
+                Commutativity::PartialCommutative => {
+                    // A small difference is that, for partial commutative, we still need to
+                    // expand on the next call (so `Limit` can be pushed down).
+                    self.expand_on_next_part_cond_trans_commutative = false;
+                    self.expand_on_next_call = true;
+                }
+                Commutativity::ConditionalCommutative(_)
+                | Commutativity::TransformedCommutative { .. } => {
+                    // For conditional commutative and transformed commutative, we can
+                    // expand now.
+                    self.expand_on_next_part_cond_trans_commutative = false;
+                    return true;
+                }
+                _ => (),
+            }
+        }
+
         match Categorizer::check_plan(plan, self.partition_cols.clone()) {
             Commutativity::Commutative => {}
             Commutativity::PartialCommutative => {
                 if let Some(plan) = partial_commutative_transformer(plan) {
                     self.update_column_requirements(&plan);
+                    self.expand_on_next_part_cond_trans_commutative = true;
                     self.stage.push(plan)
                 }
             }
@@ -194,6 +233,7 @@ impl PlanRewriter {
                 && let Some(plan) = transformer(plan)
             {
                 self.update_column_requirements(&plan);
+                self.expand_on_next_part_cond_trans_commutative = true;
                 self.stage.push(plan)
             }
         }
@@ -202,7 +242,7 @@ impl PlanRewriter {
                 && let Some(transformer_actions) = transformer(plan)
             {
                 debug!(
-                    "PlanRewriter: transformed plan: {:#?}\n from {plan}",
+                    "PlanRewriter: transformed plan: {:?}\n from {plan}",
                     transformer_actions.extra_parent_plans
                 );
                 if let Some(last_stage) = transformer_actions.extra_parent_plans.last() {
@@ -226,6 +266,10 @@ impl PlanRewriter {
     }
 
     fn update_column_requirements(&mut self, plan: &LogicalPlan) {
+        debug!(
+            "PlanRewriter: update column requirements for plan: {plan}\n with column_requirements: {:?}",
+            self.column_requirements
+        );
         let mut container = HashSet::new();
         for expr in plan.expressions() {
             // this method won't fail
@@ -235,6 +279,10 @@ impl PlanRewriter {
         for col in container {
             self.column_requirements.insert(col);
         }
+        debug!(
+            "PlanRewriter: updated column requirements: {:?}",
+            self.column_requirements
+        );
     }
 
     fn is_expanded(&self) -> bool {
diff --git a/tests/cases/standalone/tql-explain-analyze/analyze.result b/tests/cases/standalone/tql-explain-analyze/analyze.result
index f79956ace3..623ed4fade 100644
--- a/tests/cases/standalone/tql-explain-analyze/analyze.result
+++ b/tests/cases/standalone/tql-explain-analyze/analyze.result
@@ -234,3 +234,58 @@ drop table test;
 
 Affected Rows: 0
 
+CREATE TABLE test2 (
+  "greptime_timestamp" TIMESTAMP(3) NOT NULL,
+  "greptime_value" DOUBLE NULL,
+  "shard" STRING NULL INVERTED INDEX,
+  TIME INDEX ("greptime_timestamp"),
+  PRIMARY KEY ("shard")
+)
+PARTITION ON COLUMNS ("shard") (
+  shard <= '2',
+  shard > '2'
+);
+
+Affected Rows: 0
+
+TQL EVAL sum(test2);
+
+++
+++
+
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
+-- SQLNESS REPLACE (Hash.*) REDACTED
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+TQL ANALYZE sum(test2);
+
++-+-+-+
+| stage | node | plan_|
++-+-+-+
+| 0_| 0_|_SortPreservingMergeExec: [greptime_timestamp@0 ASC NULLS LAST] REDACTED
+|_|_|_SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
+|_|_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(test2.greptime_value)] REDACTED
+|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
+|_|_|_RepartitionExec: partitioning=REDACTED
+|_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(test2.greptime_value)] REDACTED
+|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED
+|_|_|_MergeScanExec: REDACTED
+|_|_|_|
+| 1_| 0_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED
+|_|_|_PromSeriesDivideExec: tags=["shard"] REDACTED
+|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
+|_|_|_|
+| 1_| 1_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED
+|_|_|_PromSeriesDivideExec: tags=["shard"] REDACTED
+|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
+|_|_|_|
+|_|_| Total rows: 0_|
++-+-+-+
+
+DROP TABLE test2;
+
+Affected Rows: 0
+
diff --git a/tests/cases/standalone/tql-explain-analyze/analyze.sql b/tests/cases/standalone/tql-explain-analyze/analyze.sql
index c585ded0f0..ef81ef9db2 100644
--- a/tests/cases/standalone/tql-explain-analyze/analyze.sql
+++ b/tests/cases/standalone/tql-explain-analyze/analyze.sql
@@ -95,3 +95,28 @@ TQL ANALYZE VERBOSE FORMAT JSON (0, 10, '5s') test;
 TQL ANALYZE FORMAT TEXT (0, 10, '5s') test;
 
 drop table test;
+
+CREATE TABLE test2 (
+  "greptime_timestamp" TIMESTAMP(3) NOT NULL,
+  "greptime_value" DOUBLE NULL,
+  "shard" STRING NULL INVERTED INDEX,
+  TIME INDEX ("greptime_timestamp"),
+  PRIMARY KEY ("shard")
+)
+PARTITION ON COLUMNS ("shard") (
+  shard <= '2',
+  shard > '2'
+);
+
+TQL EVAL sum(test2);
+
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
+-- SQLNESS REPLACE (Hash.*) REDACTED
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+TQL ANALYZE sum(test2);
+
+DROP TABLE test2;
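
Reviewer note: the interplay between `expand_on_next_call` and `expand_on_next_part_cond_trans_commutative` is easiest to see in isolation. Below is a minimal, self-contained sketch of the flag state machine, assuming a bottom-up walk (child before parent). `Comm` and `Flags` are hypothetical stand-ins invented for this sketch, not the actual `Commutativity` enum or `PlanRewriter` struct, and the real `should_expand` also transforms and stages plans, which is elided here.

```rust
// Sketch of the two-flag expansion state machine from this patch.
// All types are simplified stand-ins, not GreptimeDB's real ones.

/// Simplified commutativity categories, mirroring the patch's match arms.
#[derive(Debug, Clone, Copy)]
enum Comm {
    Commutative,
    PartialCommutative,
    ConditionalCommutative,
    TransformedCommutative,
    NonCommutative,
}

/// Just the two flags from `PlanRewriter`, driven by a bottom-up walk.
#[derive(Default)]
struct Flags {
    expand_on_next_call: bool,
    expand_on_next_part_cond_trans_commutative: bool,
}

impl Flags {
    /// Returns `true` when the rewriter should expand at the current node,
    /// following the same order of checks as `should_expand` in the patch.
    fn should_expand(&mut self, comm: Comm) -> bool {
        // Highest priority: a previous node asked to expand here.
        if self.expand_on_next_call {
            self.expand_on_next_call = false;
            return true;
        }
        // A previous partial/conditional/transformed push-down armed this flag.
        if self.expand_on_next_part_cond_trans_commutative {
            match comm {
                // Partial commutative (e.g. `Limit`): defer exactly once more,
                // so this node is pushed down before expansion.
                Comm::PartialCommutative => {
                    self.expand_on_next_part_cond_trans_commutative = false;
                    self.expand_on_next_call = true;
                }
                // Conditional/transformed commutative: expand right away.
                Comm::ConditionalCommutative | Comm::TransformedCommutative => {
                    self.expand_on_next_part_cond_trans_commutative = false;
                    return true;
                }
                _ => {}
            }
        }
        // Categorize the current node (stands in for the main match on
        // `Categorizer::check_plan`); a successful push-down arms the flag.
        match comm {
            Comm::Commutative => {}
            Comm::PartialCommutative
            | Comm::ConditionalCommutative
            | Comm::TransformedCommutative => {
                self.expand_on_next_part_cond_trans_commutative = true;
            }
            Comm::NonCommutative => return true,
        }
        false
    }
}

fn main() {
    // Bottom-up walk over `Limit(Sort(Scan))`, as in the new doc comment:
    let mut f = Flags::default();
    assert!(!f.should_expand(Comm::Commutative)); // Scan: keep pushing down
    assert!(!f.should_expand(Comm::ConditionalCommutative)); // Sort: pushed, arms flag
    assert!(!f.should_expand(Comm::PartialCommutative)); // Limit: pushed, defers once
    assert!(f.should_expand(Comm::Commutative)); // parent of Limit: expansion point
    println!("expanded above Limit, after pushing down both Sort and Limit");
}
```

Under these assumptions, for a `Limit -> Sort -> Scan` chain the `Sort` is pushed down first, the `Limit` defers expansion exactly once via `expand_on_next_call`, and the node above `Limit` becomes the expansion point, which is what the new `TQL ANALYZE sum(test2)` plan exercises.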