put it all into the rule

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-03-28 13:40:58 +08:00
parent 0583725b16
commit c86574eb82
2 changed files with 70 additions and 56 deletions

View File

@@ -23,6 +23,9 @@ use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, InputOrd
use datafusion_common::Result as DfResult;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::{Distribution, Partitioning};
use promql::extension_plan::{
InstantManipulateExec, RangeManipulateExec, SeriesDivideExec, SeriesNormalizeExec,
};
/// Replaces a redundant hash repartition before a coarser aggregate with a
/// single fan-in.
@@ -80,12 +83,6 @@ impl ReduceAggregateRepartition {
return Ok(Transformed::no(plan));
}
let Partitioning::Hash(finer_partition_exprs, _) =
repartition_exec.input().output_partitioning()
else {
return Ok(Transformed::no(plan));
};
let Some(required_distribution) =
agg_exec.required_input_distribution().into_iter().next()
else {
@@ -100,15 +97,7 @@ impl ReduceAggregateRepartition {
return Ok(Transformed::no(plan));
}
let coarsening_satisfaction = repartition_exec.partitioning().satisfaction(
&Distribution::HashPartitioned(finer_partition_exprs.clone()),
repartition_exec
.input()
.properties()
.equivalence_properties(),
true,
);
if !coarsening_satisfaction.is_subset() {
if !Self::can_reduce_repartition(repartition_exec) {
return Ok(Transformed::no(plan));
}
@@ -129,6 +118,71 @@ impl ReduceAggregateRepartition {
})
.data()
}
fn can_reduce_repartition(repartition_exec: &RepartitionExec) -> bool {
let has_direct_promql_input =
Self::has_direct_promql_partial_input(repartition_exec.input());
if Self::contains_promql_exec_deep(repartition_exec.input()) {
return has_direct_promql_input;
}
let Partitioning::Hash(finer_partition_exprs, _) =
repartition_exec.input().output_partitioning()
else {
return false;
};
let coarsening_satisfaction = repartition_exec.partitioning().satisfaction(
&Distribution::HashPartitioned(finer_partition_exprs.clone()),
repartition_exec
.input()
.properties()
.equivalence_properties(),
true,
);
coarsening_satisfaction.is_subset()
}
fn has_direct_promql_partial_input(plan: &Arc<dyn ExecutionPlan>) -> bool {
let Some(partial_agg) = plan.as_any().downcast_ref::<AggregateExec>() else {
return false;
};
partial_agg.mode() == &AggregateMode::Partial
&& Self::contains_promql_vector_exec(partial_agg.input())
}
fn contains_promql_vector_exec(plan: &Arc<dyn ExecutionPlan>) -> bool {
if Self::is_promql_vector_exec(plan) {
return true;
}
if plan.as_any().is::<AggregateExec>() {
return false;
}
plan.children()
.into_iter()
.any(Self::contains_promql_vector_exec)
}
fn contains_promql_exec_deep(plan: &Arc<dyn ExecutionPlan>) -> bool {
if Self::is_promql_vector_exec(plan) {
return true;
}
plan.children()
.into_iter()
.any(Self::contains_promql_exec_deep)
}
fn is_promql_vector_exec(plan: &Arc<dyn ExecutionPlan>) -> bool {
let plan = plan.as_any();
plan.is::<SeriesDivideExec>()
|| plan.is::<SeriesNormalizeExec>()
|| plan.is::<RangeManipulateExec>()
|| plan.is::<InstantManipulateExec>()
}
}
#[cfg(test)]

View File

@@ -49,9 +49,7 @@ use datafusion_optimizer::Analyzer;
use datafusion_optimizer::analyzer::function_rewrite::ApplyFunctionRewrites;
use datafusion_optimizer::optimizer::Optimizer;
use partition::manager::PartitionRuleManagerRef;
use promql::extension_plan::{
InstantManipulate, PromExtensionPlanner, RangeManipulate, SeriesDivide, SeriesNormalize,
};
use promql::extension_plan::PromExtensionPlanner;
use table::TableRef;
use table::table::adapter::DfTableProviderAdapter;
@@ -461,50 +459,12 @@ impl QueryPlanner for DfQueryPlanner {
logical_plan: &DfLogicalPlan,
session_state: &SessionState,
) -> DfResult<Arc<dyn ExecutionPlan>> {
let scoped_session_state;
let session_state = if should_disable_repartitioned_aggregations(logical_plan) {
scoped_session_state = {
let mut session_state = session_state.clone();
*session_state.config_mut() = session_state
.config()
.clone()
.with_repartition_aggregations(false);
session_state
};
&scoped_session_state
} else {
session_state
};
self.physical_planner
.create_physical_plan(logical_plan, session_state)
.await
}
}
fn should_disable_repartitioned_aggregations(plan: &DfLogicalPlan) -> bool {
match plan {
DfLogicalPlan::Aggregate(aggregate) => contains_promql_vector_node(&aggregate.input),
_ => plan
.inputs()
.into_iter()
.any(should_disable_repartitioned_aggregations),
}
}
fn contains_promql_vector_node(plan: &DfLogicalPlan) -> bool {
match plan {
DfLogicalPlan::Extension(extension) => {
let node = extension.node.as_any();
node.is::<SeriesDivide>()
|| node.is::<SeriesNormalize>()
|| node.is::<RangeManipulate>()
|| node.is::<InstantManipulate>()
}
_ => plan.inputs().into_iter().any(contains_promql_vector_node),
}
}
/// MySQL-compatible scalar function aliases: (target_name, alias)
const SCALAR_FUNCTION_ALIASES: &[(&str, &str)] = &[
("upper", "ucase"),