generalize

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-04-15 08:44:53 +08:00
parent 024ca1af79
commit c2f8445483
8 changed files with 134 additions and 113 deletions

View File

@@ -23,9 +23,6 @@ use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, InputOrd
use datafusion_common::Result as DfResult;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::{Distribution, Partitioning};
use promql::extension_plan::{
InstantManipulateExec, RangeManipulateExec, SeriesDivideExec, SeriesNormalizeExec,
};
/// Replaces a redundant hash repartition before a coarser aggregate with a
/// single fan-in.
@@ -120,16 +117,6 @@ impl ReduceAggregateRepartition {
}
fn can_reduce_repartition(repartition_exec: &RepartitionExec) -> bool {
let has_direct_promql_input = matches!(
repartition_exec.input().as_any().downcast_ref::<AggregateExec>(),
Some(partial_agg)
if partial_agg.mode() == &AggregateMode::Partial
&& Self::contains_promql_exec(partial_agg.input(), true)
);
if Self::contains_promql_exec(repartition_exec.input(), false) {
return has_direct_promql_input;
}
let Partitioning::Hash(finer_partition_exprs, _) =
repartition_exec.input().output_partitioning()
else {
@@ -146,25 +133,6 @@ impl ReduceAggregateRepartition {
);
coarsening_satisfaction.is_subset()
}
fn contains_promql_exec(plan: &Arc<dyn ExecutionPlan>, stop_at_aggregate: bool) -> bool {
let plan_any = plan.as_any();
if plan_any.is::<SeriesDivideExec>()
|| plan_any.is::<SeriesNormalizeExec>()
|| plan_any.is::<RangeManipulateExec>()
|| plan_any.is::<InstantManipulateExec>()
{
return true;
}
if stop_at_aggregate && plan_any.is::<AggregateExec>() {
return false;
}
plan.children()
.into_iter()
.any(|child| Self::contains_promql_exec(child, stop_at_aggregate))
}
}
#[cfg(test)]
@@ -174,16 +142,18 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{ExecutionPlan, displayable};
use datafusion_common::Result;
use datafusion_common::{DFSchema, Result};
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::{Partitioning, PhysicalExpr};
use pretty_assertions::assert_eq;
use promql::extension_plan::SeriesNormalize;
use super::ReduceAggregateRepartition;
@@ -243,6 +213,13 @@ mod tests {
Ok(displayable(optimized.as_ref()).indent(true).to_string())
}
fn empty_logical_plan() -> LogicalPlan {
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(DFSchema::empty()),
})
}
fn project_with_aliases(
input: Arc<dyn ExecutionPlan>,
aliases: &[(&str, &str)],
@@ -254,6 +231,17 @@ mod tests {
Ok(Arc::new(ProjectionExec::try_new(exprs?, input)?))
}
fn promql_normalize(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
SeriesNormalize::new(
0,
"a",
false,
vec!["b".to_string(), "c".to_string()],
empty_logical_plan(),
)
.to_execution_plan(input)
}
#[test]
fn rewrites_final_partitioned_subset_repartition() -> Result<()> {
let raw = input_exec();
@@ -450,6 +438,31 @@ mod tests {
Ok(())
}
#[test]
fn rewrites_promql_subset_partitioning_through_projection() -> Result<()> {
let raw = input_exec();
let finer = repartition(raw.clone(), &["a", "b", "c"], &raw.schema())?;
let normalized = promql_normalize(finer);
let projected = project_with_aliases(normalized, &[("a", "ts"), ("b", "x"), ("c", "y")])?;
let final_repartition = repartition(projected.clone(), &["x", "y"], &projected.schema())?;
let final_agg = aggregate(
AggregateMode::SinglePartitioned,
final_repartition,
group_by(&["x", "y"], &projected.schema())?,
projected.schema(),
vec![],
)?;
let optimized = optimize(final_agg)?;
assert!(
optimized.contains("AggregateExec: mode=Single, gby=[x@1 as x, y@2 as y], aggr=[]"),
"{optimized}"
);
assert!(optimized.contains("CoalescePartitionsExec"), "{optimized}");
assert!(optimized.contains("PromSeriesNormalizeExec"), "{optimized}");
Ok(())
}
#[test]
fn keeps_non_subset_repartition() -> Result<()> {
let raw = input_exec();