reimplementation

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-04-22 02:40:56 +08:00
parent 2277ebd9f6
commit 5dfa7aaed8
7 changed files with 211 additions and 921 deletions

View File

@@ -82,11 +82,8 @@ impl PassDistribution {
let required = plan.required_input_distribution();
let mut new_children = Vec::with_capacity(children.len());
for (idx, child) in children.into_iter().enumerate() {
let child_req = match required.get(idx) {
Some(Distribution::UnspecifiedDistribution) => None,
None => current_req.clone(),
Some(req) => Some(req.clone()),
};
let child_req =
Self::child_distribution_requirement(idx, current_req.as_ref(), &required);
let new_child = Self::rewrite_with_distribution(child.clone(), child_req)?;
new_children.push(new_child);
}
@@ -103,4 +100,16 @@ impl PassDistribution {
plan.with_new_children(new_children)
}
}
/// Picks the distribution requirement to propagate to the child at `child_idx`.
///
/// - `Some(UnspecifiedDistribution)`: the parent explicitly does not care,
///   so no requirement is propagated (`None`) and any inherited one is dropped.
/// - `None` (index out of range of `required`): the requirement inherited from
///   further up the plan (`current_req`) is forwarded unchanged.
/// - `Some(req)`: the parent's concrete requirement wins and is cloned.
fn child_distribution_requirement(
child_idx: usize,
current_req: Option<&Distribution>,
required: &[Distribution],
) -> Option<Distribution> {
match required.get(child_idx) {
// Parent explicitly unspecified: stop propagating the inherited requirement.
Some(Distribution::UnspecifiedDistribution) => None,
// No entry for this child index: keep forwarding the inherited requirement.
None => current_req.cloned(),
// Concrete parent requirement overrides whatever was inherited.
Some(req) => Some(req.clone()),
}
}
}

View File

