fix: expand differently

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-07-10 14:49:22 +08:00
parent bdecdb869e
commit 99c78b2f97
2 changed files with 47 additions and 10 deletions

View File

@@ -155,7 +155,23 @@ struct PlanRewriter {
/// Partition columns of the table in current pass
partition_cols: Option<Vec<String>>,
column_requirements: HashSet<Column>,
/// Whether to expand on next call
/// This is used to handle the case where a plan is transformed, but need to be expanded from it's
/// parent node. For example a Aggregate plan is split into two parts in frontend and datanode, and need
/// to be expanded from the parent node of the Aggregate plan.
expand_on_next_call: bool,
/// Expanding on next partial/conditional/transformed commutative plan
/// This is used to handle the case where a plan is transformed, but still
/// need to push down as many node as possible before next partial/conditional/transformed commutative
/// plan. I.e.
/// ```
/// Limit:
/// Sort:
/// ```
/// where `Limit` is partial commutative, and `Sort` is conditional commutative.
/// In this case, we need to expand the `Limit` plan,
/// so that we can push down the `Sort` plan as much as possible.
expand_on_next_part_cond_trans_commutative: bool,
new_child_plan: Option<LogicalPlan>,
}
@@ -177,16 +193,38 @@ impl PlanRewriter {
{
return true;
}
if self.expand_on_next_call {
self.expand_on_next_call = false;
return true;
}
if self.expand_on_next_part_cond_trans_commutative {
let comm = Categorizer::check_plan(plan, self.partition_cols.clone());
match comm {
Commutativity::PartialCommutative => {
// a small difference is that for partial commutative, we still need to
// expand on next call(so `Limit` can be pushed down)
self.expand_on_next_part_cond_trans_commutative = false;
self.expand_on_next_call = true;
}
Commutativity::ConditionalCommutative(_)
| Commutativity::TransformedCommutative { .. } => {
// for conditional commutative and transformed commutative, we can
// expand now
self.expand_on_next_part_cond_trans_commutative = false;
return true;
}
_ => (),
}
}
match Categorizer::check_plan(plan, self.partition_cols.clone()) {
Commutativity::Commutative => {}
Commutativity::PartialCommutative => {
if let Some(plan) = partial_commutative_transformer(plan) {
self.update_column_requirements(&plan);
self.expand_on_next_call = true;
self.expand_on_next_part_cond_trans_commutative = true;
self.stage.push(plan)
}
}
@@ -195,7 +233,7 @@ impl PlanRewriter {
&& let Some(plan) = transformer(plan)
{
self.update_column_requirements(&plan);
self.expand_on_next_call = true;
self.expand_on_next_part_cond_trans_commutative = true;
self.stage.push(plan)
}
}

View File

@@ -295,19 +295,18 @@ explain analyze select tag from t where num > 6 order by ts desc limit 2;
+-+-+-+
| 0_| 0_|_ProjectionExec: expr=[tag@0 as tag] REDACTED
|_|_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED
|_|_|_SortExec: TopK(fetch=2), expr=[ts@1 DESC], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [ts@1 DESC] REDACTED
|_|_|_WindowedSortExec: expr=ts@1 DESC num_ranges=REDACTED REDACTED
|_|_|_PartSortExec: expr=ts@1 DESC num_ranges=REDACTED REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
| 1_| 0_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED
|_|_|_WindowedSortExec: expr=ts@1 DESC num_ranges=REDACTED fetch=2 REDACTED
|_|_|_PartSortExec: expr=ts@1 DESC num_ranges=REDACTED limit=2 REDACTED
|_|_|_FilterExec: num@2 > 6, projection=[tag@0, ts@1] REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 1_|_SortPreservingMergeExec: [ts@1 DESC] REDACTED
|_|_|_WindowedSortExec: expr=ts@1 DESC num_ranges=REDACTED REDACTED
|_|_|_PartSortExec: expr=ts@1 DESC num_ranges=REDACTED REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
| 1_| 1_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED
|_|_|_WindowedSortExec: expr=ts@1 DESC num_ranges=REDACTED fetch=2 REDACTED
|_|_|_PartSortExec: expr=ts@1 DESC num_ranges=REDACTED limit=2 REDACTED
|_|_|_FilterExec: num@2 > 6, projection=[tag@0, ts@1] REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|_|_|_|