major refactor

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-04-22 02:40:43 +08:00
parent 78c41e083f
commit 2277ebd9f6
6 changed files with 163 additions and 93 deletions

View File

@@ -21,7 +21,6 @@ use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, InputOrderMode};
use datafusion_common::Result as DfResult;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::{Distribution, Partitioning};
/// Replaces a redundant hash repartition before a coarser aggregate with a
@@ -33,13 +32,18 @@ use datafusion_physical_expr::{Distribution, Partitioning};
#[derive(Debug)]
pub struct ReduceAggregateRepartition;
/// Result of optimizing one plan node together with everything below it.
struct OptimizedPlan {
/// The node rebuilt on top of its optimized children, possibly rewritten itself.
plan: Arc<dyn ExecutionPlan>,
/// `true` when neither this node nor any node beneath it has more than one child.
subtree_is_linear: bool,
}
impl PhysicalOptimizerRule for ReduceAggregateRepartition {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> DfResult<Arc<dyn ExecutionPlan>> {
Self::do_optimize(plan)
Ok(Self::do_optimize_with_context(plan, false)?.plan)
}
fn name(&self) -> &str {
@@ -52,68 +56,99 @@ impl PhysicalOptimizerRule for ReduceAggregateRepartition {
}
impl ReduceAggregateRepartition {
fn do_optimize(plan: Arc<dyn ExecutionPlan>) -> DfResult<Arc<dyn ExecutionPlan>> {
plan.transform_down(|plan| {
let Some(agg_exec) = plan.as_any().downcast_ref::<AggregateExec>() else {
return Ok(Transformed::no(plan));
};
fn do_optimize_with_context(
plan: Arc<dyn ExecutionPlan>,
under_branching_ancestor: bool,
) -> DfResult<OptimizedPlan> {
let children = plan.children();
let subtree_branches_here = children.len() > 1;
let child_under_branching_ancestor = under_branching_ancestor || subtree_branches_here;
let optimized_children = children
.into_iter()
.map(|child| {
Self::do_optimize_with_context(Arc::clone(child), child_under_branching_ancestor)
})
.collect::<DfResult<Vec<_>>>()?;
let subtree_is_linear = !subtree_branches_here
&& optimized_children
.iter()
.all(|child| child.subtree_is_linear);
let new_children = optimized_children
.into_iter()
.map(|child| child.plan)
.collect();
let plan = plan.with_new_children(new_children)?;
let new_mode = match agg_exec.mode() {
AggregateMode::FinalPartitioned => AggregateMode::Final,
AggregateMode::SinglePartitioned => AggregateMode::Single,
_ => return Ok(Transformed::no(plan)),
};
let plan = if under_branching_ancestor || !subtree_is_linear {
plan
} else {
Self::try_reduce_repartition(plan)?
};
if agg_exec.input_order_mode() != &InputOrderMode::Linear
|| agg_exec.group_expr().has_grouping_set()
{
return Ok(Transformed::no(plan));
}
let Some(repartition_exec) =
agg_exec.input().as_any().downcast_ref::<RepartitionExec>()
else {
return Ok(Transformed::no(plan));
};
if repartition_exec.preserve_order() {
return Ok(Transformed::no(plan));
}
let Some(required_distribution) =
agg_exec.required_input_distribution().into_iter().next()
else {
return Ok(Transformed::no(plan));
};
let repartition_satisfaction = repartition_exec.partitioning().satisfaction(
&required_distribution,
repartition_exec.properties().equivalence_properties(),
true,
);
if !repartition_satisfaction.is_satisfied() {
return Ok(Transformed::no(plan));
}
if !Self::can_reduce_repartition(repartition_exec) {
return Ok(Transformed::no(plan));
}
let new_input = Arc::new(CoalescePartitionsExec::new(
repartition_exec.input().clone(),
));
let new_agg = AggregateExec::try_new(
new_mode,
agg_exec.group_expr().clone(),
agg_exec.aggr_expr().to_vec(),
agg_exec.filter_expr().to_vec(),
new_input,
agg_exec.input_schema(),
)?
.with_limit_options(agg_exec.limit_options());
Ok(Transformed::yes(Arc::new(new_agg)))
Ok(OptimizedPlan {
plan,
subtree_is_linear,
})
.data()
}
/// Matches the `AggregateExec -> RepartitionExec` shape this rule looks at.
///
/// Returns the aggregate and its direct repartition input when `plan` is an
/// [`AggregateExec`] whose input is a [`RepartitionExec`]; otherwise `None`.
fn aggregate_repartition_candidate(
    plan: &Arc<dyn ExecutionPlan>,
) -> Option<(&AggregateExec, &RepartitionExec)> {
    plan.as_any()
        .downcast_ref::<AggregateExec>()
        .and_then(|aggregate| {
            aggregate
                .input()
                .as_any()
                .downcast_ref::<RepartitionExec>()
                .map(|repartition| (aggregate, repartition))
        })
}
/// Decides whether the repartition feeding `agg_exec` may be reduced.
///
/// The rewrite is rejected unless all of the following hold:
/// - the aggregate mode is `FinalPartitioned` or `SinglePartitioned`;
/// - the aggregate's input order mode is `Linear`;
/// - the group-by has no grouping set;
/// - the repartition does not preserve order;
/// - the repartition's partitioning satisfies the aggregate's first required
///   input distribution;
/// - `can_reduce_repartition` accepts the repartition.
fn can_rewrite_aggregate(agg_exec: &AggregateExec, repartition_exec: &RepartitionExec) -> bool {
    let partitioned_mode = matches!(
        agg_exec.mode(),
        AggregateMode::FinalPartitioned | AggregateMode::SinglePartitioned
    );
    if !partitioned_mode {
        return false;
    }
    if agg_exec.input_order_mode() != &InputOrderMode::Linear {
        return false;
    }
    if agg_exec.group_expr().has_grouping_set() {
        return false;
    }
    if repartition_exec.preserve_order() {
        return false;
    }

    // Only the first required distribution is consulted; an aggregate without
    // any requirement is never rewritten.
    match agg_exec.required_input_distribution().into_iter().next() {
        None => false,
        Some(required) => {
            let satisfied = repartition_exec
                .partitioning()
                .satisfaction(
                    &required,
                    repartition_exec.properties().equivalence_properties(),
                    true,
                )
                .is_satisfied();
            satisfied && Self::can_reduce_repartition(repartition_exec)
        }
    }
}
/// Rewrites `plan` when it is an eligible aggregate-over-repartition pair.
///
/// If `plan` matches `aggregate_repartition_candidate` and passes
/// `can_rewrite_aggregate`, the repartition is dropped: a new aggregate with the
/// same mode, group-by, aggregate/filter expressions, input schema and limit
/// options is built over a `CoalescePartitionsExec` wrapping the repartition's
/// own input. In every other case `plan` is handed back untouched.
///
/// # Errors
/// Propagates any error from `AggregateExec::try_new`.
fn try_reduce_repartition(plan: Arc<dyn ExecutionPlan>) -> DfResult<Arc<dyn ExecutionPlan>> {
    let rewritten = match Self::aggregate_repartition_candidate(&plan) {
        Some((aggregate, repartition)) if Self::can_rewrite_aggregate(aggregate, repartition) => {
            // Bypass the repartition and gather its input into one partition.
            let coalesced = Arc::new(CoalescePartitionsExec::new(Arc::clone(
                repartition.input(),
            )));
            AggregateExec::try_new(
                *aggregate.mode(),
                aggregate.group_expr().clone(),
                aggregate.aggr_expr().to_vec(),
                aggregate.filter_expr().to_vec(),
                coalesced,
                aggregate.input_schema(),
            )?
            .with_limit_options(aggregate.limit_options())
        }
        _ => return Ok(plan),
    };
    Ok(Arc::new(rewritten))
}
fn can_reduce_repartition(repartition_exec: &RepartitionExec) -> bool {
@@ -147,6 +182,7 @@ mod tests {
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::union::UnionExec;
use datafusion::physical_plan::{ExecutionPlan, displayable};
use datafusion_common::{DFSchema, Result};
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
@@ -264,7 +300,7 @@ mod tests {
assert_eq!(
optimize(final_agg)?.trim(),
r#"AggregateExec: mode=Final, gby=[a@0 as a], aggr=[]
r#"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]
CoalescePartitionsExec
AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[]
RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=1
@@ -288,7 +324,7 @@ mod tests {
assert_eq!(
optimize(final_agg)?.trim(),
r#"AggregateExec: mode=Single, gby=[a@0 as a], aggr=[]
r#"AggregateExec: mode=SinglePartitioned, gby=[a@0 as a], aggr=[]
CoalescePartitionsExec
RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=1
DataSourceExec: partitions=1, partition_sizes=[0]"#
@@ -349,7 +385,7 @@ mod tests {
assert_eq!(
optimize(final_agg)?.trim(),
r#"AggregateExec: mode=Final, gby=[b@2 as b, c@0 as c], aggr=[]
r#"AggregateExec: mode=FinalPartitioned, gby=[b@2 as b, c@0 as c], aggr=[]
CoalescePartitionsExec
AggregateExec: mode=Partial, gby=[c@2 as c, a@0 as a, b@1 as b], aggr=[]
RepartitionExec: partitioning=Hash([c@2, a@0, b@1], 8), input_partitions=1
@@ -373,7 +409,7 @@ mod tests {
assert_eq!(
optimize(final_agg)?.trim(),
r#"AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[]
r#"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[]
CoalescePartitionsExec
RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 8), input_partitions=1
DataSourceExec: partitions=1, partition_sizes=[0]"#
@@ -431,7 +467,9 @@ mod tests {
let optimized = optimize(final_agg)?;
assert!(
optimized.contains("AggregateExec: mode=Single, gby=[x@0 as x, y@1 as y], aggr=[]")
optimized.contains(
"AggregateExec: mode=SinglePartitioned, gby=[x@0 as x, y@1 as y], aggr=[]"
)
);
assert!(optimized.contains("CoalescePartitionsExec"));
assert!(optimized.contains("ProjectionExec: expr=[a@0 as x, b@1 as y, c@2 as z]"));
@@ -455,7 +493,9 @@ mod tests {
let optimized = optimize(final_agg)?;
assert!(
optimized.contains("AggregateExec: mode=Single, gby=[x@1 as x, y@2 as y], aggr=[]"),
optimized.contains(
"AggregateExec: mode=SinglePartitioned, gby=[x@1 as x, y@2 as y], aggr=[]"
),
"{optimized}"
);
assert!(optimized.contains("CoalescePartitionsExec"), "{optimized}");
@@ -463,6 +503,38 @@ mod tests {
Ok(())
}
// Checks that the optimizer leaves the plan alone when a multi-input node
// (`UnionExec`) sits below the aggregate/repartition pair.
#[test]
fn keeps_subset_partitioning_with_branching_descendant() -> Result<()> {
// Two independent inputs merged by a union: the branching descendant.
let left = input_exec();
let right = input_exec();
let union = UnionExec::try_new(vec![left, right])?;
// Finer-grained stage: repartition and partially aggregate on (a, b, c).
let finer = repartition(union.clone(), &["a", "b", "c"], &union.schema())?;
let partial = aggregate(
AggregateMode::Partial,
finer,
group_by(&["a", "b", "c"], &union.schema())?,
union.schema(),
vec![],
)?;
// Coarser stage: repartition on (a, b) feeding a FinalPartitioned aggregate.
let final_repartition = repartition(partial.clone(), &["a", "b"], &partial.schema())?;
let final_agg = aggregate(
AggregateMode::FinalPartitioned,
final_repartition,
group_by(&["a", "b"], &partial.schema())?,
union.schema(),
vec![],
)?;
let optimized = optimize(final_agg)?;
// The (a, b) hash repartition must still be in the optimized plan.
assert!(
optimized.contains("RepartitionExec: partitioning=Hash([a@0, b@1], 8)"),
"{optimized}"
);
assert!(optimized.contains("UnionExec"), "{optimized}");
// No repartition may have been replaced by a coalesce.
assert!(!optimized.contains("CoalescePartitionsExec"), "{optimized}");
Ok(())
}
#[test]
fn keeps_non_subset_repartition() -> Result<()> {
let raw = input_exec();

View File

@@ -193,14 +193,9 @@ impl QueryEngineState {
);
Self::insert_physical_optimizer_rule_after(
&mut physical_optimizer.rules,
datafusion::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate::new().name(),
datafusion::physical_optimizer::update_aggr_exprs::OptimizeAggregateOrder::new().name(),
Arc::new(ReduceAggregateRepartition),
);
Self::insert_physical_optimizer_rule_after(
&mut physical_optimizer.rules,
ReduceAggregateRepartition.name(),
Arc::new(datafusion::physical_optimizer::enforce_distribution::EnforceDistribution {}),
);
// Add rule for windowed sort
physical_optimizer
.rules

View File

@@ -181,10 +181,9 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
| physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_|
| physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_|
| physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_|
| physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_|
| physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_|
| physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_|
| physical_plan after OutputRequirements_| MergeScanExec: REDACTED
|_|_|
@@ -329,10 +328,9 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test AS series;
| physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_|
| physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_|
| physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_|
| physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_|
| physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_|
| physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_|
| physical_plan after OutputRequirements_| MergeScanExec: REDACTED
|_|_|

View File

@@ -185,10 +185,9 @@ GROUP BY b, a;
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_|
| physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_|
| physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_|
| physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_|
| physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_|
| physical_plan after OutputRequirements_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)]_|
|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_|
@@ -252,7 +251,7 @@ GROUP BY b, a;
|_|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_AggregateExec: mode=Final, gby=[b@0 as b, a@1 as a], aggr=[__sum_state(s.m)] REDACTED
| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[__sum_state(s.m)] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[__sum_state(s.m)] REDACTED
|_|_|_ProjectionExec: expr=[a@0 as a, b@1 as b, min(reduce_aggregate_repartition.val)@3 as m] REDACTED
@@ -261,7 +260,7 @@ GROUP BY b, a;
|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 1_|_AggregateExec: mode=Final, gby=[b@0 as b, a@1 as a], aggr=[__sum_state(s.m)] REDACTED
| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[__sum_state(s.m)] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[__sum_state(s.m)] REDACTED
|_|_|_ProjectionExec: expr=[a@0 as a, b@1 as b, min(reduce_aggregate_repartition.val)@3 as m] REDACTED
@@ -314,7 +313,7 @@ GROUP BY a;
|_|_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_AggregateExec: mode=Final, gby=[a@0 as a], aggr=[__count_state(s.m)] REDACTED
| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[__count_state(s.m)] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[__count_state(s.m)] REDACTED
|_|_|_ProjectionExec: expr=[a@0 as a, min(reduce_aggregate_repartition.val)@3 as m] REDACTED
@@ -323,7 +322,7 @@ GROUP BY a;
|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 1_|_AggregateExec: mode=Final, gby=[a@0 as a], aggr=[__count_state(s.m)] REDACTED
| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[__count_state(s.m)] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[__count_state(s.m)] REDACTED
|_|_|_ProjectionExec: expr=[a@0 as a, min(reduce_aggregate_repartition.val)@3 as m] REDACTED
@@ -335,7 +334,8 @@ GROUP BY a;
|_|_| Total rows: 2_|
+-+-+-+
-- Zero-key reduction should rewrite SinglePartitioned to Single.
-- Zero-key reduction should collapse the redundant repartition while keeping
-- the outer aggregate in SinglePartitioned mode.
SELECT sum(m)
FROM (
SELECT a, b, c, min(val) AS m
@@ -508,8 +508,9 @@ count(count(reduce_aggregate_repartition_metric) by (job));
| 0_| 0_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[count(count(reduce_aggregate_repartition_metric.greptime_value))] REDACTED
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(count(reduce_aggregate_repartition_metric.greptime_value))] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(reduce_aggregate_repartition_metric.greptime_value))] REDACTED
|_|_|_ProjectionExec: expr=[ts@0 as ts] REDACTED
@@ -541,8 +542,9 @@ count(count(rate(reduce_aggregate_repartition_metric[5s])) by (job));
| 0_| 0_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[count(count(prom_rate(ts_range,greptime_value,ts,Int64(5000))))] REDACTED
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(count(prom_rate(ts_range,greptime_value,ts,Int64(5000))))] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(prom_rate(ts_range,greptime_value,ts,Int64(5000))))] REDACTED
|_|_|_ProjectionExec: expr=[ts@1 as ts, count(prom_rate(ts_range,greptime_value,ts,Int64(5000)))@2 as count(prom_rate(ts_range,greptime_value,ts,Int64(5000)))] REDACTED

View File

@@ -85,7 +85,8 @@ FROM (
) s
GROUP BY a;
-- Zero-key reduction should rewrite SinglePartitioned to Single.
-- Zero-key reduction should collapse the redundant repartition while keeping
-- the outer aggregate in SinglePartitioned mode.
SELECT sum(m)
FROM (
SELECT a, b, c, min(val) AS m

View File

@@ -107,8 +107,9 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(count(tsid
|_|_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED
|_|_|_ProjectionExec: expr=[ts@0 as ts] REDACTED
@@ -157,8 +158,9 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(sum(tsid_m
|_|_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED
|_|_|_ProjectionExec: expr=[ts@0 as ts] REDACTED