@@ -16,38 +16,40 @@ use std::sync::Arc;
use datafusion::config::ConfigOptions;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, InputOrderMode};
use datafusion_common::Result as DfResult;
use datafusion_physical_expr::{Distribution, Partitioning};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{Distribution, PhysicalExpr};
/// Replaces a redundant hash repartition before a coarser aggregate with a
/// single fan-in.
use crate::dist_plan::MergeScanExec;
/// This is a [`PhysicalOptimizerRule`] to reduce repartition overhead between
/// partial and final aggregates by pushing a coarser hash distribution
/// requirement through [`AggregateExec`] in partial mode down to [`MergeScanExec`].
///
/// This only applies when the aggregate already receives hash partitioning
/// satisfying its grouping keys and the repartition input is already hash
/// partitioned on a strict superset of those repartition keys.
/// This rule keeps the partition fanout unchanged. It only enables
/// source-aware partition-column coarsening that [`MergeScanExec`] can already
/// satisfy without a full reshuffle.
///
/// This rule is expected to be run before [`EnforceDistribution`].
///
/// [`EnforceDistribution`]: datafusion::physical_optimizer::enforce_distribution::EnforceDistribution
/// [`MergeScanExec`]: crate::dist_plan::MergeScanExec
#[derive(Debug)]
pub struct ReduceAggregateRepartition;
struct OptimizedPlan {
plan: Arc<dyn ExecutionPlan>,
subtree_is_linear: bool,
}
impl PhysicalOptimizerRule for ReduceAggregateRepartition {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
config: &ConfigOptions,
) -> DfResult<Arc<dyn ExecutionPlan>> {
Ok(Self::do_optimize_with_context(plan, false)?.plan)
Self::do_optimize(plan, config)
}
fn name(&self) -> &str {
"ReduceAggregateRepartition"
"ReduceAggregateRepartitionRule"
}
fn schema_check(&self) -> bool {
@@ -56,117 +58,110 @@ impl PhysicalOptimizerRule for ReduceAggregateRepartition {
}
impl ReduceAggregateRepartition {
fn do_optimize_with_context(
fn do_optimize(
plan: Arc<dyn ExecutionPlan>,
under_branching_ancestor: bool,
) -> DfResult<OptimizedPlan> {
let children = plan.children();
let subtree_branches_here = children.len() > 1;
let child_under_branching_ancestor = under_branching_ancestor || subtree_branches_here;
let optimized_children = children
.into_iter()
.map(|child| {
Self::do_optimize_with_context(Arc::clone(child), child_under_branching_ancestor)
})
.collect::<DfResult<Vec<_>>>()?;
let subtree_is_linear = !subtree_branches_here
&& optimized_children
.iter()
.all(|child| child.subtree_is_linear);
let new_children = optimized_children
.into_iter()
.map(|child| child.plan)
.collect();
let plan = plan.with_new_children(new_children)?;
let plan = if under_branching_ancestor || !subtree_is_linear {
plan
} else {
Self::try_reduce_repartition(plan)?
};
Ok(OptimizedPlan {
plan,
subtree_is_linear,
})
_config: &ConfigOptions,
) -> DfResult<Arc<dyn ExecutionPlan>> {
Self::rewrite_with_distribution(plan, None)
}
fn aggregate_repartition_candidate(
plan: &Arc<dyn ExecutionPlan>,
) -> Option<(&AggregateExec, &RepartitionExec)> {
let agg_exec = plan.as_any().downcast_ref::<AggregateExec>()?;
let repartition_exec = agg_exec
.input()
.as_any()
.downcast_ref::<RepartitionExec>()?;
Some((agg_exec, repartition_exec))
}
fn can_rewrite_aggregate(agg_exec: &AggregateExec, repartition_exec: &RepartitionExec) -> bool {
if !matches!(
agg_exec.mode(),
AggregateMode::FinalPartitioned | AggregateMode::SinglePartitioned
) || agg_exec.input_order_mode() != &InputOrderMode::Linear
|| agg_exec.group_expr().has_grouping_set()
|| repartition_exec.preserve_order()
fn rewrite_with_distribution(
plan: Arc<dyn ExecutionPlan>,
current_req: Option<Distribution>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
if let Some(merge_scan) = plan.as_any().downcast_ref::<MergeScanExec>()
&& let Some(distribution) = current_req.as_ref()
&& let Some(new_plan) = merge_scan.try_with_new_distribution(distribution.clone())
{
return false;
return Ok(Arc::new(new_plan) as _);
}
let Some(required_distribution) = agg_exec.required_input_distribution().into_iter().next()
else {
return false;
};
let repartition_satisfaction = repartition_exec.partitioning().satisfaction(
&required_distribution,
repartition_exec.properties().equivalence_properties(),
true,
);
repartition_satisfaction.is_satisfied() && Self::can_reduce_repartition(repartition_exec)
}
fn try_reduce_repartition(plan: Arc<dyn ExecutionPlan>) -> DfResult<Arc<dyn ExecutionPlan>> {
let Some((agg_exec, repartition_exec)) = Self::aggregate_repartition_candidate(&plan)
else {
return Ok(plan);
};
if !Self::can_rewrite_aggregate(agg_exec, repartition_exec) {
let children = plan.children();
if children.is_empty() {
return Ok(plan);
}
let new_input = Arc::new(CoalescePartitionsExec::new(
repartition_exec.input().clone(),
));
let new_agg = AggregateExec::try_new(
*agg_exec.mode(),
agg_exec.group_expr().clone(),
agg_exec.aggr_expr().to_vec(),
agg_exec.filter_expr().to_vec(),
new_input,
agg_exec.input_schema(),
)?
.with_limit_options(agg_exec.limit_options());
let required = plan.required_input_distribution();
let mut new_children = Vec::with_capacity(children.len());
for (idx, child) in children.into_iter().enumerate() {
let child_req =
Self::child_distribution_requirement(&plan, idx, current_req.as_ref(), &required);
let new_child = Self::rewrite_with_distribution(child.clone(), child_req)?;
new_children.push(new_child);
}
Ok(Arc::new(new_agg))
let unchanged = plan
.children()
.into_iter()
.zip(new_children.iter())
.all(|(old, new)| Arc::ptr_eq(old, new));
if unchanged {
Ok(plan)
} else {
plan.with_new_children(new_children)
}
}
fn can_reduce_repartition(repartition_exec: &RepartitionExec) -> bool {
let Partitioning::Hash(finer_partition_exprs, _) =
repartition_exec.input().output_partitioning()
else {
return false;
/// Picks the distribution requirement to propagate to the child at `child_idx`
/// of `plan`.
///
/// Unlike a plain pass-through, an explicit `UnspecifiedDistribution` entry is
/// not treated as "drop the requirement": it is given to
/// `partial_aggregate_child_distribution`, which may translate the inherited
/// requirement through a partial aggregate instead (returning `None` when that
/// does not apply).
fn child_distribution_requirement(
plan: &Arc<dyn ExecutionPlan>,
child_idx: usize,
current_req: Option<&Distribution>,
required: &[Distribution],
) -> Option<Distribution> {
match required.get(child_idx) {
// Parent has no preference; try to push the inherited requirement
// through a partial aggregate (see helper), otherwise propagate nothing.
Some(Distribution::UnspecifiedDistribution) => {
Self::partial_aggregate_child_distribution(plan, child_idx, current_req)
}
// No entry for this child index: forward the inherited requirement.
None => current_req.cloned(),
// Concrete parent requirement overrides the inherited one.
Some(req) => Some(req.clone()),
}
}
/// Restricts partial-aggregate requirement mapping to the aggregate's data
/// input.
///
/// Only child 0 is considered (for `AggregateExec` that is the single data
/// input); any other child index yields `None` so no requirement is pushed
/// down to it.
fn partial_aggregate_child_distribution(
plan: &Arc<dyn ExecutionPlan>,
child_idx: usize,
current_req: Option<&Distribution>,
) -> Option<Distribution> {
if child_idx == 0 {
Self::partial_aggregate_input_distribution(plan, current_req)
} else {
None
}
}
fn partial_aggregate_input_distribution(
plan: &Arc<dyn ExecutionPlan>,
current_req: Option<&Distribution>,
) -> Option<Distribution> {
let agg_exec = plan.as_any().downcast_ref::<AggregateExec>()?;
if agg_exec.mode() != &AggregateMode::Partial || agg_exec.group_expr().has_grouping_set() {
return None;
}
let Distribution::HashPartitioned(required_exprs) = current_req? else {
return None;
};
let coarsening_satisfaction = repartition_exec.partitioning().satisfaction(
&Distribution::HashPartitioned(finer_partition_exprs.clone()),
repartition_exec
.input()
.properties()
.equivalence_properties(),
true,
);
coarsening_satisfaction.is_subset()
let output_group_exprs = agg_exec.output_group_expr();
let input_group_exprs = agg_exec.group_expr().input_exprs();
let mut mapped_exprs = Vec::with_capacity(required_exprs.len());
for required_expr in required_exprs {
let required_col = required_expr.as_any().downcast_ref::<Column>()?;
let group_idx = output_group_exprs
.iter()
.position(|expr| Self::same_column(expr, required_col))?;
mapped_exprs.push(input_group_exprs[group_idx].clone());
}
Some(Distribution::HashPartitioned(mapped_exprs))
}
/// Returns `true` iff `expr` is a `Column` matching `expected` on BOTH
/// index and name.
///
/// A non-`Column` expression (downcast fails) is never a match. Comparing
/// the index as well as the name guards against two columns that share a
/// name but refer to different positions in the schema.
fn same_column(expr: &Arc<dyn PhysicalExpr>, expected: &Column) -> bool {
expr.as_any()
.downcast_ref::<Column>()
.is_some_and(|column| {
column.index() == expected.index() && column.name() == expected.name()
})
}
}
@@ -177,19 +172,11 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::union::UnionExec;
use datafusion::physical_plan::{ExecutionPlan, displayable};
use datafusion_common::{DFSchema, Result};
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::{Partitioning, PhysicalExpr};
use pretty_assertions::assert_eq;
use promql::extension_plan::SeriesNormalize;
use datafusion_common::Result;
use datafusion_physical_expr::Distribution;
use datafusion_physical_expr::expressions::{Column, col};
use super::ReduceAggregateRepartition;
@@ -204,362 +191,85 @@ mod tests {
}
fn group_by(names: &[&str], schema: &SchemaRef) -> Result<PhysicalGroupBy> {
let groups: Result<Vec<(Arc<dyn PhysicalExpr>, String)>> = names
let groups = names
.iter()
.map(|name| Ok((col(name, schema)?, (*name).to_string())))
.collect();
Ok(PhysicalGroupBy::new_single(groups?))
}
fn repartition(
input: Arc<dyn ExecutionPlan>,
keys: &[&str],
schema: &SchemaRef,
) -> Result<Arc<dyn ExecutionPlan>> {
let exprs = keys
.iter()
.map(|name| col(name, schema))
.collect::<Result<Vec<_>>>()?;
Ok(Arc::new(RepartitionExec::try_new(
input,
Partitioning::Hash(exprs, 8),
)?))
Ok(PhysicalGroupBy::new_single(groups))
}
fn aggregate(
mode: AggregateMode,
input: Arc<dyn ExecutionPlan>,
group_by: PhysicalGroupBy,
input_schema: SchemaRef,
aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let filter_expr = vec![None; aggr_expr.len()];
fn partial_aggregate(group_keys: &[&str]) -> Result<Arc<dyn ExecutionPlan>> {
let input = input_exec();
let schema = input.schema();
Ok(Arc::new(AggregateExec::try_new(
mode,
group_by,
aggr_expr,
filter_expr,
AggregateMode::Partial,
group_by(group_keys, &schema)?,
vec![],
vec![],
input,
input_schema,
schema,
)?))
}
fn optimize(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
let optimized = ReduceAggregateRepartition.optimize(plan, &Default::default())?;
Ok(displayable(optimized.as_ref()).indent(true).to_string())
/// Test helper: flattens a `Distribution::HashPartitioned` into
/// `(column_name, column_index)` pairs for easy assertion.
///
/// Panics (failing the test) if `distribution` is not hash-partitioned or if
/// any partitioning expression is not a plain `Column` — both `panic!` and
/// `unwrap` here are deliberate test-only hard failures.
fn hash_columns(distribution: Distribution) -> Vec<(String, usize)> {
let Distribution::HashPartitioned(exprs) = distribution else {
panic!("expected hash distribution");
};
exprs
.into_iter()
.map(|expr| {
// Each hash expression is expected to be a bare column reference.
let column = expr
.as_any()
.downcast_ref::<datafusion_physical_expr::expressions::Column>()
.unwrap();
(column.name().to_string(), column.index())
})
.collect()
}
fn empty_logical_plan() -> LogicalPlan {
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(DFSchema::empty()),
})
}
#[test]
fn maps_partial_aggregate_hash_requirement_to_input_columns() -> Result<()> {
let aggregate = partial_aggregate(&["b", "a", "c"])?;
let required = Distribution::HashPartitioned(vec![
aggregate
.as_any()
.downcast_ref::<AggregateExec>()
.unwrap()
.output_group_expr()[0]
.clone(),
aggregate
.as_any()
.downcast_ref::<AggregateExec>()
.unwrap()
.output_group_expr()[1]
.clone(),
]);
fn project_with_aliases(
input: Arc<dyn ExecutionPlan>,
aliases: &[(&str, &str)],
) -> Result<Arc<dyn ExecutionPlan>> {
let exprs: Result<Vec<(Arc<dyn PhysicalExpr>, String)>> = aliases
.iter()
.map(|(from, to)| Ok((col(from, &input.schema())?, (*to).to_string())))
.collect();
Ok(Arc::new(ProjectionExec::try_new(exprs?, input)?))
}
fn promql_normalize(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
SeriesNormalize::new(
0,
"a",
false,
vec!["b".to_string(), "c".to_string()],
empty_logical_plan(),
let mapped = ReduceAggregateRepartition::partial_aggregate_input_distribution(
&aggregate,
Some(&required),
)
.to_execution_plan(input)
}
#[test]
fn rewrites_final_partitioned_subset_repartition() -> Result<()> {
let raw = input_exec();
let finer = repartition(raw.clone(), &["a", "b"], &raw.schema())?;
let partial = aggregate(
AggregateMode::Partial,
finer,
group_by(&["a", "b"], &raw.schema())?,
raw.schema(),
vec![],
)?;
let final_repartition = repartition(partial.clone(), &["a"], &partial.schema())?;
let final_agg = aggregate(
AggregateMode::FinalPartitioned,
final_repartition,
group_by(&["a"], &partial.schema())?,
raw.schema(),
vec![],
)?;
.unwrap();
assert_eq!(
optimize(final_agg)?.trim(),
r#"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]
CoalescePartitionsExec
AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[]
RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=1
DataSourceExec: partitions=1, partition_sizes=[0]"#
hash_columns(mapped),
vec![("b".to_string(), 1), ("a".to_string(), 0)]
);
Ok(())
}
#[test]
fn rewrites_single_partitioned_subset_repartition() -> Result<()> {
let raw = input_exec();
let finer = repartition(raw.clone(), &["a", "b"], &raw.schema())?;
let final_repartition = repartition(finer.clone(), &["a"], &finer.schema())?;
let final_agg = aggregate(
AggregateMode::SinglePartitioned,
final_repartition,
group_by(&["a"], &finer.schema())?,
raw.schema(),
vec![],
)?;
fn rejects_non_grouping_hash_requirement_for_partial_aggregate() -> Result<()> {
let aggregate = partial_aggregate(&["b", "a"])?;
let unrelated = Distribution::HashPartitioned(vec![Arc::new(Column::new("c", 2)) as _]);
assert_eq!(
optimize(final_agg)?.trim(),
r#"AggregateExec: mode=SinglePartitioned, gby=[a@0 as a], aggr=[]
CoalescePartitionsExec
RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=1
DataSourceExec: partitions=1, partition_sizes=[0]"#
);
Ok(())
}
#[test]
fn keeps_equal_partitioning_keys() -> Result<()> {
let raw = input_exec();
let finer = repartition(raw.clone(), &["a", "b"], &raw.schema())?;
let partial = aggregate(
AggregateMode::Partial,
finer,
group_by(&["a", "b"], &raw.schema())?,
raw.schema(),
vec![],
)?;
let final_repartition = repartition(partial.clone(), &["a", "b"], &partial.schema())?;
let final_agg = aggregate(
AggregateMode::FinalPartitioned,
final_repartition,
group_by(&["a", "b"], &partial.schema())?,
raw.schema(),
vec![],
)?;
assert_eq!(
optimize(final_agg)?.trim(),
r#"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[]
RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=8
AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[]
RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=1
DataSourceExec: partitions=1, partition_sizes=[0]"#
);
Ok(())
}
#[test]
fn rewrites_when_finer_key_order_differs() -> Result<()> {
let raw = input_exec();
let finer = repartition(raw.clone(), &["c", "a", "b"], &raw.schema())?;
let partial = aggregate(
AggregateMode::Partial,
finer,
group_by(&["c", "a", "b"], &raw.schema())?,
raw.schema(),
vec![],
)?;
let final_repartition = repartition(partial.clone(), &["b", "c"], &partial.schema())?;
let final_agg = aggregate(
AggregateMode::FinalPartitioned,
final_repartition,
group_by(&["b", "c"], &partial.schema())?,
raw.schema(),
vec![],
)?;
assert_eq!(
optimize(final_agg)?.trim(),
r#"AggregateExec: mode=FinalPartitioned, gby=[b@2 as b, c@0 as c], aggr=[]
CoalescePartitionsExec
AggregateExec: mode=Partial, gby=[c@2 as c, a@0 as a, b@1 as b], aggr=[]
RepartitionExec: partitioning=Hash([c@2, a@0, b@1], 8), input_partitions=1
DataSourceExec: partitions=1, partition_sizes=[0]"#
);
Ok(())
}
#[test]
fn rewrites_when_repartition_satisfies_group_by_with_subset_keys() -> Result<()> {
let raw = input_exec();
let finer = repartition(raw.clone(), &["a", "b", "c"], &raw.schema())?;
let final_repartition = repartition(finer.clone(), &["a"], &finer.schema())?;
let final_agg = aggregate(
AggregateMode::FinalPartitioned,
final_repartition,
group_by(&["a", "b"], &finer.schema())?,
raw.schema(),
vec![],
)?;
assert_eq!(
optimize(final_agg)?.trim(),
r#"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[]
CoalescePartitionsExec
RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 8), input_partitions=1
DataSourceExec: partitions=1, partition_sizes=[0]"#
);
Ok(())
}
#[test]
fn keeps_non_hash_repartition_child() -> Result<()> {
let raw = input_exec();
let finer = repartition(raw.clone(), &["a", "b"], &raw.schema())?;
let partial = aggregate(
AggregateMode::Partial,
finer,
group_by(&["a", "b"], &raw.schema())?,
raw.schema(),
vec![],
)?;
let final_repartition = Arc::new(RepartitionExec::try_new(
partial.clone(),
Partitioning::RoundRobinBatch(8),
)?);
let final_agg = aggregate(
AggregateMode::FinalPartitioned,
final_repartition,
group_by(&["a"], &partial.schema())?,
raw.schema(),
vec![],
)?;
assert_eq!(
optimize(final_agg)?.trim(),
r#"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]
RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=8
AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[]
RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=1
DataSourceExec: partitions=1, partition_sizes=[0]"#
);
Ok(())
}
#[test]
fn rewrites_subset_partitioning_through_projection() -> Result<()> {
let raw = input_exec();
let finer = repartition(raw.clone(), &["a", "b", "c"], &raw.schema())?;
let projected = project_with_aliases(finer, &[("a", "x"), ("b", "y"), ("c", "z")])?;
let final_repartition = repartition(projected.clone(), &["x", "y"], &projected.schema())?;
let final_agg = aggregate(
AggregateMode::SinglePartitioned,
final_repartition,
group_by(&["x", "y"], &projected.schema())?,
projected.schema(),
vec![],
)?;
let optimized = optimize(final_agg)?;
assert!(
optimized.contains(
"AggregateExec: mode=SinglePartitioned, gby=[x@0 as x, y@1 as y], aggr=[]"
ReduceAggregateRepartition::partial_aggregate_input_distribution(
&aggregate,
Some(&unrelated),
)
.is_none()
);
assert!(optimized.contains("CoalescePartitionsExec"));
assert!(optimized.contains("ProjectionExec: expr=[a@0 as x, b@1 as y, c@2 as z]"));
Ok(())
}
#[test]
fn rewrites_promql_subset_partitioning_through_projection() -> Result<()> {
let raw = input_exec();
let finer = repartition(raw.clone(), &["a", "b", "c"], &raw.schema())?;
let normalized = promql_normalize(finer);
let projected = project_with_aliases(normalized, &[("a", "ts"), ("b", "x"), ("c", "y")])?;
let final_repartition = repartition(projected.clone(), &["x", "y"], &projected.schema())?;
let final_agg = aggregate(
AggregateMode::SinglePartitioned,
final_repartition,
group_by(&["x", "y"], &projected.schema())?,
projected.schema(),
vec![],
)?;
let optimized = optimize(final_agg)?;
assert!(
optimized.contains(
"AggregateExec: mode=SinglePartitioned, gby=[x@1 as x, y@2 as y], aggr=[]"
),
"{optimized}"
);
assert!(optimized.contains("CoalescePartitionsExec"), "{optimized}");
assert!(optimized.contains("PromSeriesNormalizeExec"), "{optimized}");
Ok(())
}
#[test]
fn keeps_subset_partitioning_with_branching_descendant() -> Result<()> {
let left = input_exec();
let right = input_exec();
let union = UnionExec::try_new(vec![left, right])?;
let finer = repartition(union.clone(), &["a", "b", "c"], &union.schema())?;
let partial = aggregate(
AggregateMode::Partial,
finer,
group_by(&["a", "b", "c"], &union.schema())?,
union.schema(),
vec![],
)?;
let final_repartition = repartition(partial.clone(), &["a", "b"], &partial.schema())?;
let final_agg = aggregate(
AggregateMode::FinalPartitioned,
final_repartition,
group_by(&["a", "b"], &partial.schema())?,
union.schema(),
vec![],
)?;
let optimized = optimize(final_agg)?;
assert!(
optimized.contains("RepartitionExec: partitioning=Hash([a@0, b@1], 8)"),
"{optimized}"
);
assert!(optimized.contains("UnionExec"), "{optimized}");
assert!(!optimized.contains("CoalescePartitionsExec"), "{optimized}");
Ok(())
}
#[test]
fn keeps_non_subset_repartition() -> Result<()> {
let raw = input_exec();
let coarser = repartition(raw.clone(), &["a"], &raw.schema())?;
let final_repartition = repartition(coarser.clone(), &["a", "b"], &coarser.schema())?;
let final_agg = aggregate(
AggregateMode::FinalPartitioned,
final_repartition,
group_by(&["a", "b"], &coarser.schema())?,
raw.schema(),
vec![],
)?;
let optimized = optimize(final_agg)?;
assert!(
optimized.contains(
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[]"
),
"{optimized}"
);
assert!(
optimized.contains("RepartitionExec: partitioning=Hash([a@0, b@1], 8)"),
"{optimized}"
);
assert!(!optimized.contains("CoalescePartitionsExec"), "{optimized}");
Ok(())
}
}

View File

@@ -186,16 +186,16 @@ impl QueryEngineState {
physical_optimizer
.rules
.insert(6, Arc::new(PassDistribution));
// Reduce aggregate repartition overhead by pushing coarser hash requirements
// through partial aggregates before EnforceDistribution runs.
physical_optimizer
.rules
.insert(7, Arc::new(ReduceAggregateRepartition));
// Enforce sorting AFTER custom rules that modify the plan structure
physical_optimizer.rules.insert(
7,
8,
Arc::new(datafusion::physical_optimizer::enforce_sorting::EnforceSorting {}),
);
Self::insert_physical_optimizer_rule_after(
&mut physical_optimizer.rules,
datafusion::physical_optimizer::update_aggr_exprs::OptimizeAggregateOrder::new().name(),
Arc::new(ReduceAggregateRepartition),
);
// Add rule for windowed sort
physical_optimizer
.rules
@@ -257,19 +257,6 @@ impl QueryEngineState {
rules.retain(|rule| rule.name() != name);
}
/// Inserts `rule` immediately AFTER the first rule whose `name()` equals
/// `name`.
///
/// If no rule with that name is present, the new rule is appended at the end
/// of the list (`unwrap_or(rules.len())`), so insertion never fails — the
/// rule just runs last instead of at its intended position.
fn insert_physical_optimizer_rule_after(
rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
name: &str,
rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
) {
// position() finds the anchor rule; +1 places the new rule right after it.
let index = rules
.iter()
.position(|candidate| candidate.name() == name)
.map(|index| index + 1)
.unwrap_or(rules.len());
rules.insert(index, rule);
}
/// Optimize the logical plan by the extension analyzer rules.
pub fn optimize_by_extension_rules(
&self,

View File

@@ -178,12 +178,12 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
| physical_plan after FilterPushdown_| SAME TEXT AS ABOVE_|
| physical_plan after parallelize_scan_| SAME TEXT AS ABOVE_|
| physical_plan after PassDistributionRule_| SAME TEXT AS ABOVE_|
| physical_plan after ReduceAggregateRepartitionRule_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_|
| physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_|
| physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_|
| physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_|
| physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_|
| physical_plan after OutputRequirements_| MergeScanExec: REDACTED
|_|_|
@@ -325,12 +325,12 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test AS series;
| physical_plan after FilterPushdown_| SAME TEXT AS ABOVE_|
| physical_plan after parallelize_scan_| SAME TEXT AS ABOVE_|
| physical_plan after PassDistributionRule_| SAME TEXT AS ABOVE_|
| physical_plan after ReduceAggregateRepartitionRule_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_|
| physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_|
| physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_|
| physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_|
| physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_|
| physical_plan after OutputRequirements_| MergeScanExec: REDACTED
|_|_|

View File

@@ -22,8 +22,9 @@ INSERT INTO reduce_aggregate_repartition VALUES
Affected Rows: 6
-- Keep the outer aggregate on `sum` so this case isolates
-- ReduceAggregateRepartition rather than the min/max combiner rule.
-- The source is already partitioned by (a, b, c), so the planner can satisfy
-- the outer aggregate's coarser (b, a) requirement at the MergeScan without a
-- repartition between the outer partial/final aggregate pair.
SELECT a, b, sum(m)
FROM (
SELECT a, b, c, min(val) AS m
@@ -42,190 +43,6 @@ ORDER BY a, b;
| n | y | 3 |
+---+---+----------+
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
EXPLAIN VERBOSE
SELECT a, b, sum(m)
FROM (
SELECT a, b, c, min(val) AS m
FROM reduce_aggregate_repartition
GROUP BY a, b, c
) s
GROUP BY b, a;
+-+-+
| plan_type_| plan_|
+-+-+
| initial_logical_plan_| Projection: s.a, s.b, sum(s.m)_|
|_|_Aggregate: groupBy=[[s.b, s.a]], aggr=[[sum(s.m)]]_|
|_|_SubqueryAlias: s_|
|_|_Projection: reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c, min(reduce_aggregate_repartition.val) AS m_|
|_|_Aggregate: groupBy=[[reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c]], aggr=[[min(reduce_aggregate_repartition.val)]] |
|_|_TableScan: reduce_aggregate_repartition_|
| logical_plan after apply_function_rewrites_| SAME TEXT AS ABOVE_|
| logical_plan after count_wildcard_to_time_index_rule_| SAME TEXT AS ABOVE_|
| logical_plan after StringNormalizationRule_| SAME TEXT AS ABOVE_|
| logical_plan after TranscribeAtatRule_| SAME TEXT AS ABOVE_|
| logical_plan after resolve_grouping_function_| SAME TEXT AS ABOVE_|
| logical_plan after type_coercion_| SAME TEXT AS ABOVE_|
| logical_plan after DistPlannerAnalyzer_| Projection: s.a, s.b, sum(s.m)_|
|_|_Projection: s.b, s.a, sum(s.m)_|
|_|_Aggregate: groupBy=[[s.b, s.a]], aggr=[[__sum_merge(__sum_state(s.m)) AS sum(s.m)]]_|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| Aggregate: groupBy=[[s.b, s.a]], aggr=[[__sum_state(s.m)]]_|
|_|_SubqueryAlias: s_|
|_|_Projection: reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c, min(reduce_aggregate_repartition.val) AS m_|
|_|_Aggregate: groupBy=[[reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c]], aggr=[[min(reduce_aggregate_repartition.val)]]_|
|_|_TableScan: reduce_aggregate_repartition_|
|_| ]]_|
| logical_plan after FixStateUdafOrderingAnalyzer_| SAME TEXT AS ABOVE_|
| analyzed_logical_plan_| SAME TEXT AS ABOVE_|
| logical_plan after rewrite_set_comparison_| SAME TEXT AS ABOVE_|
| logical_plan after optimize_unions_| SAME TEXT AS ABOVE_|
| logical_plan after simplify_expressions_| SAME TEXT AS ABOVE_|
| logical_plan after replace_distinct_aggregate_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_join_| SAME TEXT AS ABOVE_|
| logical_plan after decorrelate_predicate_subquery_| SAME TEXT AS ABOVE_|
| logical_plan after scalar_subquery_to_join_| SAME TEXT AS ABOVE_|
| logical_plan after decorrelate_lateral_join_| SAME TEXT AS ABOVE_|
| logical_plan after extract_equijoin_predicate_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_duplicated_expr_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_filter_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_cross_join_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_limit_| SAME TEXT AS ABOVE_|
| logical_plan after propagate_empty_relation_| SAME TEXT AS ABOVE_|
| logical_plan after filter_null_join_keys_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_outer_join_| SAME TEXT AS ABOVE_|
| logical_plan after push_down_limit_| SAME TEXT AS ABOVE_|
| logical_plan after push_down_filter_| SAME TEXT AS ABOVE_|
| logical_plan after single_distinct_aggregation_to_group_by | SAME TEXT AS ABOVE_|
| logical_plan after eliminate_group_by_constant_| SAME TEXT AS ABOVE_|
| logical_plan after common_sub_expression_eliminate_| SAME TEXT AS ABOVE_|
| logical_plan after extract_leaf_expressions_| SAME TEXT AS ABOVE_|
| logical_plan after push_down_leaf_projections_| SAME TEXT AS ABOVE_|
| logical_plan after optimize_projections_| Projection: s.a, s.b, sum(s.m)_|
|_|_Aggregate: groupBy=[[s.b, s.a]], aggr=[[__sum_merge(__sum_state(s.m)) AS sum(s.m)]]_|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| Aggregate: groupBy=[[s.b, s.a]], aggr=[[__sum_state(s.m)]]_|
|_|_SubqueryAlias: s_|
|_|_Projection: reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c, min(reduce_aggregate_repartition.val) AS m_|
|_|_Aggregate: groupBy=[[reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c]], aggr=[[min(reduce_aggregate_repartition.val)]]_|
|_|_TableScan: reduce_aggregate_repartition_|
|_| ]]_|
| logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_|
| logical_plan after rewrite_set_comparison_| SAME TEXT AS ABOVE_|
| logical_plan after optimize_unions_| SAME TEXT AS ABOVE_|
| logical_plan after simplify_expressions_| SAME TEXT AS ABOVE_|
| logical_plan after replace_distinct_aggregate_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_join_| SAME TEXT AS ABOVE_|
| logical_plan after decorrelate_predicate_subquery_| SAME TEXT AS ABOVE_|
| logical_plan after scalar_subquery_to_join_| SAME TEXT AS ABOVE_|
| logical_plan after decorrelate_lateral_join_| SAME TEXT AS ABOVE_|
| logical_plan after extract_equijoin_predicate_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_duplicated_expr_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_filter_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_cross_join_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_limit_| SAME TEXT AS ABOVE_|
| logical_plan after propagate_empty_relation_| SAME TEXT AS ABOVE_|
| logical_plan after filter_null_join_keys_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_outer_join_| SAME TEXT AS ABOVE_|
| logical_plan after push_down_limit_| SAME TEXT AS ABOVE_|
| logical_plan after push_down_filter_| SAME TEXT AS ABOVE_|
| logical_plan after single_distinct_aggregation_to_group_by | SAME TEXT AS ABOVE_|
| logical_plan after eliminate_group_by_constant_| SAME TEXT AS ABOVE_|
| logical_plan after common_sub_expression_eliminate_| SAME TEXT AS ABOVE_|
| logical_plan after extract_leaf_expressions_| SAME TEXT AS ABOVE_|
| logical_plan after push_down_leaf_projections_| SAME TEXT AS ABOVE_|
| logical_plan after optimize_projections_| SAME TEXT AS ABOVE_|
| logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_|
| logical_plan_| Projection: s.a, s.b, sum(s.m)_|
|_|_Aggregate: groupBy=[[s.b, s.a]], aggr=[[__sum_merge(__sum_state(s.m)) AS sum(s.m)]]_|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| Aggregate: groupBy=[[s.b, s.a]], aggr=[[__sum_state(s.m)]]_|
|_|_SubqueryAlias: s_|
|_|_Projection: reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c, min(reduce_aggregate_repartition.val) AS m_|
|_|_Aggregate: groupBy=[[reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c]], aggr=[[min(reduce_aggregate_repartition.val)]]_|
|_|_TableScan: reduce_aggregate_repartition_|
|_| ]]_|
| initial_physical_plan_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)]_|
|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_|
|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_|
|_|_MergeScanExec: REDACTED
|_|_|
| initial_physical_plan_with_stats_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_MergeScanExec: REDACTED
|_|_|
| initial_physical_plan_with_schema_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)], schema=[a:Utf8;N, b:Utf8;N, sum(s.m):Int64;N]_|
|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], schema=[b:Utf8;N, a:Utf8;N, sum(s.m):Int64;N]_|
|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], schema=[b:Utf8;N, a:Utf8;N, sum(s.m)[sum]:Int64;N]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after OutputRequirements_| OutputRequirementExec: order_by=[], dist_by=Unspecified_|
|_|_ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)]_|
|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_|
|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after aggregate_statistics_| SAME TEXT AS ABOVE_|
| physical_plan after join_selection_| SAME TEXT AS ABOVE_|
| physical_plan after LimitedDistinctAggregation_| SAME TEXT AS ABOVE_|
| physical_plan after FilterPushdown_| SAME TEXT AS ABOVE_|
| physical_plan after parallelize_scan_| SAME TEXT AS ABOVE_|
| physical_plan after PassDistributionRule_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceDistribution_| OutputRequirementExec: order_by=[], dist_by=Unspecified_|
|_|_ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)]_|
|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_|
| physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_|
| physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_|
| physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_|
| physical_plan after OutputRequirements_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)]_|
|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after LimitAggregation_| SAME TEXT AS ABOVE_|
| physical_plan after LimitPushPastWindows_| SAME TEXT AS ABOVE_|
| physical_plan after LimitPushdown_| SAME TEXT AS ABOVE_|
| physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_|
| physical_plan after PushdownSort_| SAME TEXT AS ABOVE_|
| physical_plan after EnsureCooperative_| SAME TEXT AS ABOVE_|
| physical_plan after FilterPushdown(Post)_| SAME TEXT AS ABOVE_|
| physical_plan after WindowedSortRule_| SAME TEXT AS ABOVE_|
| physical_plan after MatchesConstantTerm_| SAME TEXT AS ABOVE_|
| physical_plan after RemoveDuplicateRule_| SAME TEXT AS ABOVE_|
| physical_plan after SanityCheckPlan_| SAME TEXT AS ABOVE_|
| physical_plan_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)]_|
|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan_with_stats_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan_with_schema_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)], schema=[a:Utf8;N, b:Utf8;N, sum(s.m):Int64;N]_|
|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], schema=[b:Utf8;N, a:Utf8;N, sum(s.m):Int64;N]_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], schema=[b:Utf8;N, a:Utf8;N, sum(s.m)[sum]:Int64;N]_|
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (Hash.*) REDACTED
@@ -246,13 +63,12 @@ GROUP BY b, a;
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)] REDACTED
|_|_|_AggregateExec: mode=SinglePartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)] REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[__sum_state(s.m)] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[__sum_state(s.m)] REDACTED
|_|_|_ProjectionExec: expr=[a@0 as a, b@1 as b, min(reduce_aggregate_repartition.val)@3 as m] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED
@@ -261,7 +77,7 @@ GROUP BY b, a;
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[__sum_state(s.m)] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[__sum_state(s.m)] REDACTED
|_|_|_ProjectionExec: expr=[a@0 as a, b@1 as b, min(reduce_aggregate_repartition.val)@3 as m] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED
@@ -272,8 +88,8 @@ GROUP BY b, a;
|_|_| Total rows: 4_|
+-+-+-+
-- Another grouped SQL reduction case to keep the rule coverage centered on SQL,
-- not just TQL/PromQL planning.
-- The same optimization should work when dropping both b and c from the
-- downstream partitioning requirement.
SELECT a, count(m)
FROM (
SELECT a, b, c, min(val) AS m
@@ -314,7 +130,7 @@ GROUP BY a;
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[__count_state(s.m)] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[__count_state(s.m)] REDACTED
|_|_|_ProjectionExec: expr=[a@0 as a, min(reduce_aggregate_repartition.val)@3 as m] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED
@@ -323,7 +139,7 @@ GROUP BY a;
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[__count_state(s.m)] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[__count_state(s.m)] REDACTED
|_|_|_ProjectionExec: expr=[a@0 as a, min(reduce_aggregate_repartition.val)@3 as m] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED
@@ -334,65 +150,6 @@ GROUP BY a;
|_|_| Total rows: 2_|
+-+-+-+
-- Zero-key reduction should collapse the redundant repartition while keeping
-- the outer aggregate in SinglePartitioned mode.
SELECT sum(m)
FROM (
SELECT a, b, c, min(val) AS m
FROM reduce_aggregate_repartition
GROUP BY a, b, c
) s;
+----------+
| sum(s.m) |
+----------+
| 37 |
+----------+
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE
SELECT sum(m)
FROM (
SELECT a, b, c, min(val) AS m
FROM reduce_aggregate_repartition
GROUP BY a, b, c
) s;
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_AggregateExec: mode=Final, gby=[], aggr=[sum(s.m)] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[], aggr=[sum(s.m)] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_AggregateExec: mode=Final, gby=[], aggr=[__sum_state(s.m)] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[], aggr=[__sum_state(s.m)] REDACTED
|_|_|_ProjectionExec: expr=[min(reduce_aggregate_repartition.val)@3 as m] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 1_|_AggregateExec: mode=Final, gby=[], aggr=[__sum_state(s.m)] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[], aggr=[__sum_state(s.m)] REDACTED
|_|_|_ProjectionExec: expr=[min(reduce_aggregate_repartition.val)@3 as m] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|_|_|_|
|_|_| Total rows: 1_|
+-+-+-+
DROP TABLE reduce_aggregate_repartition;
Affected Rows: 0
@@ -418,8 +175,8 @@ INSERT INTO reduce_aggregate_repartition_non_subset VALUES
Affected Rows: 4
-- Changing the group key from `a` to `b` is not a reduction, so the
-- repartition must remain in place.
-- Changing the group key from a to b is not a coarsening of the existing
-- partition columns, so the repartition must remain.
SELECT b, sum(val)
FROM reduce_aggregate_repartition_non_subset
GROUP BY b
@@ -469,99 +226,3 @@ DROP TABLE reduce_aggregate_repartition_non_subset;
Affected Rows: 0
CREATE TABLE reduce_aggregate_repartition_metric (
ts TIMESTAMP(3) TIME INDEX,
job STRING,
instance STRING,
greptime_value DOUBLE,
PRIMARY KEY(job, instance),
);
Affected Rows: 0
INSERT INTO reduce_aggregate_repartition_metric VALUES
(0, 'job1', 'instance1', 1),
(0, 'job1', 'instance2', 2),
(0, 'job2', 'instance1', 3),
(5000, 'job1', 'instance1', 4),
(5000, 'job1', 'instance2', 5),
(5000, 'job2', 'instance1', 6),
(10000, 'job1', 'instance1', 7),
(10000, 'job1', 'instance2', 8),
(10000, 'job2', 'instance1', 9);
Affected Rows: 9
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE (0, 10, '5s')
count(count(reduce_aggregate_repartition_metric) by (job));
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(count(reduce_aggregate_repartition_metric.greptime_value))] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(reduce_aggregate_repartition_metric.greptime_value))] REDACTED
|_|_|_ProjectionExec: expr=[ts@0 as ts] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[], ordering_mode=PartiallySorted([1]) REDACTED
|_|_|_SortExec: expr=[job@1 ASC], preserve_partitioning=[true] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[], ordering_mode=PartiallySorted([1]) REDACTED
|_|_|_ProjectionExec: expr=[ts@0 as ts, job@1 as job] REDACTED
|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|_|_|_PromSeriesDivideExec: tags=["job", "instance"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 3_|
+-+-+-+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE (0, 10, '5s')
count(count(rate(reduce_aggregate_repartition_metric[5s])) by (job));
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(count(prom_rate(ts_range,greptime_value,ts,Int64(5000))))] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(prom_rate(ts_range,greptime_value,ts,Int64(5000))))] REDACTED
|_|_|_ProjectionExec: expr=[ts@1 as ts, count(prom_rate(ts_range,greptime_value,ts,Int64(5000)))@2 as count(prom_rate(ts_range,greptime_value,ts,Int64(5000)))] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[job@0 as job, ts@1 as ts], aggr=[count(prom_rate(ts_range,greptime_value,ts,Int64(5000)))] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[job@2 as job, ts@0 as ts], aggr=[count(prom_rate(ts_range,greptime_value,ts,Int64(5000)))] REDACTED
|_|_|_FilterExec: prom_rate(ts_range,greptime_value,ts,Int64(5000))@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[ts@0 as ts, prom_rate(ts_range@4, greptime_value@3, ts@0, 5000) as prom_rate(ts_range,greptime_value,ts,Int64(5000)), job@1 as job] REDACTED
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[5000], time index=[ts] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[ts], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["job", "instance"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
DROP TABLE reduce_aggregate_repartition_metric;
Affected Rows: 0

View File

@@ -18,8 +18,9 @@ INSERT INTO reduce_aggregate_repartition VALUES
('n', 'x', 'v', 5000, 8),
('n', 'y', 'u', 6000, 3);
-- Keep the outer aggregate on `sum` so this case isolates
-- ReduceAggregateRepartition rather than the min/max combiner rule.
-- The source is already partitioned by (a, b, c), so the planner can satisfy
-- the outer aggregate's coarser (b, a) requirement at the MergeScan without a
-- repartition between the outer partial/final aggregate pair.
SELECT a, b, sum(m)
FROM (
SELECT a, b, c, min(val) AS m
@@ -29,19 +30,6 @@ FROM (
GROUP BY b, a
ORDER BY a, b;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
EXPLAIN VERBOSE
SELECT a, b, sum(m)
FROM (
SELECT a, b, c, min(val) AS m
FROM reduce_aggregate_repartition
GROUP BY a, b, c
) s
GROUP BY b, a;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (Hash.*) REDACTED
@@ -58,8 +46,8 @@ FROM (
) s
GROUP BY b, a;
-- Another grouped SQL reduction case to keep the rule coverage centered on SQL,
-- not just TQL/PromQL planning.
-- The same optimization should work when dropping both b and c from the
-- downstream partitioning requirement.
SELECT a, count(m)
FROM (
SELECT a, b, c, min(val) AS m
@@ -85,30 +73,6 @@ FROM (
) s
GROUP BY a;
-- Zero-key reduction should collapse the redundant repartition while keeping
-- the outer aggregate in SinglePartitioned mode.
SELECT sum(m)
FROM (
SELECT a, b, c, min(val) AS m
FROM reduce_aggregate_repartition
GROUP BY a, b, c
) s;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE
SELECT sum(m)
FROM (
SELECT a, b, c, min(val) AS m
FROM reduce_aggregate_repartition
GROUP BY a, b, c
) s;
DROP TABLE reduce_aggregate_repartition;
CREATE TABLE reduce_aggregate_repartition_non_subset (
@@ -128,8 +92,8 @@ INSERT INTO reduce_aggregate_repartition_non_subset VALUES
('a', 'y', 3000, 7),
('n', 'y', 4000, 3);
-- Changing the group key from `a` to `b` is not a reduction, so the
-- repartition must remain in place.
-- Changing the group key from a to b is not a coarsening of the existing
-- partition columns, so the repartition must remain.
SELECT b, sum(val)
FROM reduce_aggregate_repartition_non_subset
GROUP BY b
@@ -148,44 +112,3 @@ FROM reduce_aggregate_repartition_non_subset
GROUP BY b;
DROP TABLE reduce_aggregate_repartition_non_subset;
CREATE TABLE reduce_aggregate_repartition_metric (
ts TIMESTAMP(3) TIME INDEX,
job STRING,
instance STRING,
greptime_value DOUBLE,
PRIMARY KEY(job, instance),
);
INSERT INTO reduce_aggregate_repartition_metric VALUES
(0, 'job1', 'instance1', 1),
(0, 'job1', 'instance2', 2),
(0, 'job2', 'instance1', 3),
(5000, 'job1', 'instance1', 4),
(5000, 'job1', 'instance2', 5),
(5000, 'job2', 'instance1', 6),
(10000, 'job1', 'instance1', 7),
(10000, 'job1', 'instance2', 8),
(10000, 'job2', 'instance1', 9);
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE (0, 10, '5s')
count(count(reduce_aggregate_repartition_metric) by (job));
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE (0, 10, '5s')
count(count(rate(reduce_aggregate_repartition_metric[5s])) by (job));
DROP TABLE reduce_aggregate_repartition_metric;

View File

@@ -110,7 +110,7 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(count(tsid
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED
|_|_|_ProjectionExec: expr=[ts@0 as ts] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED
@@ -161,7 +161,7 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(sum(tsid_m
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED
|_|_|_ProjectionExec: expr=[ts@0 as ts] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED