diff --git a/src/query/src/optimizer/reduce_aggregate_repartition.rs b/src/query/src/optimizer/reduce_aggregate_repartition.rs index 8d3473263f..50ca1cca7f 100644 --- a/src/query/src/optimizer/reduce_aggregate_repartition.rs +++ b/src/query/src/optimizer/reduce_aggregate_repartition.rs @@ -22,14 +22,14 @@ use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, InputOrderMode}; use datafusion_common::Result as DfResult; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_physical_expr::{Partitioning, physical_exprs_equal}; +use datafusion_physical_expr::{Distribution, Partitioning}; /// Replaces a redundant hash repartition before a coarser aggregate with a /// single fan-in. /// -/// This only applies when the aggregate already receives explicit hash -/// partitioning on its final grouping keys and the repartition input is already -/// hash partitioned on a strict superset of those keys. +/// This only applies when the aggregate already receives hash partitioning +/// satisfying its grouping keys and the repartition input is already hash +/// partitioned on a strict superset of those repartition keys. #[derive(Debug)] pub struct ReduceAggregateRepartition; @@ -80,21 +80,35 @@ impl ReduceAggregateRepartition { return Ok(Transformed::no(plan)); } - let Partitioning::Hash(final_partition_exprs, _) = repartition_exec.partitioning() - else { - return Ok(Transformed::no(plan)); - }; - let Partitioning::Hash(finer_partition_exprs, _) = repartition_exec.input().output_partitioning() else { return Ok(Transformed::no(plan)); }; - let group_exprs = agg_exec.group_expr().input_exprs(); - if !physical_exprs_equal(group_exprs.as_slice(), final_partition_exprs.as_slice()) - || !is_strict_subset(final_partition_exprs, finer_partition_exprs) - { + let Some(required_distribution) = + agg_exec.required_input_distribution().into_iter().next() + else { + return Ok(Transformed::no(plan)); + }; + let repartition_satisfaction = repartition_exec.partitioning().satisfaction( + &required_distribution, + repartition_exec.properties().equivalence_properties(), + true, + ); + if !repartition_satisfaction.is_satisfied() { + return Ok(Transformed::no(plan)); + } + + let coarsening_satisfaction = repartition_exec.partitioning().satisfaction( + &Distribution::HashPartitioned(finer_partition_exprs.clone()), + repartition_exec + .input() + .properties() + .equivalence_properties(), + true, + ); + if !coarsening_satisfaction.is_subset() { return Ok(Transformed::no(plan)); } @@ -117,19 +131,6 @@ impl ReduceAggregateRepartition { } } -fn is_strict_subset( - subset_exprs: &[Arc], - superset_exprs: &[Arc], -) -> bool { - if subset_exprs.is_empty() || subset_exprs.len() >= superset_exprs.len() { - return false; - } - - subset_exprs - .iter() - .all(|subset_expr| superset_exprs.iter().any(|expr| subset_expr.eq(expr))) -} - #[cfg(test)] mod tests { use std::sync::Arc; @@ -139,6 +140,7 @@ mod tests { use datafusion::datasource::source::DataSourceExec; use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; + use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::{ExecutionPlan, displayable}; use datafusion_common::Result; @@ -185,6 +187,13 @@ mod tests { )?)) } + fn round_robin_repartition(input: Arc) -> Result> { + Ok(Arc::new(RepartitionExec::try_new( + input, + Partitioning::RoundRobinBatch(8), + )?)) + } + fn aggregate( mode: AggregateMode, input: Arc, @@ -208,6 +217,17 @@ mod tests { Ok(displayable(optimized.as_ref()).indent(true).to_string()) } + fn project_with_aliases( + input: Arc, + aliases: &[(&str, &str)], + ) -> Result> { + let exprs: Result, String)>> = aliases + .iter() + .map(|(from, to)| Ok((col(from, &input.schema())?, (*to).to_string()))) + .collect(); + Ok(Arc::new(ProjectionExec::try_new(exprs?, input)?)) + } + #[test] fn rewrites_final_partitioned_subset_repartition() -> Result<()> { let raw = input_exec(schema()); @@ -292,4 +312,140 @@ mod tests { ); Ok(()) } + + #[test] + fn rewrites_when_finer_key_order_differs() -> Result<()> { + let raw = input_exec(schema()); + let finer = repartition(raw.clone(), &["c", "a", "b"], &raw.schema())?; + let partial = aggregate( + AggregateMode::Partial, + finer, + group_by(&["c", "a", "b"], &raw.schema())?, + raw.schema(), + vec![], + )?; + let final_repartition = repartition(partial.clone(), &["b", "c"], &partial.schema())?; + let final_agg = aggregate( + AggregateMode::FinalPartitioned, + final_repartition, + group_by(&["b", "c"], &partial.schema())?, + raw.schema(), + vec![], + )?; + + assert_eq!( + optimize(final_agg)?.trim(), + r#"AggregateExec: mode=Final, gby=[b@2 as b, c@0 as c], aggr=[] + CoalescePartitionsExec + AggregateExec: mode=Partial, gby=[c@2 as c, a@0 as a, b@1 as b], aggr=[] + RepartitionExec: partitioning=Hash([c@2, a@0, b@1], 8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[0]"# + ); + Ok(()) + } + + #[test] + fn rewrites_when_repartition_satisfies_group_by_with_subset_keys() -> Result<()> { + let raw = input_exec(schema()); + let finer = repartition(raw.clone(), &["a", "b", "c"], &raw.schema())?; + let final_repartition = repartition(finer.clone(), &["a"], &finer.schema())?; + let final_agg = aggregate( + AggregateMode::FinalPartitioned, + final_repartition, + group_by(&["a", "b"], &finer.schema())?, + raw.schema(), + vec![], + )?; + + assert_eq!( + optimize(final_agg)?.trim(), + r#"AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[0]"# + ); + Ok(()) + } + + #[test] + fn keeps_non_hash_repartition_child() -> Result<()> { + let raw = input_exec(schema()); + let finer = repartition(raw.clone(), &["a", "b"], &raw.schema())?; + let partial = aggregate( + AggregateMode::Partial, + finer, + group_by(&["a", "b"], &raw.schema())?, + raw.schema(), + vec![], + )?; + let final_repartition = round_robin_repartition(partial.clone())?; + let final_agg = aggregate( + AggregateMode::FinalPartitioned, + final_repartition, + group_by(&["a"], &partial.schema())?, + raw.schema(), + vec![], + )?; + + assert_eq!( + optimize(final_agg)?.trim(), + r#"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[] + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=8 + AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[] + RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[0]"# + ); + Ok(()) + } + + #[test] + fn rewrites_subset_partitioning_through_projection() -> Result<()> { + let raw = input_exec(schema()); + let finer = repartition(raw.clone(), &["a", "b", "c"], &raw.schema())?; + let projected = project_with_aliases(finer, &[("a", "x"), ("b", "y"), ("c", "z")])?; + let final_repartition = repartition(projected.clone(), &["x", "y"], &projected.schema())?; + let final_agg = aggregate( + AggregateMode::SinglePartitioned, + final_repartition, + group_by(&["x", "y"], &projected.schema())?, + projected.schema(), + vec![], + )?; + + let optimized = optimize(final_agg)?; + assert!( + optimized.contains("AggregateExec: mode=Single, gby=[x@0 as x, y@1 as y], aggr=[]") + ); + assert!(optimized.contains("CoalescePartitionsExec")); + assert!(optimized.contains("ProjectionExec: expr=[a@0 as x, b@1 as y, c@2 as z]")); + Ok(()) + } + + #[test] + fn keeps_non_subset_repartition() -> Result<()> { + let raw = input_exec(schema()); + let coarser = repartition(raw.clone(), &["a"], &raw.schema())?; + let final_repartition = repartition(coarser.clone(), &["a", "b"], &coarser.schema())?; + let final_agg = aggregate( + AggregateMode::FinalPartitioned, + final_repartition, + group_by(&["a", "b"], &coarser.schema())?, + raw.schema(), + vec![], + )?; + + let optimized = optimize(final_agg)?; + assert!( + optimized.contains( + "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[]" + ), + "{optimized}" + ); + assert!( + optimized.contains("RepartitionExec: partitioning=Hash([a@0, b@1], 8)"), + "{optimized}" + ); + assert!(!optimized.contains("CoalescePartitionsExec"), "{optimized}"); + Ok(()) + } } diff --git a/tests/cases/standalone/optimizer/reduce_aggregate_repartition.result b/tests/cases/standalone/optimizer/reduce_aggregate_repartition.result new file mode 100644 index 0000000000..920b03d78b --- /dev/null +++ b/tests/cases/standalone/optimizer/reduce_aggregate_repartition.result @@ -0,0 +1,405 @@ +CREATE TABLE reduce_aggregate_repartition ( + a STRING NULL, + b STRING NULL, + c STRING NULL, + ts TIMESTAMP TIME INDEX, + val BIGINT NULL, + PRIMARY KEY (a, b, c) +) PARTITION ON COLUMNS (a, b, c) ( + a < 'm', + a >= 'm' +); + +Affected Rows: 0 + +INSERT INTO reduce_aggregate_repartition VALUES + ('a', 'x', 'u', 1000, 10), + ('a', 'x', 'v', 2000, 5), + ('a', 'y', 'u', 3000, 7), + ('n', 'x', 'u', 4000, 4), + ('n', 'x', 'v', 5000, 8), + ('n', 'y', 'u', 6000, 3); + +Affected Rows: 6 + +-- Keep the outer aggregate on `sum` so this case isolates +-- ReduceAggregateRepartition rather than the min/max combiner rule. +SELECT a, b, sum(m) +FROM ( + SELECT a, b, c, min(val) AS m + FROM reduce_aggregate_repartition + GROUP BY a, b, c +) s +GROUP BY b, a +ORDER BY a, b; + ++---+---+----------+ +| a | b | sum(s.m) | ++---+---+----------+ +| a | x | 15 | +| a | y | 7 | +| n | x | 12 | +| n | y | 3 | ++---+---+----------+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +EXPLAIN VERBOSE +SELECT a, b, sum(m) +FROM ( + SELECT a, b, c, min(val) AS m + FROM reduce_aggregate_repartition + GROUP BY a, b, c +) s +GROUP BY b, a; + ++-+-+ +| plan_type_| plan_| ++-+-+ +| initial_logical_plan_| Projection: s.a, s.b, sum(s.m)_| +|_|_Aggregate: groupBy=[[s.b, s.a]], aggr=[[sum(s.m)]]_| +|_|_SubqueryAlias: s_| +|_|_Projection: reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c, min(reduce_aggregate_repartition.val) AS m_| +|_|_Aggregate: groupBy=[[reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c]], aggr=[[min(reduce_aggregate_repartition.val)]] | +|_|_TableScan: reduce_aggregate_repartition_| +| logical_plan after apply_function_rewrites_| SAME TEXT AS ABOVE_| +| logical_plan after count_wildcard_to_time_index_rule_| SAME TEXT AS ABOVE_| +| logical_plan after StringNormalizationRule_| SAME TEXT AS ABOVE_| +| logical_plan after TranscribeAtatRule_| SAME TEXT AS ABOVE_| +| logical_plan after resolve_grouping_function_| SAME TEXT AS ABOVE_| +| logical_plan after type_coercion_| SAME TEXT AS ABOVE_| +| logical_plan after DistPlannerAnalyzer_| Projection: s.a, s.b, sum(s.m)_| +|_|_Projection: s.b, s.a, sum(s.m)_| +|_|_Aggregate: groupBy=[[s.b, s.a]], aggr=[[__sum_merge(__sum_state(s.m)) AS sum(s.m)]]_| +|_|_MergeScan [is_placeholder=false, remote_input=[_| +|_| Aggregate: groupBy=[[s.b, s.a]], aggr=[[__sum_state(s.m)]]_| +|_|_SubqueryAlias: s_| +|_|_Projection: reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c, min(reduce_aggregate_repartition.val) AS m_| +|_|_Aggregate: groupBy=[[reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c]], aggr=[[min(reduce_aggregate_repartition.val)]]_| +|_|_TableScan: reduce_aggregate_repartition_| +|_| ]]_| +| logical_plan after FixStateUdafOrderingAnalyzer_| SAME TEXT AS ABOVE_| +| analyzed_logical_plan_| SAME TEXT AS ABOVE_| +| logical_plan after rewrite_set_comparison_| SAME TEXT AS ABOVE_| +| logical_plan after optimize_unions_| SAME TEXT AS ABOVE_| +| logical_plan after simplify_expressions_| SAME TEXT AS ABOVE_| +| logical_plan after replace_distinct_aggregate_| SAME TEXT AS ABOVE_| +| logical_plan after eliminate_join_| SAME TEXT AS ABOVE_| +| logical_plan after decorrelate_predicate_subquery_| SAME TEXT AS ABOVE_| +| logical_plan after scalar_subquery_to_join_| SAME TEXT AS ABOVE_| +| logical_plan after decorrelate_lateral_join_| SAME TEXT AS ABOVE_| +| logical_plan after extract_equijoin_predicate_| SAME TEXT AS ABOVE_| +| logical_plan after eliminate_duplicated_expr_| SAME TEXT AS ABOVE_| +| logical_plan after eliminate_filter_| SAME TEXT AS ABOVE_| +| logical_plan after eliminate_cross_join_| SAME TEXT AS ABOVE_| +| logical_plan after eliminate_limit_| SAME TEXT AS ABOVE_| +| logical_plan after propagate_empty_relation_| SAME TEXT AS ABOVE_| +| logical_plan after filter_null_join_keys_| SAME TEXT AS ABOVE_| +| logical_plan after eliminate_outer_join_| SAME TEXT AS ABOVE_| +| logical_plan after push_down_limit_| SAME TEXT AS ABOVE_| +| logical_plan after push_down_filter_| SAME TEXT AS ABOVE_| +| logical_plan after single_distinct_aggregation_to_group_by | SAME TEXT AS ABOVE_| +| logical_plan after eliminate_group_by_constant_| SAME TEXT AS ABOVE_| +| logical_plan after common_sub_expression_eliminate_| SAME TEXT AS ABOVE_| +| logical_plan after extract_leaf_expressions_| SAME TEXT AS ABOVE_| +| logical_plan after push_down_leaf_projections_| SAME TEXT AS ABOVE_| +| logical_plan after optimize_projections_| Projection: s.a, s.b, sum(s.m)_| +|_|_Aggregate: groupBy=[[s.b, s.a]], aggr=[[__sum_merge(__sum_state(s.m)) AS sum(s.m)]]_| +|_|_MergeScan [is_placeholder=false, remote_input=[_| +|_| Aggregate: groupBy=[[s.b, s.a]], aggr=[[__sum_state(s.m)]]_| +|_|_SubqueryAlias: s_| +|_|_Projection: reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c, min(reduce_aggregate_repartition.val) AS m_| +|_|_Aggregate: groupBy=[[reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c]], aggr=[[min(reduce_aggregate_repartition.val)]]_| +|_|_TableScan: reduce_aggregate_repartition_| +|_| ]]_| +| logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_| +| logical_plan after rewrite_set_comparison_| SAME TEXT AS ABOVE_| +| logical_plan after optimize_unions_| SAME TEXT AS ABOVE_| +| logical_plan after simplify_expressions_| SAME TEXT AS ABOVE_| +| logical_plan after replace_distinct_aggregate_| SAME TEXT AS ABOVE_| +| logical_plan after eliminate_join_| SAME TEXT AS ABOVE_| +| logical_plan after decorrelate_predicate_subquery_| SAME TEXT AS ABOVE_| +| logical_plan after scalar_subquery_to_join_| SAME TEXT AS ABOVE_| +| logical_plan after decorrelate_lateral_join_| SAME TEXT AS ABOVE_| +| logical_plan after extract_equijoin_predicate_| SAME TEXT AS ABOVE_| +| logical_plan after eliminate_duplicated_expr_| SAME TEXT AS ABOVE_| +| logical_plan after eliminate_filter_| SAME TEXT AS ABOVE_| +| logical_plan after eliminate_cross_join_| SAME TEXT AS ABOVE_| +| logical_plan after eliminate_limit_| SAME TEXT AS ABOVE_| +| logical_plan after propagate_empty_relation_| SAME TEXT AS ABOVE_| +| logical_plan after filter_null_join_keys_| SAME TEXT AS ABOVE_| +| logical_plan after eliminate_outer_join_| SAME TEXT AS ABOVE_| +| logical_plan after push_down_limit_| SAME TEXT AS ABOVE_| +| logical_plan after push_down_filter_| SAME TEXT AS ABOVE_| +| logical_plan after single_distinct_aggregation_to_group_by | SAME TEXT AS ABOVE_| +| logical_plan after eliminate_group_by_constant_| SAME TEXT AS ABOVE_| +| logical_plan after common_sub_expression_eliminate_| SAME TEXT AS ABOVE_| +| logical_plan after extract_leaf_expressions_| SAME TEXT AS ABOVE_| +| logical_plan after push_down_leaf_projections_| SAME TEXT AS ABOVE_| +| logical_plan after optimize_projections_| SAME TEXT AS ABOVE_| +| logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_| +| logical_plan_| Projection: s.a, s.b, sum(s.m)_| +|_|_Aggregate: groupBy=[[s.b, s.a]], aggr=[[__sum_merge(__sum_state(s.m)) AS sum(s.m)]]_| +|_|_MergeScan [is_placeholder=false, remote_input=[_| +|_| Aggregate: groupBy=[[s.b, s.a]], aggr=[[__sum_state(s.m)]]_| +|_|_SubqueryAlias: s_| +|_|_Projection: reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c, min(reduce_aggregate_repartition.val) AS m_| +|_|_Aggregate: groupBy=[[reduce_aggregate_repartition.a, reduce_aggregate_repartition.b, reduce_aggregate_repartition.c]], aggr=[[min(reduce_aggregate_repartition.val)]]_| +|_|_TableScan: reduce_aggregate_repartition_| +|_| ]]_| +| initial_physical_plan_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)]_| +|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_| +|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_| +|_|_MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), ]_| +|_|_| +| initial_physical_plan_with_stats_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| +|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| +|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| +|_|_MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), ], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| +|_|_| +| initial_physical_plan_with_schema_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)], schema=[a:Utf8;N, b:Utf8;N, sum(s.m):Int64;N]_| +|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], schema=[b:Utf8;N, a:Utf8;N, sum(s.m):Int64;N]_| +|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], schema=[b:Utf8;N, a:Utf8;N, sum(s.m)[sum]:Int64;N]_| +|_|_MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), ], schema=[b:Utf8;N, a:Utf8;N, __sum_state(s.m):Struct("sum[sum]": Int64);N]_| +|_|_| +| physical_plan after OutputRequirements_| OutputRequirementExec: order_by=[], dist_by=Unspecified_| +|_|_ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)]_| +|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_| +|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_| +|_|_MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), ]_| +|_|_| +| physical_plan after aggregate_statistics_| SAME TEXT AS ABOVE_| +| physical_plan after join_selection_| SAME TEXT AS ABOVE_| +| physical_plan after LimitedDistinctAggregation_| SAME TEXT AS ABOVE_| +| physical_plan after FilterPushdown_| SAME TEXT AS ABOVE_| +| physical_plan after parallelize_scan_| SAME TEXT AS ABOVE_| +| physical_plan after PassDistributionRule_| SAME TEXT AS ABOVE_| +| physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_| +| physical_plan after EnforceDistribution_| OutputRequirementExec: order_by=[], dist_by=Unspecified_| +|_|_ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)]_| +|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_| +|_|_RepartitionExec: partitioning=Hash([b@0, a@1], 32), input_partitions=32_| +|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_| +|_|_MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), ]_| +|_|_| +| physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_| +| physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_| +| physical_plan after CombineSteppedAggregate_| SAME TEXT AS ABOVE_| +| physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_| +| physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_| +| physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_| +| physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_| +| physical_plan after OutputRequirements_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)]_| +|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_| +|_|_RepartitionExec: partitioning=Hash([b@0, a@1], 32), input_partitions=32_| +|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_| +|_|_MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), ]_| +|_|_| +| physical_plan after LimitAggregation_| SAME TEXT AS ABOVE_| +| physical_plan after LimitPushPastWindows_| SAME TEXT AS ABOVE_| +| physical_plan after LimitPushdown_| SAME TEXT AS ABOVE_| +| physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_| +| physical_plan after PushdownSort_| SAME TEXT AS ABOVE_| +| physical_plan after EnsureCooperative_| SAME TEXT AS ABOVE_| +| physical_plan after FilterPushdown(Post)_| SAME TEXT AS ABOVE_| +| physical_plan after WindowedSortRule_| SAME TEXT AS ABOVE_| +| physical_plan after MatchesConstantTerm_| SAME TEXT AS ABOVE_| +| physical_plan after RemoveDuplicateRule_| SAME TEXT AS ABOVE_| +| physical_plan after SanityCheckPlan_| SAME TEXT AS ABOVE_| +| physical_plan_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)]_| +|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_| +|_|_RepartitionExec: partitioning=Hash([b@0, a@1], 32), input_partitions=32_| +|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_| +|_|_MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), ]_| +|_|_| +| physical_plan_with_stats_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| +|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| +|_|_RepartitionExec: partitioning=Hash([b@0, a@1], 32), input_partitions=32, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| +|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| +|_|_MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), ], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| +|_|_| +| physical_plan_with_schema_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)], schema=[a:Utf8;N, b:Utf8;N, sum(s.m):Int64;N]_| +|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], schema=[b:Utf8;N, a:Utf8;N, sum(s.m):Int64;N]_| +|_|_RepartitionExec: partitioning=Hash([b@0, a@1], 32), input_partitions=32, schema=[b:Utf8;N, a:Utf8;N, sum(s.m)[sum]:Int64;N]_| +|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)], schema=[b:Utf8;N, a:Utf8;N, sum(s.m)[sum]:Int64;N]_| +|_|_MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), ], schema=[b:Utf8;N, a:Utf8;N, __sum_state(s.m):Struct("sum[sum]": Int64);N]_| +|_|_| ++-+-+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE +SELECT a, b, sum(m) +FROM ( + SELECT a, b, c, min(val) AS m + FROM reduce_aggregate_repartition + GROUP BY a, b, c +) s +GROUP BY b, a; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)] REDACTED +|_|_|_RepartitionExec: partitioning=Hash([b@0, a@1], 32), input_partitions=32 REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_AggregateExec: mode=Final, gby=[b@0 as b, a@1 as a], aggr=[__sum_state(s.m)] REDACTED +|_|_|_CoalescePartitionsExec REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[__sum_state(s.m)] REDACTED +|_|_|_ProjectionExec: expr=[a@0 as a, b@1 as b, min(reduce_aggregate_repartition.val)@3 as m] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED +|_|_|_RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 32), input_partitions=32 REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED +|_|_|_| +| 1_| 1_|_AggregateExec: mode=Final, gby=[b@0 as b, a@1 as a], aggr=[__sum_state(s.m)] REDACTED +|_|_|_CoalescePartitionsExec REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[__sum_state(s.m)] REDACTED +|_|_|_ProjectionExec: expr=[a@0 as a, b@1 as b, min(reduce_aggregate_repartition.val)@3 as m] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED +|_|_|_RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 32), input_partitions=32 REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 4_| ++-+-+-+ + +-- Zero-key reduction should rewrite SinglePartitioned to Single. +SELECT sum(m) +FROM ( + SELECT a, b, c, min(val) AS m + FROM reduce_aggregate_repartition + GROUP BY a, b, c +) s; + ++----------+ +| sum(s.m) | ++----------+ +| 37 | ++----------+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE +SELECT sum(m) +FROM ( + SELECT a, b, c, min(val) AS m + FROM reduce_aggregate_repartition + GROUP BY a, b, c +) s; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_AggregateExec: mode=Final, gby=[], aggr=[sum(s.m)] REDACTED +|_|_|_CoalescePartitionsExec REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[], aggr=[sum(s.m)] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_AggregateExec: mode=Final, gby=[], aggr=[__sum_state(s.m)] REDACTED +|_|_|_CoalescePartitionsExec REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[], aggr=[__sum_state(s.m)] REDACTED +|_|_|_ProjectionExec: expr=[min(reduce_aggregate_repartition.val)@3 as m] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED +|_|_|_RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 32), input_partitions=32 REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED +|_|_|_| +| 1_| 1_|_AggregateExec: mode=Final, gby=[], aggr=[__sum_state(s.m)] REDACTED +|_|_|_CoalescePartitionsExec REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[], aggr=[__sum_state(s.m)] REDACTED +|_|_|_ProjectionExec: expr=[min(reduce_aggregate_repartition.val)@3 as m] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED +|_|_|_RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 32), input_partitions=32 REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 1_| ++-+-+-+ + +DROP TABLE reduce_aggregate_repartition; + +Affected Rows: 0 + +CREATE TABLE reduce_aggregate_repartition_non_subset ( + a STRING NULL, + b STRING NULL, + ts TIMESTAMP TIME INDEX, + val BIGINT NULL, + PRIMARY KEY (a, b) +) PARTITION ON COLUMNS (a) ( + a < 'm', + a >= 'm' +); + +Affected Rows: 0 + +INSERT INTO reduce_aggregate_repartition_non_subset VALUES + ('a', 'x', 1000, 10), + ('n', 'x', 2000, 5), + ('a', 'y', 3000, 7), + ('n', 'y', 4000, 3); + +Affected Rows: 4 + +-- Changing the group key from `a` to `b` is not a reduction, so the +-- repartition must remain in place. +SELECT b, sum(val) +FROM reduce_aggregate_repartition_non_subset +GROUP BY b +ORDER BY b; + ++---+--------------------------------------------------+ +| b | sum(reduce_aggregate_repartition_non_subset.val) | ++---+--------------------------------------------------+ +| x | 15 | +| y | 10 | ++---+--------------------------------------------------+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE +SELECT b, sum(val) +FROM reduce_aggregate_repartition_non_subset +GROUP BY b; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[sum(reduce_aggregate_repartition_non_subset.val)] REDACTED +|_|_|_RepartitionExec: partitioning=Hash([b@0], 32), input_partitions=32 REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[sum(reduce_aggregate_repartition_non_subset.val)] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[__sum_state(reduce_aggregate_repartition_non_subset.val)] REDACTED +|_|_|_RepartitionExec: partitioning=Hash([b@0], 32), input_partitions=32 REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[__sum_state(reduce_aggregate_repartition_non_subset.val)] REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED +|_|_|_| +| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[__sum_state(reduce_aggregate_repartition_non_subset.val)] REDACTED +|_|_|_RepartitionExec: partitioning=Hash([b@0], 32), input_partitions=32 REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[__sum_state(reduce_aggregate_repartition_non_subset.val)] REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 2_| ++-+-+-+ + +DROP TABLE reduce_aggregate_repartition_non_subset; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/optimizer/reduce_aggregate_repartition.sql b/tests/cases/standalone/optimizer/reduce_aggregate_repartition.sql new file mode 100644 index 0000000000..c549f56158 --- /dev/null +++ b/tests/cases/standalone/optimizer/reduce_aggregate_repartition.sql @@ -0,0 +1,117 @@ +CREATE TABLE reduce_aggregate_repartition ( + a STRING NULL, + b STRING NULL, + c STRING NULL, + ts TIMESTAMP TIME INDEX, + val BIGINT NULL, + PRIMARY KEY (a, b, c) +) PARTITION ON COLUMNS (a, b, c) ( + a < 'm', + a >= 'm' +); + +INSERT INTO reduce_aggregate_repartition VALUES + ('a', 'x', 'u', 1000, 10), + ('a', 'x', 'v', 2000, 5), + ('a', 'y', 'u', 3000, 7), + ('n', 'x', 'u', 4000, 4), + ('n', 'x', 'v', 5000, 8), + ('n', 'y', 'u', 6000, 3); + +-- Keep the outer aggregate on `sum` so this case isolates +-- ReduceAggregateRepartition rather than the min/max combiner rule. +SELECT a, b, sum(m) +FROM ( + SELECT a, b, c, min(val) AS m + FROM reduce_aggregate_repartition + GROUP BY a, b, c +) s +GROUP BY b, a +ORDER BY a, b; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +EXPLAIN VERBOSE +SELECT a, b, sum(m) +FROM ( + SELECT a, b, c, min(val) AS m + FROM reduce_aggregate_repartition + GROUP BY a, b, c +) s +GROUP BY b, a; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE +SELECT a, b, sum(m) +FROM ( + SELECT a, b, c, min(val) AS m + FROM reduce_aggregate_repartition + GROUP BY a, b, c +) s +GROUP BY b, a; + +-- Zero-key reduction should rewrite SinglePartitioned to Single. +SELECT sum(m) +FROM ( + SELECT a, b, c, min(val) AS m + FROM reduce_aggregate_repartition + GROUP BY a, b, c +) s; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE +SELECT sum(m) +FROM ( + SELECT a, b, c, min(val) AS m + FROM reduce_aggregate_repartition + GROUP BY a, b, c +) s; + +DROP TABLE reduce_aggregate_repartition; + +CREATE TABLE reduce_aggregate_repartition_non_subset ( + a STRING NULL, + b STRING NULL, + ts TIMESTAMP TIME INDEX, + val BIGINT NULL, + PRIMARY KEY (a, b) +) PARTITION ON COLUMNS (a) ( + a < 'm', + a >= 'm' +); + +INSERT INTO reduce_aggregate_repartition_non_subset VALUES + ('a', 'x', 1000, 10), + ('n', 'x', 2000, 5), + ('a', 'y', 3000, 7), + ('n', 'y', 4000, 3); + +-- Changing the group key from `a` to `b` is not a reduction, so the +-- repartition must remain in place. +SELECT b, sum(val) +FROM reduce_aggregate_repartition_non_subset +GROUP BY b +ORDER BY b; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE +SELECT b, sum(val) +FROM reduce_aggregate_repartition_non_subset +GROUP BY b; + +DROP TABLE reduce_aggregate_repartition_non_subset;