From 0583725b16893faf66989eef8012f771cedce471 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sat, 28 Mar 2026 12:35:49 +0800 Subject: [PATCH] finalize Signed-off-by: Ruihang Xia --- src/query/src/optimizer.rs | 1 - .../optimizer/combine_stepped_aggregate.rs | 323 ------------------ src/query/src/query_engine/state.rs | 6 - .../explain/step_aggr_advance.result | 6 +- .../common/tql-explain-analyze/explain.result | 2 - .../reduce_aggregate_repartition.result | 1 - 6 files changed, 4 insertions(+), 335 deletions(-) delete mode 100644 src/query/src/optimizer/combine_stepped_aggregate.rs diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs index 6600e3d7b4..2b4c24601e 100644 --- a/src/query/src/optimizer.rs +++ b/src/query/src/optimizer.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod combine_stepped_aggregate; pub mod constant_term; pub mod count_nest_aggr; pub mod count_wildcard; diff --git a/src/query/src/optimizer/combine_stepped_aggregate.rs b/src/query/src/optimizer/combine_stepped_aggregate.rs deleted file mode 100644 index bea5dc3598..0000000000 --- a/src/query/src/optimizer/combine_stepped_aggregate.rs +++ /dev/null @@ -1,323 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use datafusion::config::ConfigOptions; -use datafusion::physical_optimizer::PhysicalOptimizerRule; -use datafusion::physical_plan::ExecutionPlan; -use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode}; -use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; -use datafusion_common::Result as DfResult; -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_physical_expr::aggregate::AggregateFunctionExpr; -use datafusion_physical_expr::{PhysicalExpr, physical_exprs_equal}; - -/// Collapses `Final <- CoalescePartitionsExec <- Partial` min/max aggregates -/// into a single-step aggregate over the coalesced input. -#[derive(Debug)] -pub struct CombineSteppedAggregate; - -impl PhysicalOptimizerRule for CombineSteppedAggregate { - fn optimize( - &self, - plan: Arc, - _config: &ConfigOptions, - ) -> DfResult> { - plan.transform_down(|plan| { - let Some(agg_exec) = plan.as_any().downcast_ref::() else { - return Ok(Transformed::no(plan)); - }; - - let Some(new_plan) = Self::combine_coalesced_min_max_partial_final(agg_exec)? else { - return Ok(Transformed::no(plan)); - }; - - Ok(Transformed::yes(new_plan)) - }) - .data() - } - - fn name(&self) -> &str { - "CombineSteppedAggregate" - } - - fn schema_check(&self) -> bool { - false - } -} - -impl CombineSteppedAggregate { - fn combine_coalesced_min_max_partial_final( - agg_exec: &AggregateExec, - ) -> DfResult>> { - if agg_exec.mode() != &AggregateMode::Final { - return Ok(None); - } - - let Some(coalesce_exec) = agg_exec - .input() - .as_any() - .downcast_ref::() - else { - return Ok(None); - }; - - let Some(partial_exec) = coalesce_exec - .input() - .as_any() - .downcast_ref::() - else { - return Ok(None); - }; - - if *partial_exec.mode() != AggregateMode::Partial - || !supports_min_max_family(agg_exec.aggr_expr()) - || !supports_min_max_family(partial_exec.aggr_expr()) - || !can_combine( - ( - agg_exec.group_expr(), - agg_exec.aggr_expr(), - agg_exec.filter_expr(), - ), - ( - partial_exec.group_expr(), - partial_exec.aggr_expr(), - partial_exec.filter_expr(), - ), - ) - { - return Ok(None); - } - - let new_input = Arc::new(CoalescePartitionsExec::new(partial_exec.input().clone())); - let new_agg = AggregateExec::try_new( - AggregateMode::Single, - partial_exec.group_expr().clone(), - partial_exec.aggr_expr().to_vec(), - partial_exec.filter_expr().to_vec(), - new_input, - partial_exec.input_schema(), - )? - .with_limit_options(agg_exec.limit_options()); - - Ok(Some(Arc::new(new_agg))) - } -} - -type GroupExprsRef<'a> = ( - &'a datafusion::physical_plan::aggregates::PhysicalGroupBy, - &'a [Arc], - &'a [Option>], -); - -fn can_combine(final_agg: GroupExprsRef<'_>, partial_agg: GroupExprsRef<'_>) -> bool { - let (final_group_by, final_aggr_expr, final_filter_expr) = final_agg; - let (input_group_by, input_aggr_expr, input_filter_expr) = partial_agg; - - physical_exprs_equal( - &input_group_by.output_exprs(), - &final_group_by.input_exprs(), - ) && input_group_by.groups() == final_group_by.groups() - && input_group_by.null_expr().len() == final_group_by.null_expr().len() - && input_group_by - .null_expr() - .iter() - .zip(final_group_by.null_expr().iter()) - .all(|((lhs_expr, lhs_str), (rhs_expr, rhs_str))| { - lhs_expr.eq(rhs_expr) && lhs_str == rhs_str - }) - && final_aggr_expr.len() == input_aggr_expr.len() - && final_aggr_expr - .iter() - .zip(input_aggr_expr.iter()) - .all(|(final_expr, partial_expr)| final_expr.eq(partial_expr)) - && final_filter_expr.len() == input_filter_expr.len() - && final_filter_expr.iter().zip(input_filter_expr.iter()).all( - |(final_expr, partial_expr)| match (final_expr, partial_expr) { - (Some(l), Some(r)) => l.eq(r), - (None, None) => true, - _ => false, - }, - ) -} - -fn supports_min_max_family(aggr_exprs: &[Arc]) -> bool { - !aggr_exprs.is_empty() - && aggr_exprs.iter().all(|expr| { - let name = expr.fun().name(); - name.eq_ignore_ascii_case("min") || name.eq_ignore_ascii_case("max") - }) -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; - use datafusion::datasource::memory::MemorySourceConfig; - use datafusion::datasource::source::DataSourceExec; - use datafusion::functions_aggregate::count::count_udaf; - use datafusion::functions_aggregate::min_max::min_udaf; - use datafusion::physical_optimizer::PhysicalOptimizerRule; - use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; - use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; - use datafusion::physical_plan::repartition::RepartitionExec; - use datafusion::physical_plan::{ExecutionPlan, displayable}; - use datafusion_common::Result; - use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; - use datafusion_physical_expr::expressions::col; - use datafusion_physical_expr::{Partitioning, PhysicalExpr}; - use pretty_assertions::assert_eq; - - use super::CombineSteppedAggregate; - - fn schema() -> SchemaRef { - Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Int64, true), - Field::new("c", DataType::Int64, true), - ])) - } - - fn input_exec(schema: SchemaRef) -> Arc { - let config = MemorySourceConfig::try_new(&[vec![]], schema, None).unwrap(); - DataSourceExec::from_data_source(config) - } - - fn group_by(names: &[&str], schema: &SchemaRef) -> Result { - let groups: Result, String)>> = names - .iter() - .map(|name| Ok((col(name, schema)?, (*name).to_string()))) - .collect(); - Ok(PhysicalGroupBy::new_single(groups?)) - } - - fn repartition( - input: Arc, - keys: &[&str], - schema: &SchemaRef, - ) -> Result> { - let exprs = keys - .iter() - .map(|name| col(name, schema)) - .collect::>>()?; - Ok(Arc::new(RepartitionExec::try_new( - input, - Partitioning::Hash(exprs, 8), - )?)) - } - - fn aggregate( - mode: AggregateMode, - input: Arc, - group_by: PhysicalGroupBy, - input_schema: SchemaRef, - aggr_expr: Vec>, - ) -> Result> { - let filter_expr = vec![None; aggr_expr.len()]; - Ok(Arc::new(AggregateExec::try_new( - mode, - group_by, - aggr_expr, - filter_expr, - input, - input_schema, - )?)) - } - - fn min_expr(name: &str, schema: &SchemaRef) -> Result> { - Ok(Arc::new( - AggregateExprBuilder::new(min_udaf(), vec![col(name, schema)?]) - .schema(schema.clone()) - .alias(format!("min({name})")) - .build()?, - )) - } - - fn count_expr(name: &str, schema: &SchemaRef) -> Result> { - Ok(Arc::new( - AggregateExprBuilder::new(count_udaf(), vec![col(name, schema)?]) - .schema(schema.clone()) - .alias(format!("count({name})")) - .build()?, - )) - } - - fn optimize(plan: Arc) -> Result { - let optimized = CombineSteppedAggregate.optimize(plan, &Default::default())?; - Ok(displayable(optimized.as_ref()).indent(true).to_string()) - } - - #[test] - fn combines_coalesced_partial_final_for_min() -> Result<()> { - let raw = input_exec(schema()); - let finer = repartition(raw.clone(), &["a", "b"], &raw.schema())?; - let aggr_expr = vec![min_expr("c", &raw.schema())?]; - let partial = aggregate( - AggregateMode::Partial, - finer, - group_by(&["a"], &raw.schema())?, - raw.schema(), - aggr_expr.clone(), - )?; - let final_agg = aggregate( - AggregateMode::Final, - Arc::new(CoalescePartitionsExec::new(partial)), - group_by(&["a"], &raw.schema())?, - raw.schema(), - aggr_expr, - )?; - - assert_eq!( - optimize(final_agg)?.trim(), - r#"AggregateExec: mode=Single, gby=[a@0 as a], aggr=[min(c)] - CoalescePartitionsExec - RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=1 - DataSourceExec: partitions=1, partition_sizes=[0]"# - ); - Ok(()) - } - - #[test] - fn keeps_coalesced_partial_final_for_count() -> Result<()> { - let raw = input_exec(schema()); - let finer = repartition(raw.clone(), &["a", "b"], &raw.schema())?; - let aggr_expr = vec![count_expr("c", &raw.schema())?]; - let partial = aggregate( - AggregateMode::Partial, - finer, - group_by(&["a"], &raw.schema())?, - raw.schema(), - aggr_expr.clone(), - )?; - let final_agg = aggregate( - AggregateMode::Final, - Arc::new(CoalescePartitionsExec::new(partial)), - group_by(&["a"], &raw.schema())?, - raw.schema(), - aggr_expr, - )?; - - assert_eq!( - optimize(final_agg)?.trim(), - r#"AggregateExec: mode=Final, gby=[a@0 as a], aggr=[count(c)] - CoalescePartitionsExec - AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(c)] - RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=1 - DataSourceExec: partitions=1, partition_sizes=[0]"# - ); - Ok(()) - } -} diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 03b1f60269..c6a96a6b03 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -61,7 +61,6 @@ use crate::dist_plan::{ }; use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES}; use crate::optimizer::ExtensionAnalyzerRule; -use crate::optimizer::combine_stepped_aggregate::CombineSteppedAggregate; use crate::optimizer::constant_term::MatchesConstantTermOptimizer; use crate::optimizer::count_nest_aggr::CountNestAggrRule; use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule; @@ -202,11 +201,6 @@ impl QueryEngineState { Self::insert_physical_optimizer_rule_after( &mut physical_optimizer.rules, ReduceAggregateRepartition.name(), - Arc::new(CombineSteppedAggregate), - ); - Self::insert_physical_optimizer_rule_after( - &mut physical_optimizer.rules, - CombineSteppedAggregate.name(), Arc::new(datafusion::physical_optimizer::enforce_distribution::EnforceDistribution {}), ); // Add rule for windowed sort diff --git a/tests/cases/distributed/explain/step_aggr_advance.result b/tests/cases/distributed/explain/step_aggr_advance.result index a6888a4890..1e060ec310 100644 --- a/tests/cases/distributed/explain/step_aggr_advance.result +++ b/tests/cases/distributed/explain/step_aggr_advance.result @@ -58,8 +58,9 @@ tql analyze (1752591864, 1752592164, '30s') max by (a, b, c) (max_over_time(aggr |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[false] REDACTED -|_|_|_AggregateExec: mode=Single, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_CoalescePartitionsExec REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b, c@2 as c] REDACTED |_|_|_PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] REDACTED @@ -68,8 +69,9 @@ tql analyze (1752591864, 1752592164, '30s') max by (a, b, c) (max_over_time(aggr |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| | 1_| 1_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[false] REDACTED -|_|_|_AggregateExec: mode=Single, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_CoalescePartitionsExec REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b, c@2 as c] REDACTED |_|_|_PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] REDACTED diff --git a/tests/cases/standalone/common/tql-explain-analyze/explain.result b/tests/cases/standalone/common/tql-explain-analyze/explain.result index 50a5d8a837..471d2405f0 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/explain.result +++ b/tests/cases/standalone/common/tql-explain-analyze/explain.result @@ -182,7 +182,6 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test; | physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_| | physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_| | physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_| -| physical_plan after CombineSteppedAggregate_| SAME TEXT AS ABOVE_| | physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_| | physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_| | physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_| @@ -331,7 +330,6 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test AS series; | physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_| | physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_| | physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_| -| physical_plan after CombineSteppedAggregate_| SAME TEXT AS ABOVE_| | physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_| | physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_| | physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_| diff --git a/tests/cases/standalone/optimizer/reduce_aggregate_repartition.result b/tests/cases/standalone/optimizer/reduce_aggregate_repartition.result index 920b03d78b..674474c236 100644 --- a/tests/cases/standalone/optimizer/reduce_aggregate_repartition.result +++ b/tests/cases/standalone/optimizer/reduce_aggregate_repartition.result @@ -184,7 +184,6 @@ GROUP BY b, a; |_|_| | physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_| | physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_| -| physical_plan after CombineSteppedAggregate_| SAME TEXT AS ABOVE_| | physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_| | physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_| | physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_|