From 99c78b2f972b8d34e93e7e19fe34c4873bff6f39 Mon Sep 17 00:00:00 2001
From: discord9
Date: Thu, 10 Jul 2025 14:49:22 +0800
Subject: [PATCH] fix: expand differently

Signed-off-by: discord9
---
 src/query/src/dist_plan/analyzer.rs          | 42 ++++++++++++++++++-
 .../standalone/common/order/order_by.result  | 15 ++++---
 2 files changed, 47 insertions(+), 10 deletions(-)

diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs
index 55797398f5..2dcbd6a77d 100644
--- a/src/query/src/dist_plan/analyzer.rs
+++ b/src/query/src/dist_plan/analyzer.rs
@@ -155,7 +155,23 @@ struct PlanRewriter {
     /// Partition columns of the table in current pass
     partition_cols: Option<Vec<String>>,
     column_requirements: HashSet<Column>,
+    /// Whether to expand on the next call.
+    /// This is used to handle the case where a plan is transformed but needs to be expanded from its
+    /// parent node. For example, an Aggregate plan is split into two parts between the frontend and
+    /// the datanode, and needs to be expanded from the parent node of the Aggregate plan.
     expand_on_next_call: bool,
+    /// Whether to expand on the next partial/conditional/transformed commutative plan.
+    /// This is used to handle the case where a plan is transformed but we still
+    /// need to push down as many nodes as possible before the next partial/conditional/transformed
+    /// commutative plan, e.g.
+    /// ```
+    /// Limit:
+    ///     Sort:
+    /// ```
+    /// where `Limit` is partial commutative and `Sort` is conditional commutative.
+    /// In this case, we need to expand the `Limit` plan,
+    /// so that we can push down the `Sort` plan as much as possible.
+    expand_on_next_part_cond_trans_commutative: bool,
     new_child_plan: Option<LogicalPlan>,
 }
 
@@ -177,16 +193,38 @@ impl PlanRewriter {
         {
             return true;
         }
+
         if self.expand_on_next_call {
             self.expand_on_next_call = false;
             return true;
         }
+
+        if self.expand_on_next_part_cond_trans_commutative {
+            let comm = Categorizer::check_plan(plan, self.partition_cols.clone());
+            match comm {
+                Commutativity::PartialCommutative => {
+                    // A small difference is that for partial commutative, we still need to
+                    // expand on the next call (so `Limit` can be pushed down).
+                    self.expand_on_next_part_cond_trans_commutative = false;
+                    self.expand_on_next_call = true;
+                }
+                Commutativity::ConditionalCommutative(_)
+                | Commutativity::TransformedCommutative { .. } => {
+                    // For conditional commutative and transformed commutative, we can
+                    // expand now.
+                    self.expand_on_next_part_cond_trans_commutative = false;
+                    return true;
+                }
+                _ => (),
+            }
+        }
+
         match Categorizer::check_plan(plan, self.partition_cols.clone()) {
             Commutativity::Commutative => {}
             Commutativity::PartialCommutative => {
                 if let Some(plan) = partial_commutative_transformer(plan) {
                     self.update_column_requirements(&plan);
-                    self.expand_on_next_call = true;
+                    self.expand_on_next_part_cond_trans_commutative = true;
                     self.stage.push(plan)
                 }
             }
@@ -195,7 +233,7 @@ impl PlanRewriter {
                 && let Some(plan) = transformer(plan)
             {
                 self.update_column_requirements(&plan);
-                self.expand_on_next_call = true;
+                self.expand_on_next_part_cond_trans_commutative = true;
                 self.stage.push(plan)
             }
         }
diff --git a/tests/cases/standalone/common/order/order_by.result b/tests/cases/standalone/common/order/order_by.result
index c276b31f39..5bedb221c7 100644
--- a/tests/cases/standalone/common/order/order_by.result
+++ b/tests/cases/standalone/common/order/order_by.result
@@ -295,19 +295,18 @@ explain analyze select tag from t where num > 6 order by ts desc limit 2;
 +-+-+-+
 | 0_| 0_|_ProjectionExec: expr=[tag@0 as tag] REDACTED
 |_|_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED
+|_|_|_SortExec: TopK(fetch=2), expr=[ts@1 DESC], preserve_partitioning=[true] REDACTED
 |_|_|_MergeScanExec: REDACTED
 |_|_|_|
-| 1_| 0_|_SortPreservingMergeExec: [ts@1 DESC] REDACTED
-|_|_|_WindowedSortExec: expr=ts@1 DESC num_ranges=REDACTED REDACTED
-|_|_|_PartSortExec: expr=ts@1 DESC num_ranges=REDACTED REDACTED
-|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
+| 1_| 0_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED
+|_|_|_WindowedSortExec: expr=ts@1 DESC num_ranges=REDACTED fetch=2 REDACTED
+|_|_|_PartSortExec: expr=ts@1 DESC num_ranges=REDACTED limit=2 REDACTED
 |_|_|_FilterExec: num@2 > 6, projection=[tag@0, ts@1] REDACTED
 |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
 |_|_|_|
-| 1_| 1_|_SortPreservingMergeExec: [ts@1 DESC] REDACTED
-|_|_|_WindowedSortExec: expr=ts@1 DESC num_ranges=REDACTED REDACTED
-|_|_|_PartSortExec: expr=ts@1 DESC num_ranges=REDACTED REDACTED
-|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
+| 1_| 1_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED
+|_|_|_WindowedSortExec: expr=ts@1 DESC num_ranges=REDACTED fetch=2 REDACTED
+|_|_|_PartSortExec: expr=ts@1 DESC num_ranges=REDACTED limit=2 REDACTED
 |_|_|_FilterExec: num@2 > 6, projection=[tag@0, ts@1] REDACTED
 |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
 |_|_|_|
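
Below is a minimal, self-contained sketch of the control flow this patch introduces, for readers who want to see the two flags interact outside the full rewriter. The `Walker` struct, the `should_expand` signature, and the simplified `Commutativity` enum are stand-ins invented for illustration (the real logic lives in `PlanRewriter` in src/query/src/dist_plan/analyzer.rs); only the interplay of `expand_on_next_call` and `expand_on_next_part_cond_trans_commutative` mirrors the diff above.

// Sketch only: simplified stand-ins, not GreptimeDB's real types.
#[allow(dead_code)] // some variants are only matched against in this sketch
#[derive(Clone, Copy)]
enum Commutativity {
    Commutative,
    PartialCommutative,
    ConditionalCommutative,
    TransformedCommutative,
}

#[derive(Default)]
struct Walker {
    expand_on_next_call: bool,
    expand_on_next_part_cond_trans_commutative: bool,
}

impl Walker {
    /// Called once per plan node while walking from the leaf towards the root;
    /// returns true when the current node should become the expansion point.
    fn should_expand(&mut self, node: Commutativity) -> bool {
        if self.expand_on_next_call {
            self.expand_on_next_call = false;
            return true;
        }
        if self.expand_on_next_part_cond_trans_commutative {
            match node {
                // Partial commutative (e.g. `Limit`): defer one more step so the
                // limit itself can still be pushed down before expanding.
                Commutativity::PartialCommutative => {
                    self.expand_on_next_part_cond_trans_commutative = false;
                    self.expand_on_next_call = true;
                }
                // Conditional/transformed commutative: expand right away.
                Commutativity::ConditionalCommutative
                | Commutativity::TransformedCommutative => {
                    self.expand_on_next_part_cond_trans_commutative = false;
                    return true;
                }
                _ => {}
            }
        }
        // First partial/conditional/transformed commutative node seen: arm the
        // deferred flag instead of expanding immediately (stand-in for the
        // `self.stage.push(plan)` branches in the real rewriter).
        if matches!(
            node,
            Commutativity::PartialCommutative
                | Commutativity::ConditionalCommutative
                | Commutativity::TransformedCommutative
        ) {
            self.expand_on_next_part_cond_trans_commutative = true;
        }
        false
    }
}

fn main() {
    // Bottom-up order for `select ... order by ts desc limit 2`:
    // the Sort is visited before its parent Limit, then the Projection.
    let mut walker = Walker::default();
    let bottom_up = [
        ("Sort", Commutativity::ConditionalCommutative),
        ("Limit", Commutativity::PartialCommutative),
        ("Projection", Commutativity::Commutative),
    ];
    for (name, class) in bottom_up {
        println!("{name}: expand = {}", walker.should_expand(class));
    }
}

Driving this bottom-up over Sort -> Limit -> Projection prints expand = false for `Sort` and `Limit` and expand = true for `Projection`: both the sort and the limit are pushed below the merge scan before the plan is expanded at their parent, which is what the fetch=2/limit=2 operators in the updated order_by.result reflect.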