From 5dfa7aaed8873032a8b148dd8d2f98aabd188274 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 22 Apr 2026 02:40:56 +0800 Subject: [PATCH] reimplementation Signed-off-by: Ruihang Xia --- src/query/src/optimizer/pass_distribution.rs | 19 +- .../optimizer/reduce_aggregate_repartition.rs | 624 +++++------------- src/query/src/query_engine/state.rs | 25 +- .../common/tql-explain-analyze/explain.result | 4 +- .../reduce_aggregate_repartition.result | 365 +--------- .../reduce_aggregate_repartition.sql | 91 +-- .../tql-explain-analyze/tsid_column.result | 4 +- 7 files changed, 211 insertions(+), 921 deletions(-) diff --git a/src/query/src/optimizer/pass_distribution.rs b/src/query/src/optimizer/pass_distribution.rs index 7ce2ffd752..a6a9c002a3 100644 --- a/src/query/src/optimizer/pass_distribution.rs +++ b/src/query/src/optimizer/pass_distribution.rs @@ -82,11 +82,8 @@ impl PassDistribution { let required = plan.required_input_distribution(); let mut new_children = Vec::with_capacity(children.len()); for (idx, child) in children.into_iter().enumerate() { - let child_req = match required.get(idx) { - Some(Distribution::UnspecifiedDistribution) => None, - None => current_req.clone(), - Some(req) => Some(req.clone()), - }; + let child_req = + Self::child_distribution_requirement(idx, current_req.as_ref(), &required); let new_child = Self::rewrite_with_distribution(child.clone(), child_req)?; new_children.push(new_child); } @@ -103,4 +100,16 @@ impl PassDistribution { plan.with_new_children(new_children) } } + + fn child_distribution_requirement( + child_idx: usize, + current_req: Option<&Distribution>, + required: &[Distribution], + ) -> Option { + match required.get(child_idx) { + Some(Distribution::UnspecifiedDistribution) => None, + None => current_req.cloned(), + Some(req) => Some(req.clone()), + } + } } diff --git a/src/query/src/optimizer/reduce_aggregate_repartition.rs b/src/query/src/optimizer/reduce_aggregate_repartition.rs index 4e3164ddc4..2eecd0ff60 100644 --- a/src/query/src/optimizer/reduce_aggregate_repartition.rs +++ b/src/query/src/optimizer/reduce_aggregate_repartition.rs @@ -16,38 +16,40 @@ use std::sync::Arc; use datafusion::config::ConfigOptions; use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode}; -use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; -use datafusion::physical_plan::repartition::RepartitionExec; -use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, InputOrderMode}; use datafusion_common::Result as DfResult; -use datafusion_physical_expr::{Distribution, Partitioning}; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::{Distribution, PhysicalExpr}; -/// Replaces a redundant hash repartition before a coarser aggregate with a -/// single fan-in. +use crate::dist_plan::MergeScanExec; + +/// This is a [`PhysicalOptimizerRule`] to reduce repartition overhead between +/// partial and final aggregates by pushing a coarser hash distribution +/// requirement through [`AggregateExec`] in partial mode down to [`MergeScanExec`]. /// -/// This only applies when the aggregate already receives hash partitioning -/// satisfying its grouping keys and the repartition input is already hash -/// partitioned on a strict superset of those repartition keys. +/// This rule keeps the partition fanout unchanged. It only enables +/// source-aware partition-column coarsening that [`MergeScanExec`] can already +/// satisfy without a full reshuffle. +/// +/// This rule is expected to be run before [`EnforceDistribution`]. +/// +/// [`EnforceDistribution`]: datafusion::physical_optimizer::enforce_distribution::EnforceDistribution +/// [`MergeScanExec`]: crate::dist_plan::MergeScanExec #[derive(Debug)] pub struct ReduceAggregateRepartition; -struct OptimizedPlan { - plan: Arc, - subtree_is_linear: bool, -} - impl PhysicalOptimizerRule for ReduceAggregateRepartition { fn optimize( &self, plan: Arc, - _config: &ConfigOptions, + config: &ConfigOptions, ) -> DfResult> { - Ok(Self::do_optimize_with_context(plan, false)?.plan) + Self::do_optimize(plan, config) } fn name(&self) -> &str { - "ReduceAggregateRepartition" + "ReduceAggregateRepartitionRule" } fn schema_check(&self) -> bool { @@ -56,117 +58,110 @@ impl PhysicalOptimizerRule for ReduceAggregateRepartition { } impl ReduceAggregateRepartition { - fn do_optimize_with_context( + fn do_optimize( plan: Arc, - under_branching_ancestor: bool, - ) -> DfResult { - let children = plan.children(); - let subtree_branches_here = children.len() > 1; - let child_under_branching_ancestor = under_branching_ancestor || subtree_branches_here; - let optimized_children = children - .into_iter() - .map(|child| { - Self::do_optimize_with_context(Arc::clone(child), child_under_branching_ancestor) - }) - .collect::>>()?; - let subtree_is_linear = !subtree_branches_here - && optimized_children - .iter() - .all(|child| child.subtree_is_linear); - let new_children = optimized_children - .into_iter() - .map(|child| child.plan) - .collect(); - let plan = plan.with_new_children(new_children)?; - - let plan = if under_branching_ancestor || !subtree_is_linear { - plan - } else { - Self::try_reduce_repartition(plan)? - }; - - Ok(OptimizedPlan { - plan, - subtree_is_linear, - }) + _config: &ConfigOptions, + ) -> DfResult> { + Self::rewrite_with_distribution(plan, None) } - fn aggregate_repartition_candidate( - plan: &Arc, - ) -> Option<(&AggregateExec, &RepartitionExec)> { - let agg_exec = plan.as_any().downcast_ref::()?; - let repartition_exec = agg_exec - .input() - .as_any() - .downcast_ref::()?; - Some((agg_exec, repartition_exec)) - } - - fn can_rewrite_aggregate(agg_exec: &AggregateExec, repartition_exec: &RepartitionExec) -> bool { - if !matches!( - agg_exec.mode(), - AggregateMode::FinalPartitioned | AggregateMode::SinglePartitioned - ) || agg_exec.input_order_mode() != &InputOrderMode::Linear - || agg_exec.group_expr().has_grouping_set() - || repartition_exec.preserve_order() + fn rewrite_with_distribution( + plan: Arc, + current_req: Option, + ) -> DfResult> { + if let Some(merge_scan) = plan.as_any().downcast_ref::() + && let Some(distribution) = current_req.as_ref() + && let Some(new_plan) = merge_scan.try_with_new_distribution(distribution.clone()) { - return false; + return Ok(Arc::new(new_plan) as _); } - let Some(required_distribution) = agg_exec.required_input_distribution().into_iter().next() - else { - return false; - }; - let repartition_satisfaction = repartition_exec.partitioning().satisfaction( - &required_distribution, - repartition_exec.properties().equivalence_properties(), - true, - ); - repartition_satisfaction.is_satisfied() && Self::can_reduce_repartition(repartition_exec) - } - - fn try_reduce_repartition(plan: Arc) -> DfResult> { - let Some((agg_exec, repartition_exec)) = Self::aggregate_repartition_candidate(&plan) - else { - return Ok(plan); - }; - - if !Self::can_rewrite_aggregate(agg_exec, repartition_exec) { + let children = plan.children(); + if children.is_empty() { return Ok(plan); } - let new_input = Arc::new(CoalescePartitionsExec::new( - repartition_exec.input().clone(), - )); - let new_agg = AggregateExec::try_new( - *agg_exec.mode(), - agg_exec.group_expr().clone(), - agg_exec.aggr_expr().to_vec(), - agg_exec.filter_expr().to_vec(), - new_input, - agg_exec.input_schema(), - )? - .with_limit_options(agg_exec.limit_options()); + let required = plan.required_input_distribution(); + let mut new_children = Vec::with_capacity(children.len()); + for (idx, child) in children.into_iter().enumerate() { + let child_req = + Self::child_distribution_requirement(&plan, idx, current_req.as_ref(), &required); + let new_child = Self::rewrite_with_distribution(child.clone(), child_req)?; + new_children.push(new_child); + } - Ok(Arc::new(new_agg)) + let unchanged = plan + .children() + .into_iter() + .zip(new_children.iter()) + .all(|(old, new)| Arc::ptr_eq(old, new)); + if unchanged { + Ok(plan) + } else { + plan.with_new_children(new_children) + } } - fn can_reduce_repartition(repartition_exec: &RepartitionExec) -> bool { - let Partitioning::Hash(finer_partition_exprs, _) = - repartition_exec.input().output_partitioning() - else { - return false; + fn child_distribution_requirement( + plan: &Arc, + child_idx: usize, + current_req: Option<&Distribution>, + required: &[Distribution], + ) -> Option { + match required.get(child_idx) { + Some(Distribution::UnspecifiedDistribution) => { + Self::partial_aggregate_child_distribution(plan, child_idx, current_req) + } + None => current_req.cloned(), + Some(req) => Some(req.clone()), + } + } + + fn partial_aggregate_child_distribution( + plan: &Arc, + child_idx: usize, + current_req: Option<&Distribution>, + ) -> Option { + if child_idx == 0 { + Self::partial_aggregate_input_distribution(plan, current_req) + } else { + None + } + } + + fn partial_aggregate_input_distribution( + plan: &Arc, + current_req: Option<&Distribution>, + ) -> Option { + let agg_exec = plan.as_any().downcast_ref::()?; + if agg_exec.mode() != &AggregateMode::Partial || agg_exec.group_expr().has_grouping_set() { + return None; + } + + let Distribution::HashPartitioned(required_exprs) = current_req? else { + return None; }; - let coarsening_satisfaction = repartition_exec.partitioning().satisfaction( - &Distribution::HashPartitioned(finer_partition_exprs.clone()), - repartition_exec - .input() - .properties() - .equivalence_properties(), - true, - ); - coarsening_satisfaction.is_subset() + let output_group_exprs = agg_exec.output_group_expr(); + let input_group_exprs = agg_exec.group_expr().input_exprs(); + let mut mapped_exprs = Vec::with_capacity(required_exprs.len()); + for required_expr in required_exprs { + let required_col = required_expr.as_any().downcast_ref::()?; + let group_idx = output_group_exprs + .iter() + .position(|expr| Self::same_column(expr, required_col))?; + mapped_exprs.push(input_group_exprs[group_idx].clone()); + } + + Some(Distribution::HashPartitioned(mapped_exprs)) + } + + fn same_column(expr: &Arc, expected: &Column) -> bool { + expr.as_any() + .downcast_ref::() + .is_some_and(|column| { + column.index() == expected.index() && column.name() == expected.name() + }) } } @@ -177,19 +172,11 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::datasource::memory::MemorySourceConfig; use datafusion::datasource::source::DataSourceExec; - use datafusion::logical_expr::{EmptyRelation, LogicalPlan}; - use datafusion::physical_optimizer::PhysicalOptimizerRule; + use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; - use datafusion::physical_plan::projection::ProjectionExec; - use datafusion::physical_plan::repartition::RepartitionExec; - use datafusion::physical_plan::union::UnionExec; - use datafusion::physical_plan::{ExecutionPlan, displayable}; - use datafusion_common::{DFSchema, Result}; - use datafusion_physical_expr::aggregate::AggregateFunctionExpr; - use datafusion_physical_expr::expressions::col; - use datafusion_physical_expr::{Partitioning, PhysicalExpr}; - use pretty_assertions::assert_eq; - use promql::extension_plan::SeriesNormalize; + use datafusion_common::Result; + use datafusion_physical_expr::Distribution; + use datafusion_physical_expr::expressions::{Column, col}; use super::ReduceAggregateRepartition; @@ -204,362 +191,85 @@ mod tests { } fn group_by(names: &[&str], schema: &SchemaRef) -> Result { - let groups: Result, String)>> = names + let groups = names .iter() .map(|name| Ok((col(name, schema)?, (*name).to_string()))) - .collect(); - Ok(PhysicalGroupBy::new_single(groups?)) - } - - fn repartition( - input: Arc, - keys: &[&str], - schema: &SchemaRef, - ) -> Result> { - let exprs = keys - .iter() - .map(|name| col(name, schema)) .collect::>>()?; - Ok(Arc::new(RepartitionExec::try_new( - input, - Partitioning::Hash(exprs, 8), - )?)) + Ok(PhysicalGroupBy::new_single(groups)) } - fn aggregate( - mode: AggregateMode, - input: Arc, - group_by: PhysicalGroupBy, - input_schema: SchemaRef, - aggr_expr: Vec>, - ) -> Result> { - let filter_expr = vec![None; aggr_expr.len()]; + fn partial_aggregate(group_keys: &[&str]) -> Result> { + let input = input_exec(); + let schema = input.schema(); Ok(Arc::new(AggregateExec::try_new( - mode, - group_by, - aggr_expr, - filter_expr, + AggregateMode::Partial, + group_by(group_keys, &schema)?, + vec![], + vec![], input, - input_schema, + schema, )?)) } - fn optimize(plan: Arc) -> Result { - let optimized = ReduceAggregateRepartition.optimize(plan, &Default::default())?; - Ok(displayable(optimized.as_ref()).indent(true).to_string()) + fn hash_columns(distribution: Distribution) -> Vec<(String, usize)> { + let Distribution::HashPartitioned(exprs) = distribution else { + panic!("expected hash distribution"); + }; + exprs + .into_iter() + .map(|expr| { + let column = expr + .as_any() + .downcast_ref::() + .unwrap(); + (column.name().to_string(), column.index()) + }) + .collect() } - fn empty_logical_plan() -> LogicalPlan { - LogicalPlan::EmptyRelation(EmptyRelation { - produce_one_row: false, - schema: Arc::new(DFSchema::empty()), - }) - } + #[test] + fn maps_partial_aggregate_hash_requirement_to_input_columns() -> Result<()> { + let aggregate = partial_aggregate(&["b", "a", "c"])?; + let required = Distribution::HashPartitioned(vec![ + aggregate + .as_any() + .downcast_ref::() + .unwrap() + .output_group_expr()[0] + .clone(), + aggregate + .as_any() + .downcast_ref::() + .unwrap() + .output_group_expr()[1] + .clone(), + ]); - fn project_with_aliases( - input: Arc, - aliases: &[(&str, &str)], - ) -> Result> { - let exprs: Result, String)>> = aliases - .iter() - .map(|(from, to)| Ok((col(from, &input.schema())?, (*to).to_string()))) - .collect(); - Ok(Arc::new(ProjectionExec::try_new(exprs?, input)?)) - } - - fn promql_normalize(input: Arc) -> Arc { - SeriesNormalize::new( - 0, - "a", - false, - vec!["b".to_string(), "c".to_string()], - empty_logical_plan(), + let mapped = ReduceAggregateRepartition::partial_aggregate_input_distribution( + &aggregate, + Some(&required), ) - .to_execution_plan(input) - } - - #[test] - fn rewrites_final_partitioned_subset_repartition() -> Result<()> { - let raw = input_exec(); - let finer = repartition(raw.clone(), &["a", "b"], &raw.schema())?; - let partial = aggregate( - AggregateMode::Partial, - finer, - group_by(&["a", "b"], &raw.schema())?, - raw.schema(), - vec![], - )?; - let final_repartition = repartition(partial.clone(), &["a"], &partial.schema())?; - let final_agg = aggregate( - AggregateMode::FinalPartitioned, - final_repartition, - group_by(&["a"], &partial.schema())?, - raw.schema(), - vec![], - )?; + .unwrap(); assert_eq!( - optimize(final_agg)?.trim(), - r#"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[] - CoalescePartitionsExec - AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[] - RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=1 - DataSourceExec: partitions=1, partition_sizes=[0]"# + hash_columns(mapped), + vec![("b".to_string(), 1), ("a".to_string(), 0)] ); Ok(()) } #[test] - fn rewrites_single_partitioned_subset_repartition() -> Result<()> { - let raw = input_exec(); - let finer = repartition(raw.clone(), &["a", "b"], &raw.schema())?; - let final_repartition = repartition(finer.clone(), &["a"], &finer.schema())?; - let final_agg = aggregate( - AggregateMode::SinglePartitioned, - final_repartition, - group_by(&["a"], &finer.schema())?, - raw.schema(), - vec![], - )?; + fn rejects_non_grouping_hash_requirement_for_partial_aggregate() -> Result<()> { + let aggregate = partial_aggregate(&["b", "a"])?; + let unrelated = Distribution::HashPartitioned(vec![Arc::new(Column::new("c", 2)) as _]); - assert_eq!( - optimize(final_agg)?.trim(), - r#"AggregateExec: mode=SinglePartitioned, gby=[a@0 as a], aggr=[] - CoalescePartitionsExec - RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=1 - DataSourceExec: partitions=1, partition_sizes=[0]"# - ); - Ok(()) - } - - #[test] - fn keeps_equal_partitioning_keys() -> Result<()> { - let raw = input_exec(); - let finer = repartition(raw.clone(), &["a", "b"], &raw.schema())?; - let partial = aggregate( - AggregateMode::Partial, - finer, - group_by(&["a", "b"], &raw.schema())?, - raw.schema(), - vec![], - )?; - let final_repartition = repartition(partial.clone(), &["a", "b"], &partial.schema())?; - let final_agg = aggregate( - AggregateMode::FinalPartitioned, - final_repartition, - group_by(&["a", "b"], &partial.schema())?, - raw.schema(), - vec![], - )?; - - assert_eq!( - optimize(final_agg)?.trim(), - r#"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[] - RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=8 - AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[] - RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=1 - DataSourceExec: partitions=1, partition_sizes=[0]"# - ); - Ok(()) - } - - #[test] - fn rewrites_when_finer_key_order_differs() -> Result<()> { - let raw = input_exec(); - let finer = repartition(raw.clone(), &["c", "a", "b"], &raw.schema())?; - let partial = aggregate( - AggregateMode::Partial, - finer, - group_by(&["c", "a", "b"], &raw.schema())?, - raw.schema(), - vec![], - )?; - let final_repartition = repartition(partial.clone(), &["b", "c"], &partial.schema())?; - let final_agg = aggregate( - AggregateMode::FinalPartitioned, - final_repartition, - group_by(&["b", "c"], &partial.schema())?, - raw.schema(), - vec![], - )?; - - assert_eq!( - optimize(final_agg)?.trim(), - r#"AggregateExec: mode=FinalPartitioned, gby=[b@2 as b, c@0 as c], aggr=[] - CoalescePartitionsExec - AggregateExec: mode=Partial, gby=[c@2 as c, a@0 as a, b@1 as b], aggr=[] - RepartitionExec: partitioning=Hash([c@2, a@0, b@1], 8), input_partitions=1 - DataSourceExec: partitions=1, partition_sizes=[0]"# - ); - Ok(()) - } - - #[test] - fn rewrites_when_repartition_satisfies_group_by_with_subset_keys() -> Result<()> { - let raw = input_exec(); - let finer = repartition(raw.clone(), &["a", "b", "c"], &raw.schema())?; - let final_repartition = repartition(finer.clone(), &["a"], &finer.schema())?; - let final_agg = aggregate( - AggregateMode::FinalPartitioned, - final_repartition, - group_by(&["a", "b"], &finer.schema())?, - raw.schema(), - vec![], - )?; - - assert_eq!( - optimize(final_agg)?.trim(), - r#"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[] - CoalescePartitionsExec - RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 8), input_partitions=1 - DataSourceExec: partitions=1, partition_sizes=[0]"# - ); - Ok(()) - } - - #[test] - fn keeps_non_hash_repartition_child() -> Result<()> { - let raw = input_exec(); - let finer = repartition(raw.clone(), &["a", "b"], &raw.schema())?; - let partial = aggregate( - AggregateMode::Partial, - finer, - group_by(&["a", "b"], &raw.schema())?, - raw.schema(), - vec![], - )?; - let final_repartition = Arc::new(RepartitionExec::try_new( - partial.clone(), - Partitioning::RoundRobinBatch(8), - )?); - let final_agg = aggregate( - AggregateMode::FinalPartitioned, - final_repartition, - group_by(&["a"], &partial.schema())?, - raw.schema(), - vec![], - )?; - - assert_eq!( - optimize(final_agg)?.trim(), - r#"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[] - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=8 - AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[] - RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=1 - DataSourceExec: partitions=1, partition_sizes=[0]"# - ); - Ok(()) - } - - #[test] - fn rewrites_subset_partitioning_through_projection() -> Result<()> { - let raw = input_exec(); - let finer = repartition(raw.clone(), &["a", "b", "c"], &raw.schema())?; - let projected = project_with_aliases(finer, &[("a", "x"), ("b", "y"), ("c", "z")])?; - let final_repartition = repartition(projected.clone(), &["x", "y"], &projected.schema())?; - let final_agg = aggregate( - AggregateMode::SinglePartitioned, - final_repartition, - group_by(&["x", "y"], &projected.schema())?, - projected.schema(), - vec![], - )?; - - let optimized = optimize(final_agg)?; assert!( - optimized.contains( - "AggregateExec: mode=SinglePartitioned, gby=[x@0 as x, y@1 as y], aggr=[]" + ReduceAggregateRepartition::partial_aggregate_input_distribution( + &aggregate, + Some(&unrelated), ) + .is_none() ); - assert!(optimized.contains("CoalescePartitionsExec")); - assert!(optimized.contains("ProjectionExec: expr=[a@0 as x, b@1 as y, c@2 as z]")); - Ok(()) - } - - #[test] - fn rewrites_promql_subset_partitioning_through_projection() -> Result<()> { - let raw = input_exec(); - let finer = repartition(raw.clone(), &["a", "b", "c"], &raw.schema())?; - let normalized = promql_normalize(finer); - let projected = project_with_aliases(normalized, &[("a", "ts"), ("b", "x"), ("c", "y")])?; - let final_repartition = repartition(projected.clone(), &["x", "y"], &projected.schema())?; - let final_agg = aggregate( - AggregateMode::SinglePartitioned, - final_repartition, - group_by(&["x", "y"], &projected.schema())?, - projected.schema(), - vec![], - )?; - - let optimized = optimize(final_agg)?; - assert!( - optimized.contains( - "AggregateExec: mode=SinglePartitioned, gby=[x@1 as x, y@2 as y], aggr=[]" - ), - "{optimized}" - ); - assert!(optimized.contains("CoalescePartitionsExec"), "{optimized}"); - assert!(optimized.contains("PromSeriesNormalizeExec"), "{optimized}"); - Ok(()) - } - - #[test] - fn keeps_subset_partitioning_with_branching_descendant() -> Result<()> { - let left = input_exec(); - let right = input_exec(); - let union = UnionExec::try_new(vec![left, right])?; - let finer = repartition(union.clone(), &["a", "b", "c"], &union.schema())?; - let partial = aggregate( - AggregateMode::Partial, - finer, - group_by(&["a", "b", "c"], &union.schema())?, - union.schema(), - vec![], - )?; - let final_repartition = repartition(partial.clone(), &["a", "b"], &partial.schema())?; - let final_agg = aggregate( - AggregateMode::FinalPartitioned, - final_repartition, - group_by(&["a", "b"], &partial.schema())?, - union.schema(), - vec![], - )?; - - let optimized = optimize(final_agg)?; - assert!( - optimized.contains("RepartitionExec: partitioning=Hash([a@0, b@1], 8)"), - "{optimized}" - ); - assert!(optimized.contains("UnionExec"), "{optimized}"); - assert!(!optimized.contains("CoalescePartitionsExec"), "{optimized}"); - Ok(()) - } - - #[test] - fn keeps_non_subset_repartition() -> Result<()> { - let raw = input_exec(); - let coarser = repartition(raw.clone(), &["a"], &raw.schema())?; - let final_repartition = repartition(coarser.clone(), &["a", "b"], &coarser.schema())?; - let final_agg = aggregate( - AggregateMode::FinalPartitioned, - final_repartition, - group_by(&["a", "b"], &coarser.schema())?, - raw.schema(), - vec![], - )?; - - let optimized = optimize(final_agg)?; - assert!( - optimized.contains( - "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[]" - ), - "{optimized}" - ); - assert!( - optimized.contains("RepartitionExec: partitioning=Hash([a@0, b@1], 8)"), - "{optimized}" - ); - assert!(!optimized.contains("CoalescePartitionsExec"), "{optimized}"); Ok(()) } } diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 90680565f9..56eccfcb77 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -186,16 +186,16 @@ impl QueryEngineState { physical_optimizer .rules .insert(6, Arc::new(PassDistribution)); + // Reduce aggregate repartition overhead by pushing coarser hash requirements + // through partial aggregates before EnforceDistribution runs. + physical_optimizer + .rules + .insert(7, Arc::new(ReduceAggregateRepartition)); // Enforce sorting AFTER custom rules that modify the plan structure physical_optimizer.rules.insert( - 7, + 8, Arc::new(datafusion::physical_optimizer::enforce_sorting::EnforceSorting {}), ); - Self::insert_physical_optimizer_rule_after( - &mut physical_optimizer.rules, - datafusion::physical_optimizer::update_aggr_exprs::OptimizeAggregateOrder::new().name(), - Arc::new(ReduceAggregateRepartition), - ); // Add rule for windowed sort physical_optimizer .rules @@ -257,19 +257,6 @@ impl QueryEngineState { rules.retain(|rule| rule.name() != name); } - fn insert_physical_optimizer_rule_after( - rules: &mut Vec>, - name: &str, - rule: Arc, - ) { - let index = rules - .iter() - .position(|candidate| candidate.name() == name) - .map(|index| index + 1) - .unwrap_or(rules.len()); - rules.insert(index, rule); - } - /// Optimize the logical plan by the extension analyzer rules. pub fn optimize_by_extension_rules( &self, diff --git a/tests/cases/standalone/common/tql-explain-analyze/explain.result b/tests/cases/standalone/common/tql-explain-analyze/explain.result index b44aa87264..f503fa8145 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/explain.result +++ b/tests/cases/standalone/common/tql-explain-analyze/explain.result @@ -178,12 +178,12 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test; | physical_plan after FilterPushdown_| SAME TEXT AS ABOVE_| | physical_plan after parallelize_scan_| SAME TEXT AS ABOVE_| | physical_plan after PassDistributionRule_| SAME TEXT AS ABOVE_| +| physical_plan after ReduceAggregateRepartitionRule_| SAME TEXT AS ABOVE_| | physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_| | physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_| | physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_| | physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_| | physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_| -| physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_| | physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_| | physical_plan after OutputRequirements_| MergeScanExec: REDACTED |_|_| @@ -325,12 +325,12 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test AS series; | physical_plan after FilterPushdown_| SAME TEXT AS ABOVE_| | physical_plan after parallelize_scan_| SAME TEXT AS ABOVE_| | physical_plan after PassDistributionRule_| SAME TEXT AS ABOVE_| +| physical_plan after ReduceAggregateRepartitionRule_| SAME TEXT AS ABOVE_| | physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_| | physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_| | physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_| | physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_| | physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_| -| physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_| | physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_| | physical_plan after OutputRequirements_| MergeScanExec: REDACTED |_|_| diff --git a/tests/cases/standalone/optimizer/reduce_aggregate_repartition.result b/tests/cases/standalone/optimizer/reduce_aggregate_repartition.result index 1a5245f9b1..d168ce9437 100644 --- a/tests/cases/standalone/optimizer/reduce_aggregate_repartition.result +++ b/tests/cases/standalone/optimizer/reduce_aggregate_repartition.result @@ -22,8 +22,9 @@ INSERT INTO reduce_aggregate_repartition VALUES Affected Rows: 6 --- Keep the outer aggregate on `sum` so this case isolates --- ReduceAggregateRepartition rather than the min/max combiner rule. +-- The source is already partitioned by (a, b, c), so the planner can satisfy +-- the outer aggregate's coarser (b, a) requirement at the MergeScan without a +-- repartition between the outer partial/final aggregate pair. SELECT a, b, sum(m) FROM ( SELECT a, b, c, min(val) AS m @@ -42,190 +43,6 @@ ORDER BY a, b; | n | y | 3 | +---+---+----------+ --- SQLNESS REPLACE (-+) - --- SQLNESS REPLACE (\s\s+) _ --- SQLNESS REPLACE (Hash.*) REDACTED --- SQLNESS REPLACE (peers.*) REDACTED -EXPLAIN VERBOSE -SELECT a, b, sum(m) -FROM ( - SELECT a, b, c, min(val) AS m - FROM reduce_aggregate_repartition - GROUP BY a, b, c -) s -GROUP BY b, a; - -+-+-+ -| plan_type_| plan_| -+-+-+ -| initial_logical_plan_| Projection: s.a, s.b, sum(s.m)_| -|_|_Aggregate: groupBy=[[s.b, s.a]], aggr=[[sum(s.m)]]_| -|_|_SubqueryAlias: s_| -|_|_Projection: reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c, min(reduce_aggregate_repartition.val) AS m_| -|_|_Aggregate: groupBy=[[reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c]], aggr=[[min(reduce_aggregate_repartition.val)]] | -|_|_TableScan: reduce_aggregate_repartition_| -| logical_plan after apply_function_rewrites_| SAME TEXT AS ABOVE_| -| logical_plan after count_wildcard_to_time_index_rule_| SAME TEXT AS ABOVE_| -| logical_plan after StringNormalizationRule_| SAME TEXT AS ABOVE_| -| logical_plan after TranscribeAtatRule_| SAME TEXT AS ABOVE_| -| logical_plan after resolve_grouping_function_| SAME TEXT AS ABOVE_| -| logical_plan after type_coercion_| SAME TEXT AS ABOVE_| -| logical_plan after DistPlannerAnalyzer_| Projection: s.a, s.b, sum(s.m)_| -|_|_Projection: s.b, s.a, sum(s.m)_| -|_|_Aggregate: groupBy=[[s.b, s.a]], aggr=[[__sum_merge(__sum_state(s.m)) AS sum(s.m)]]_| -|_|_MergeScan [is_placeholder=false, remote_input=[_| -|_| Aggregate: groupBy=[[s.b, s.a]], aggr=[[__sum_state(s.m)]]_| -|_|_SubqueryAlias: s_| -|_|_Projection: reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c, min(reduce_aggregate_repartition.val) AS m_| -|_|_Aggregate: groupBy=[[reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c]], aggr=[[min(reduce_aggregate_repartition.val)]]_| -|_|_TableScan: reduce_aggregate_repartition_| -|_| ]]_| -| logical_plan after FixStateUdafOrderingAnalyzer_| SAME TEXT AS ABOVE_| -| analyzed_logical_plan_| SAME TEXT AS ABOVE_| -| logical_plan after rewrite_set_comparison_| SAME TEXT AS ABOVE_| -| logical_plan after optimize_unions_| SAME TEXT AS ABOVE_| -| logical_plan after simplify_expressions_| SAME TEXT AS ABOVE_| -| logical_plan after replace_distinct_aggregate_| SAME TEXT AS ABOVE_| -| logical_plan after eliminate_join_| SAME TEXT AS ABOVE_| -| logical_plan after decorrelate_predicate_subquery_| SAME TEXT AS ABOVE_| -| logical_plan after scalar_subquery_to_join_| SAME TEXT AS ABOVE_| -| logical_plan after decorrelate_lateral_join_| SAME TEXT AS ABOVE_| -| logical_plan after extract_equijoin_predicate_| SAME TEXT AS ABOVE_| -| logical_plan after eliminate_duplicated_expr_| SAME TEXT AS ABOVE_| -| logical_plan after eliminate_filter_| SAME TEXT AS ABOVE_| -| logical_plan after eliminate_cross_join_| SAME TEXT AS ABOVE_| -| logical_plan after eliminate_limit_| SAME TEXT AS ABOVE_| -| logical_plan after propagate_empty_relation_| SAME TEXT AS ABOVE_| -| logical_plan after filter_null_join_keys_| SAME TEXT AS ABOVE_| -| logical_plan after eliminate_outer_join_| SAME TEXT AS ABOVE_| -| logical_plan after push_down_limit_| SAME TEXT AS ABOVE_| -| logical_plan after push_down_filter_| SAME TEXT AS ABOVE_| -| logical_plan after single_distinct_aggregation_to_group_by | SAME TEXT AS ABOVE_| -| logical_plan after eliminate_group_by_constant_| SAME TEXT AS ABOVE_| -| logical_plan after common_sub_expression_eliminate_| SAME TEXT AS ABOVE_| -| logical_plan after extract_leaf_expressions_| SAME TEXT AS ABOVE_| -| logical_plan after push_down_leaf_projections_| SAME TEXT AS ABOVE_| -| logical_plan after optimize_projections_| Projection: s.a, s.b, sum(s.m)_| -|_|_Aggregate: groupBy=[[s.b, s.a]], aggr=[[__sum_merge(__sum_state(s.m)) AS sum(s.m)]]_| -|_|_MergeScan [is_placeholder=false, remote_input=[_| -|_| Aggregate: groupBy=[[s.b, s.a]], aggr=[[__sum_state(s.m)]]_| -|_|_SubqueryAlias: s_| -|_|_Projection: reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c, min(reduce_aggregate_repartition.val) AS m_| -|_|_Aggregate: groupBy=[[reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c]], aggr=[[min(reduce_aggregate_repartition.val)]]_| -|_|_TableScan: reduce_aggregate_repartition_| -|_| ]]_| -| logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_| -| logical_plan after rewrite_set_comparison_| SAME TEXT AS ABOVE_| -| logical_plan after optimize_unions_| SAME TEXT AS ABOVE_| -| logical_plan after simplify_expressions_| SAME TEXT AS ABOVE_| -| logical_plan after replace_distinct_aggregate_| SAME TEXT AS ABOVE_| -| logical_plan after eliminate_join_| SAME TEXT AS ABOVE_| -| logical_plan after decorrelate_predicate_subquery_| SAME TEXT AS ABOVE_| -| logical_plan after scalar_subquery_to_join_| SAME TEXT AS ABOVE_| -| logical_plan after decorrelate_lateral_join_| SAME TEXT AS ABOVE_| -| logical_plan after extract_equijoin_predicate_| SAME TEXT AS ABOVE_| -| logical_plan after eliminate_duplicated_expr_| SAME TEXT AS ABOVE_| -| logical_plan after eliminate_filter_| SAME TEXT AS ABOVE_| -| logical_plan after eliminate_cross_join_| SAME TEXT AS ABOVE_| -| logical_plan after eliminate_limit_| SAME TEXT AS ABOVE_| -| logical_plan after propagate_empty_relation_| SAME TEXT AS ABOVE_| -| logical_plan after filter_null_join_keys_| SAME TEXT AS ABOVE_| -| logical_plan after eliminate_outer_join_| SAME TEXT AS ABOVE_| -| logical_plan after push_down_limit_| SAME TEXT AS ABOVE_| -| logical_plan after push_down_filter_| SAME TEXT AS ABOVE_| -| logical_plan after single_distinct_aggregation_to_group_by | SAME TEXT AS ABOVE_| -| logical_plan after eliminate_group_by_constant_| SAME TEXT AS ABOVE_| -| logical_plan after common_sub_expression_eliminate_| SAME TEXT AS ABOVE_| -| logical_plan after extract_leaf_expressions_| SAME TEXT AS ABOVE_| -| logical_plan after push_down_leaf_projections_| SAME TEXT AS ABOVE_| -| logical_plan after optimize_projections_| SAME TEXT AS ABOVE_| -| logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_| -| logical_plan_| Projection: s.a, s.b, sum(s.m)_| -|_|_Aggregate: groupBy=[[s.b, s.a]], aggr=[[__sum_merge(__sum_state(s.m)) AS sum(s.m)]]_| -|_|_MergeScan [is_placeholder=false, remote_input=[_| -|_| Aggregate: groupBy=[[s.b, s.a]], aggr=[[__sum_state(s.m)]]_| -|_|_SubqueryAlias: s_| -|_|_Projection: reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c, min(reduce_aggregate_repartition.val) AS m_| -|_|_Aggregate: groupBy=[[reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c]], aggr=[[min(reduce_aggregate_repartition.val)]]_| -|_|_TableScan: reduce_aggregate_repartition_| -|_| ]]_| -| initial_physical_plan_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)]_| -|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_| -|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_| -|_|_MergeScanExec: REDACTED -|_|_| -| initial_physical_plan_with_stats_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| -|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| -|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| -|_|_MergeScanExec: REDACTED -|_|_| -| initial_physical_plan_with_schema_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)], schema=[a:Utf8;N, b:Utf8;N, sum(s.m):Int64;N]_| -|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], schema=[b:Utf8;N, a:Utf8;N, sum(s.m):Int64;N]_| -|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], schema=[b:Utf8;N, a:Utf8;N, sum(s.m)[sum]:Int64;N]_| -|_|_MergeScanExec: REDACTED -|_|_| -| physical_plan after OutputRequirements_| OutputRequirementExec: order_by=[], dist_by=Unspecified_| -|_|_ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)]_| -|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_| -|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_| -|_|_MergeScanExec: REDACTED -|_|_| -| physical_plan after aggregate_statistics_| SAME TEXT AS ABOVE_| -| physical_plan after join_selection_| SAME TEXT AS ABOVE_| -| physical_plan after LimitedDistinctAggregation_| SAME TEXT AS ABOVE_| -| physical_plan after FilterPushdown_| SAME TEXT AS ABOVE_| -| physical_plan after parallelize_scan_| SAME TEXT AS ABOVE_| -| physical_plan after PassDistributionRule_| SAME TEXT AS ABOVE_| -| physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_| -| physical_plan after EnforceDistribution_| OutputRequirementExec: order_by=[], dist_by=Unspecified_| -|_|_ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)]_| -|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_| -|_|_RepartitionExec: partitioning=REDACTED -|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_| -|_|_MergeScanExec: REDACTED -|_|_| -| physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_| -| physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_| -| physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_| -| physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_| -| physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_| -| physical_plan after OutputRequirements_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)]_| -|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_| -|_|_RepartitionExec: partitioning=REDACTED -|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_| -|_|_MergeScanExec: REDACTED -|_|_| -| physical_plan after LimitAggregation_| SAME TEXT AS ABOVE_| -| physical_plan after LimitPushPastWindows_| SAME TEXT AS ABOVE_| -| physical_plan after LimitPushdown_| SAME TEXT AS ABOVE_| -| physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_| -| physical_plan after PushdownSort_| SAME TEXT AS ABOVE_| -| physical_plan after EnsureCooperative_| SAME TEXT AS ABOVE_| -| physical_plan after FilterPushdown(Post)_| SAME TEXT AS ABOVE_| -| physical_plan after WindowedSortRule_| SAME TEXT AS ABOVE_| -| physical_plan after MatchesConstantTerm_| SAME TEXT AS ABOVE_| -| physical_plan after RemoveDuplicateRule_| SAME TEXT AS ABOVE_| -| physical_plan after SanityCheckPlan_| SAME TEXT AS ABOVE_| -| physical_plan_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)]_| -|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_| -|_|_RepartitionExec: partitioning=REDACTED -|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_| -|_|_MergeScanExec: REDACTED -|_|_| -| physical_plan_with_stats_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| -|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| -|_|_RepartitionExec: partitioning=REDACTED -|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| -|_|_MergeScanExec: REDACTED -|_|_| -| physical_plan_with_schema_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)], schema=[a:Utf8;N, b:Utf8;N, sum(s.m):Int64;N]_| -|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], schema=[b:Utf8;N, a:Utf8;N, sum(s.m):Int64;N]_| -|_|_RepartitionExec: partitioning=REDACTED -|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], schema=[b:Utf8;N, a:Utf8;N, sum(s.m)[sum]:Int64;N]_| -|_|_MergeScanExec: REDACTED -|_|_| -+-+-+ - -- SQLNESS REPLACE (-+) - -- SQLNESS REPLACE (\s\s+) _ -- SQLNESS REPLACE (Hash.*) REDACTED @@ -246,13 +63,12 @@ GROUP BY b, a; | stage | node | plan_| +-+-+-+ | 0_| 0_|_ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)] REDACTED +|_|_|_AggregateExec: mode=SinglePartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)] REDACTED +|_|_|_CooperativeExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[__sum_state(s.m)] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[__sum_state(s.m)] REDACTED |_|_|_ProjectionExec: expr=[a@0 as a, b@1 as b, min(reduce_aggregate_repartition.val)@3 as m] REDACTED |_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED @@ -261,7 +77,7 @@ GROUP BY b, a; |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED |_|_|_| | 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[__sum_state(s.m)] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[__sum_state(s.m)] REDACTED |_|_|_ProjectionExec: expr=[a@0 as a, b@1 as b, min(reduce_aggregate_repartition.val)@3 as m] REDACTED |_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED @@ -272,8 +88,8 @@ GROUP BY b, a; |_|_| Total rows: 4_| +-+-+-+ --- Another grouped SQL reduction case to keep the rule coverage centered on SQL, --- not just TQL/PromQL planning. +-- The same optimization should work when dropping both b and c from the +-- downstream partitioning requirement. SELECT a, count(m) FROM ( SELECT a, b, c, min(val) AS m @@ -314,7 +130,7 @@ GROUP BY a; |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[__count_state(s.m)] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[__count_state(s.m)] REDACTED |_|_|_ProjectionExec: expr=[a@0 as a, min(reduce_aggregate_repartition.val)@3 as m] REDACTED |_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED @@ -323,7 +139,7 @@ GROUP BY a; |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED |_|_|_| | 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[__count_state(s.m)] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[__count_state(s.m)] REDACTED |_|_|_ProjectionExec: expr=[a@0 as a, min(reduce_aggregate_repartition.val)@3 as m] REDACTED |_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED @@ -334,65 +150,6 @@ GROUP BY a; |_|_| Total rows: 2_| +-+-+-+ --- Zero-key reduction should collapse the redundant repartition while keeping --- the outer aggregate in SinglePartitioned mode. -SELECT sum(m) -FROM ( - SELECT a, b, c, min(val) AS m - FROM reduce_aggregate_repartition - GROUP BY a, b, c -) s; - -+----------+ -| sum(s.m) | -+----------+ -| 37 | -+----------+ - --- SQLNESS REPLACE (-+) - --- SQLNESS REPLACE (\s\s+) _ --- SQLNESS REPLACE (Hash.*) REDACTED --- SQLNESS REPLACE (metrics.*) REDACTED --- SQLNESS REPLACE (peers.*) REDACTED --- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED --- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED -EXPLAIN ANALYZE -SELECT sum(m) -FROM ( - SELECT a, b, c, min(val) AS m - FROM reduce_aggregate_repartition - GROUP BY a, b, c -) s; - -+-+-+-+ -| stage | node | plan_| -+-+-+-+ -| 0_| 0_|_AggregateExec: mode=Final, gby=[], aggr=[sum(s.m)] REDACTED -|_|_|_CoalescePartitionsExec REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[], aggr=[sum(s.m)] REDACTED -|_|_|_MergeScanExec: REDACTED -|_|_|_| -| 1_| 0_|_AggregateExec: mode=Final, gby=[], aggr=[__sum_state(s.m)] REDACTED -|_|_|_CoalescePartitionsExec REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[], aggr=[__sum_state(s.m)] REDACTED -|_|_|_ProjectionExec: expr=[min(reduce_aggregate_repartition.val)@3 as m] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED -|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED -|_|_|_| -| 1_| 1_|_AggregateExec: mode=Final, gby=[], aggr=[__sum_state(s.m)] REDACTED -|_|_|_CoalescePartitionsExec REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[], aggr=[__sum_state(s.m)] REDACTED -|_|_|_ProjectionExec: expr=[min(reduce_aggregate_repartition.val)@3 as m] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED -|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED -|_|_|_| -|_|_| Total rows: 1_| -+-+-+-+ - DROP TABLE reduce_aggregate_repartition; Affected Rows: 0 @@ -418,8 +175,8 @@ INSERT INTO reduce_aggregate_repartition_non_subset VALUES Affected Rows: 4 --- Changing the group key from `a` to `b` is not a reduction, so the --- repartition must remain in place. +-- Changing the group key from a to b is not a coarsening of the existing +-- partition columns, so the repartition must remain. SELECT b, sum(val) FROM reduce_aggregate_repartition_non_subset GROUP BY b @@ -469,99 +226,3 @@ DROP TABLE reduce_aggregate_repartition_non_subset; Affected Rows: 0 -CREATE TABLE reduce_aggregate_repartition_metric ( - ts TIMESTAMP(3) TIME INDEX, - job STRING, - instance STRING, - greptime_value DOUBLE, - PRIMARY KEY(job, instance), -); - -Affected Rows: 0 - -INSERT INTO reduce_aggregate_repartition_metric VALUES - (0, 'job1', 'instance1', 1), - (0, 'job1', 'instance2', 2), - (0, 'job2', 'instance1', 3), - (5000, 'job1', 'instance1', 4), - (5000, 'job1', 'instance2', 5), - (5000, 'job2', 'instance1', 6), - (10000, 'job1', 'instance1', 7), - (10000, 'job1', 'instance2', 8), - (10000, 'job2', 'instance1', 9); - -Affected Rows: 9 - --- SQLNESS REPLACE (metrics.*) REDACTED --- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED --- SQLNESS REPLACE (-+) - --- SQLNESS REPLACE (\s\s+) _ --- SQLNESS REPLACE (peers.*) REDACTED --- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED --- SQLNESS REPLACE (Hash.*) REDACTED -TQL ANALYZE (0, 10, '5s') -count(count(reduce_aggregate_repartition_metric) by (job)); - -+-+-+-+ -| stage | node | plan_| -+-+-+-+ -| 0_| 0_|_CooperativeExec REDACTED -|_|_|_MergeScanExec: REDACTED -|_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(count(reduce_aggregate_repartition_metric.greptime_value))] REDACTED -|_|_|_CoalescePartitionsExec REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(reduce_aggregate_repartition_metric.greptime_value))] REDACTED -|_|_|_ProjectionExec: expr=[ts@0 as ts] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[], ordering_mode=PartiallySorted([1]) REDACTED -|_|_|_SortExec: expr=[job@1 ASC], preserve_partitioning=[true] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[], ordering_mode=PartiallySorted([1]) REDACTED -|_|_|_ProjectionExec: expr=[ts@0 as ts, job@1 as job] REDACTED -|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED -|_|_|_PromSeriesDivideExec: tags=["job", "instance"] REDACTED -|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED -|_|_|_| -|_|_| Total rows: 3_| -+-+-+-+ - --- SQLNESS REPLACE (metrics.*) REDACTED --- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED --- SQLNESS REPLACE (-+) - --- SQLNESS REPLACE (\s\s+) _ --- SQLNESS REPLACE (peers.*) REDACTED --- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED --- SQLNESS REPLACE (Hash.*) REDACTED -TQL ANALYZE (0, 10, '5s') -count(count(rate(reduce_aggregate_repartition_metric[5s])) by (job)); - -+-+-+-+ -| stage | node | plan_| -+-+-+-+ -| 0_| 0_|_CooperativeExec REDACTED -|_|_|_MergeScanExec: REDACTED -|_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(count(prom_rate(ts_range,greptime_value,ts,Int64(5000))))] REDACTED -|_|_|_CoalescePartitionsExec REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(prom_rate(ts_range,greptime_value,ts,Int64(5000))))] REDACTED -|_|_|_ProjectionExec: expr=[ts@1 as ts, count(prom_rate(ts_range,greptime_value,ts,Int64(5000)))@2 as count(prom_rate(ts_range,greptime_value,ts,Int64(5000)))] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[job@0 as job, ts@1 as ts], aggr=[count(prom_rate(ts_range,greptime_value,ts,Int64(5000)))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[job@2 as job, ts@0 as ts], aggr=[count(prom_rate(ts_range,greptime_value,ts,Int64(5000)))] REDACTED -|_|_|_FilterExec: prom_rate(ts_range,greptime_value,ts,Int64(5000))@1 IS NOT NULL REDACTED -|_|_|_ProjectionExec: expr=[ts@0 as ts, prom_rate(ts_range@4, greptime_value@3, ts@0, 5000) as prom_rate(ts_range,greptime_value,ts,Int64(5000)), job@1 as job] REDACTED -|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[5000], time index=[ts] REDACTED -|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[ts], filter NaN: [true] REDACTED -|_|_|_PromSeriesDivideExec: tags=["job", "instance"] REDACTED -|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED -|_|_|_| -|_|_| Total rows: 0_| -+-+-+-+ - -DROP TABLE reduce_aggregate_repartition_metric; - -Affected Rows: 0 - diff --git a/tests/cases/standalone/optimizer/reduce_aggregate_repartition.sql b/tests/cases/standalone/optimizer/reduce_aggregate_repartition.sql index 969143c4cc..82e3a4929e 100644 --- a/tests/cases/standalone/optimizer/reduce_aggregate_repartition.sql +++ b/tests/cases/standalone/optimizer/reduce_aggregate_repartition.sql @@ -18,8 +18,9 @@ INSERT INTO reduce_aggregate_repartition VALUES ('n', 'x', 'v', 5000, 8), ('n', 'y', 'u', 6000, 3); --- Keep the outer aggregate on `sum` so this case isolates --- ReduceAggregateRepartition rather than the min/max combiner rule. +-- The source is already partitioned by (a, b, c), so the planner can satisfy +-- the outer aggregate's coarser (b, a) requirement at the MergeScan without a +-- repartition between the outer partial/final aggregate pair. SELECT a, b, sum(m) FROM ( SELECT a, b, c, min(val) AS m @@ -29,19 +30,6 @@ FROM ( GROUP BY b, a ORDER BY a, b; --- SQLNESS REPLACE (-+) - --- SQLNESS REPLACE (\s\s+) _ --- SQLNESS REPLACE (Hash.*) REDACTED --- SQLNESS REPLACE (peers.*) REDACTED -EXPLAIN VERBOSE -SELECT a, b, sum(m) -FROM ( - SELECT a, b, c, min(val) AS m - FROM reduce_aggregate_repartition - GROUP BY a, b, c -) s -GROUP BY b, a; - -- SQLNESS REPLACE (-+) - -- SQLNESS REPLACE (\s\s+) _ -- SQLNESS REPLACE (Hash.*) REDACTED @@ -58,8 +46,8 @@ FROM ( ) s GROUP BY b, a; --- Another grouped SQL reduction case to keep the rule coverage centered on SQL, --- not just TQL/PromQL planning. +-- The same optimization should work when dropping both b and c from the +-- downstream partitioning requirement. SELECT a, count(m) FROM ( SELECT a, b, c, min(val) AS m @@ -85,30 +73,6 @@ FROM ( ) s GROUP BY a; --- Zero-key reduction should collapse the redundant repartition while keeping --- the outer aggregate in SinglePartitioned mode. -SELECT sum(m) -FROM ( - SELECT a, b, c, min(val) AS m - FROM reduce_aggregate_repartition - GROUP BY a, b, c -) s; - --- SQLNESS REPLACE (-+) - --- SQLNESS REPLACE (\s\s+) _ --- SQLNESS REPLACE (Hash.*) REDACTED --- SQLNESS REPLACE (metrics.*) REDACTED --- SQLNESS REPLACE (peers.*) REDACTED --- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED --- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED -EXPLAIN ANALYZE -SELECT sum(m) -FROM ( - SELECT a, b, c, min(val) AS m - FROM reduce_aggregate_repartition - GROUP BY a, b, c -) s; - DROP TABLE reduce_aggregate_repartition; CREATE TABLE reduce_aggregate_repartition_non_subset ( @@ -128,8 +92,8 @@ INSERT INTO reduce_aggregate_repartition_non_subset VALUES ('a', 'y', 3000, 7), ('n', 'y', 4000, 3); --- Changing the group key from `a` to `b` is not a reduction, so the --- repartition must remain in place. +-- Changing the group key from a to b is not a coarsening of the existing +-- partition columns, so the repartition must remain. SELECT b, sum(val) FROM reduce_aggregate_repartition_non_subset GROUP BY b @@ -148,44 +112,3 @@ FROM reduce_aggregate_repartition_non_subset GROUP BY b; DROP TABLE reduce_aggregate_repartition_non_subset; - -CREATE TABLE reduce_aggregate_repartition_metric ( - ts TIMESTAMP(3) TIME INDEX, - job STRING, - instance STRING, - greptime_value DOUBLE, - PRIMARY KEY(job, instance), -); - -INSERT INTO reduce_aggregate_repartition_metric VALUES - (0, 'job1', 'instance1', 1), - (0, 'job1', 'instance2', 2), - (0, 'job2', 'instance1', 3), - (5000, 'job1', 'instance1', 4), - (5000, 'job1', 'instance2', 5), - (5000, 'job2', 'instance1', 6), - (10000, 'job1', 'instance1', 7), - (10000, 'job1', 'instance2', 8), - (10000, 'job2', 'instance1', 9); - --- SQLNESS REPLACE (metrics.*) REDACTED --- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED --- SQLNESS REPLACE (-+) - --- SQLNESS REPLACE (\s\s+) _ --- SQLNESS REPLACE (peers.*) REDACTED --- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED --- SQLNESS REPLACE (Hash.*) REDACTED -TQL ANALYZE (0, 10, '5s') -count(count(reduce_aggregate_repartition_metric) by (job)); - --- SQLNESS REPLACE (metrics.*) REDACTED --- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED --- SQLNESS REPLACE (-+) - --- SQLNESS REPLACE (\s\s+) _ --- SQLNESS REPLACE (peers.*) REDACTED --- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED --- SQLNESS REPLACE (Hash.*) REDACTED -TQL ANALYZE (0, 10, '5s') -count(count(rate(reduce_aggregate_repartition_metric[5s])) by (job)); - -DROP TABLE reduce_aggregate_repartition_metric; diff --git a/tests/cases/standalone/tql-explain-analyze/tsid_column.result b/tests/cases/standalone/tql-explain-analyze/tsid_column.result index e0c10e9989..4a7a875060 100644 --- a/tests/cases/standalone/tql-explain-analyze/tsid_column.result +++ b/tests/cases/standalone/tql-explain-analyze/tsid_column.result @@ -110,7 +110,7 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(count(tsid | 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED |_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED |_|_|_ProjectionExec: expr=[ts@0 as ts] REDACTED |_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED @@ -161,7 +161,7 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(sum(tsid_m | 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED |_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED |_|_|_ProjectionExec: expr=[ts@0 as ts] REDACTED |_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED