diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs index aaac1e3124..2b4c24601e 100644 --- a/src/query/src/optimizer.rs +++ b/src/query/src/optimizer.rs @@ -17,6 +17,7 @@ pub mod count_nest_aggr; pub mod count_wildcard; pub mod parallelize_scan; pub mod pass_distribution; +pub mod reduce_aggregate_repartition; pub mod remove_duplicate; pub mod scan_hint; pub mod string_normalization; diff --git a/src/query/src/optimizer/reduce_aggregate_repartition.rs b/src/query/src/optimizer/reduce_aggregate_repartition.rs new file mode 100644 index 0000000000..722268b503 --- /dev/null +++ b/src/query/src/optimizer/reduce_aggregate_repartition.rs @@ -0,0 +1,287 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use datafusion::config::ConfigOptions; +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode}; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion::physical_plan::repartition::RepartitionExec; +use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, InputOrderMode}; +use datafusion_common::Result as DfResult; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_physical_expr::{Partitioning, physical_exprs_equal}; + +/// Replaces a redundant hash repartition before a coarser aggregate with a +/// single fan-in. +/// +/// This only applies when the aggregate already receives explicit hash +/// partitioning on its final grouping keys and the repartition input is already +/// hash partitioned on a strict superset of those keys. +#[derive(Debug)] +pub struct ReduceAggregateRepartition; + +impl PhysicalOptimizerRule for ReduceAggregateRepartition { + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> DfResult> { + Self::do_optimize(plan) + } + + fn name(&self) -> &str { + "ReduceAggregateRepartition" + } + + fn schema_check(&self) -> bool { + false + } +} + +impl ReduceAggregateRepartition { + fn do_optimize(plan: Arc) -> DfResult> { + plan.transform_down(|plan| { + let Some(agg_exec) = plan.as_any().downcast_ref::() else { + return Ok(Transformed::no(plan)); + }; + + let new_mode = match agg_exec.mode() { + AggregateMode::FinalPartitioned => AggregateMode::Final, + AggregateMode::SinglePartitioned => AggregateMode::Single, + _ => return Ok(Transformed::no(plan)), + }; + + if agg_exec.input_order_mode() != &InputOrderMode::Linear + || agg_exec.group_expr().has_grouping_set() + { + return Ok(Transformed::no(plan)); + } + + let Some(repartition_exec) = + agg_exec.input().as_any().downcast_ref::() + else { + return Ok(Transformed::no(plan)); + }; + + if repartition_exec.preserve_order() { + return Ok(Transformed::no(plan)); + } + + let Partitioning::Hash(final_partition_exprs, _) = repartition_exec.partitioning() + else { + return Ok(Transformed::no(plan)); + }; + + let Partitioning::Hash(finer_partition_exprs, _) = + repartition_exec.input().output_partitioning() + else { + return Ok(Transformed::no(plan)); + }; + + let group_exprs = agg_exec.group_expr().input_exprs(); + if !physical_exprs_equal(group_exprs.as_slice(), final_partition_exprs.as_slice()) + || !is_strict_subset(final_partition_exprs, finer_partition_exprs) + { + return Ok(Transformed::no(plan)); + } + + let new_input = Arc::new(CoalescePartitionsExec::new( + repartition_exec.input().clone(), + )); + let new_agg = AggregateExec::try_new( + new_mode, + agg_exec.group_expr().clone(), + agg_exec.aggr_expr().to_vec(), + agg_exec.filter_expr().to_vec(), + new_input, + agg_exec.input_schema(), + )? + .with_limit_options(agg_exec.limit_options()); + + Ok(Transformed::yes(Arc::new(new_agg))) + }) + .data() + } +} + +fn is_strict_subset( + subset_exprs: &[Arc], + superset_exprs: &[Arc], +) -> bool { + if subset_exprs.is_empty() || subset_exprs.len() >= superset_exprs.len() { + return false; + } + + subset_exprs + .iter() + .all(|subset_expr| superset_exprs.iter().any(|expr| subset_expr.eq(expr))) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use datafusion::datasource::memory::MemorySourceConfig; + use datafusion::datasource::source::DataSourceExec; + use datafusion::physical_optimizer::PhysicalOptimizerRule; + use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; + use datafusion::physical_plan::repartition::RepartitionExec; + use datafusion::physical_plan::{ExecutionPlan, displayable}; + use datafusion_common::Result; + use datafusion_physical_expr::expressions::col; + use datafusion_physical_expr::{Partitioning, PhysicalExpr}; + use pretty_assertions::assert_eq; + + use super::ReduceAggregateRepartition; + + fn schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Int64, true), + Field::new("c", DataType::Int64, true), + ])) + } + + fn input_exec(schema: SchemaRef) -> Arc { + let config = MemorySourceConfig::try_new(&[vec![]], schema, None).unwrap(); + DataSourceExec::from_data_source(config) + } + + fn group_by(names: &[&str], schema: &SchemaRef) -> Result { + let groups: Result, String)>> = names + .iter() + .map(|name| Ok((col(name, schema)?, (*name).to_string()))) + .collect(); + Ok(PhysicalGroupBy::new_single(groups?)) + } + + fn repartition( + input: Arc, + keys: &[&str], + schema: &SchemaRef, + ) -> Result> { + let exprs = keys + .iter() + .map(|name| col(name, schema)) + .collect::>>()?; + Ok(Arc::new(RepartitionExec::try_new( + input, + Partitioning::Hash(exprs, 8), + )?)) + } + + fn aggregate( + mode: AggregateMode, + input: Arc, + group_by: PhysicalGroupBy, + input_schema: SchemaRef, + ) -> Result> { + Ok(Arc::new(AggregateExec::try_new( + mode, + group_by, + vec![], + vec![], + input, + input_schema, + )?)) + } + + fn optimize(plan: Arc) -> Result { + let optimized = ReduceAggregateRepartition.optimize(plan, &Default::default())?; + Ok(displayable(optimized.as_ref()).indent(true).to_string()) + } + + #[test] + fn rewrites_final_partitioned_subset_repartition() -> Result<()> { + let raw = input_exec(schema()); + let finer = repartition(raw.clone(), &["a", "b"], &raw.schema())?; + let partial = aggregate( + AggregateMode::Partial, + finer, + group_by(&["a", "b"], &raw.schema())?, + raw.schema(), + )?; + let final_repartition = repartition(partial.clone(), &["a"], &partial.schema())?; + let final_agg = aggregate( + AggregateMode::FinalPartitioned, + final_repartition, + group_by(&["a"], &partial.schema())?, + raw.schema(), + )?; + + assert_eq!( + optimize(final_agg)?.trim(), + r#"AggregateExec: mode=Final, gby=[a@0 as a], aggr=[] + CoalescePartitionsExec + AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[] + RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[0]"# + ); + Ok(()) + } + + #[test] + fn rewrites_single_partitioned_subset_repartition() -> Result<()> { + let raw = input_exec(schema()); + let finer = repartition(raw.clone(), &["a", "b"], &raw.schema())?; + let final_repartition = repartition(finer.clone(), &["a"], &finer.schema())?; + let final_agg = aggregate( + AggregateMode::SinglePartitioned, + final_repartition, + group_by(&["a"], &finer.schema())?, + raw.schema(), + )?; + + assert_eq!( + optimize(final_agg)?.trim(), + r#"AggregateExec: mode=Single, gby=[a@0 as a], aggr=[] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[0]"# + ); + Ok(()) + } + + #[test] + fn keeps_equal_partitioning_keys() -> Result<()> { + let raw = input_exec(schema()); + let finer = repartition(raw.clone(), &["a", "b"], &raw.schema())?; + let partial = aggregate( + AggregateMode::Partial, + finer, + group_by(&["a", "b"], &raw.schema())?, + raw.schema(), + )?; + let final_repartition = repartition(partial.clone(), &["a", "b"], &partial.schema())?; + let final_agg = aggregate( + AggregateMode::FinalPartitioned, + final_repartition, + group_by(&["a", "b"], &partial.schema())?, + raw.schema(), + )?; + + assert_eq!( + optimize(final_agg)?.trim(), + r#"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[] + RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=8 + AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[] + RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[0]"# + ); + Ok(()) + } +} diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 0052c98063..c6a96a6b03 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -66,6 +66,7 @@ use crate::optimizer::count_nest_aggr::CountNestAggrRule; use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule; use crate::optimizer::parallelize_scan::ParallelizeScan; use crate::optimizer::pass_distribution::PassDistribution; +use crate::optimizer::reduce_aggregate_repartition::ReduceAggregateRepartition; use crate::optimizer::remove_duplicate::RemoveDuplicate; use crate::optimizer::scan_hint::ScanHintRule; use crate::optimizer::string_normalization::StringNormalizationRule; @@ -192,6 +193,16 @@ impl QueryEngineState { 7, Arc::new(datafusion::physical_optimizer::enforce_sorting::EnforceSorting {}), ); + Self::insert_physical_optimizer_rule_after( + &mut physical_optimizer.rules, + datafusion::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate::new().name(), + Arc::new(ReduceAggregateRepartition), + ); + Self::insert_physical_optimizer_rule_after( + &mut physical_optimizer.rules, + ReduceAggregateRepartition.name(), + Arc::new(datafusion::physical_optimizer::enforce_distribution::EnforceDistribution {}), + ); // Add rule for windowed sort physical_optimizer .rules @@ -253,6 +264,19 @@ impl QueryEngineState { rules.retain(|rule| rule.name() != name); } + fn insert_physical_optimizer_rule_after( + rules: &mut Vec>, + name: &str, + rule: Arc, + ) { + let index = rules + .iter() + .position(|candidate| candidate.name() == name) + .map(|index| index + 1) + .unwrap_or(rules.len()); + rules.insert(index, rule); + } + /// Optimize the logical plan by the extension analyzer rules. pub fn optimize_by_extension_rules( &self, diff --git a/tests/cases/standalone/common/tql-explain-analyze/explain.result b/tests/cases/standalone/common/tql-explain-analyze/explain.result index 1e4cf18b40..471d2405f0 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/explain.result +++ b/tests/cases/standalone/common/tql-explain-analyze/explain.result @@ -181,6 +181,8 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test; | physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_| | physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_| | physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_| +| physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_| +| physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_| | physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_| | physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_| | physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_| @@ -327,6 +329,8 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test AS series; | physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_| | physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_| | physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_| +| physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_| +| physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_| | physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_| | physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_| | physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_|