diff --git a/src/query/src/optimizer/reduce_aggregate_repartition.rs b/src/query/src/optimizer/reduce_aggregate_repartition.rs index d29ceb56b5..4e3164ddc4 100644 --- a/src/query/src/optimizer/reduce_aggregate_repartition.rs +++ b/src/query/src/optimizer/reduce_aggregate_repartition.rs @@ -21,7 +21,6 @@ use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, InputOrderMode}; use datafusion_common::Result as DfResult; -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_physical_expr::{Distribution, Partitioning}; /// Replaces a redundant hash repartition before a coarser aggregate with a @@ -33,13 +32,18 @@ use datafusion_physical_expr::{Distribution, Partitioning}; #[derive(Debug)] pub struct ReduceAggregateRepartition; +struct OptimizedPlan { + plan: Arc, + subtree_is_linear: bool, +} + impl PhysicalOptimizerRule for ReduceAggregateRepartition { fn optimize( &self, plan: Arc, _config: &ConfigOptions, ) -> DfResult> { - Self::do_optimize(plan) + Ok(Self::do_optimize_with_context(plan, false)?.plan) } fn name(&self) -> &str { @@ -52,68 +56,99 @@ impl PhysicalOptimizerRule for ReduceAggregateRepartition { } impl ReduceAggregateRepartition { - fn do_optimize(plan: Arc) -> DfResult> { - plan.transform_down(|plan| { - let Some(agg_exec) = plan.as_any().downcast_ref::() else { - return Ok(Transformed::no(plan)); - }; + fn do_optimize_with_context( + plan: Arc, + under_branching_ancestor: bool, + ) -> DfResult { + let children = plan.children(); + let subtree_branches_here = children.len() > 1; + let child_under_branching_ancestor = under_branching_ancestor || subtree_branches_here; + let optimized_children = children + .into_iter() + .map(|child| { + Self::do_optimize_with_context(Arc::clone(child), child_under_branching_ancestor) + }) + .collect::>>()?; + let subtree_is_linear = !subtree_branches_here + && optimized_children + .iter() + .all(|child| child.subtree_is_linear); + let new_children = optimized_children + .into_iter() + .map(|child| child.plan) + .collect(); + let plan = plan.with_new_children(new_children)?; - let new_mode = match agg_exec.mode() { - AggregateMode::FinalPartitioned => AggregateMode::Final, - AggregateMode::SinglePartitioned => AggregateMode::Single, - _ => return Ok(Transformed::no(plan)), - }; + let plan = if under_branching_ancestor || !subtree_is_linear { + plan + } else { + Self::try_reduce_repartition(plan)? + }; - if agg_exec.input_order_mode() != &InputOrderMode::Linear - || agg_exec.group_expr().has_grouping_set() - { - return Ok(Transformed::no(plan)); - } - - let Some(repartition_exec) = - agg_exec.input().as_any().downcast_ref::() - else { - return Ok(Transformed::no(plan)); - }; - - if repartition_exec.preserve_order() { - return Ok(Transformed::no(plan)); - } - - let Some(required_distribution) = - agg_exec.required_input_distribution().into_iter().next() - else { - return Ok(Transformed::no(plan)); - }; - let repartition_satisfaction = repartition_exec.partitioning().satisfaction( - &required_distribution, - repartition_exec.properties().equivalence_properties(), - true, - ); - if !repartition_satisfaction.is_satisfied() { - return Ok(Transformed::no(plan)); - } - - if !Self::can_reduce_repartition(repartition_exec) { - return Ok(Transformed::no(plan)); - } - - let new_input = Arc::new(CoalescePartitionsExec::new( - repartition_exec.input().clone(), - )); - let new_agg = AggregateExec::try_new( - new_mode, - agg_exec.group_expr().clone(), - agg_exec.aggr_expr().to_vec(), - agg_exec.filter_expr().to_vec(), - new_input, - agg_exec.input_schema(), - )? - .with_limit_options(agg_exec.limit_options()); - - Ok(Transformed::yes(Arc::new(new_agg))) + Ok(OptimizedPlan { + plan, + subtree_is_linear, }) - .data() + } + + fn aggregate_repartition_candidate( + plan: &Arc, + ) -> Option<(&AggregateExec, &RepartitionExec)> { + let agg_exec = plan.as_any().downcast_ref::()?; + let repartition_exec = agg_exec + .input() + .as_any() + .downcast_ref::()?; + Some((agg_exec, repartition_exec)) + } + + fn can_rewrite_aggregate(agg_exec: &AggregateExec, repartition_exec: &RepartitionExec) -> bool { + if !matches!( + agg_exec.mode(), + AggregateMode::FinalPartitioned | AggregateMode::SinglePartitioned + ) || agg_exec.input_order_mode() != &InputOrderMode::Linear + || agg_exec.group_expr().has_grouping_set() + || repartition_exec.preserve_order() + { + return false; + } + + let Some(required_distribution) = agg_exec.required_input_distribution().into_iter().next() + else { + return false; + }; + let repartition_satisfaction = repartition_exec.partitioning().satisfaction( + &required_distribution, + repartition_exec.properties().equivalence_properties(), + true, + ); + repartition_satisfaction.is_satisfied() && Self::can_reduce_repartition(repartition_exec) + } + + fn try_reduce_repartition(plan: Arc) -> DfResult> { + let Some((agg_exec, repartition_exec)) = Self::aggregate_repartition_candidate(&plan) + else { + return Ok(plan); + }; + + if !Self::can_rewrite_aggregate(agg_exec, repartition_exec) { + return Ok(plan); + } + + let new_input = Arc::new(CoalescePartitionsExec::new( + repartition_exec.input().clone(), + )); + let new_agg = AggregateExec::try_new( + *agg_exec.mode(), + agg_exec.group_expr().clone(), + agg_exec.aggr_expr().to_vec(), + agg_exec.filter_expr().to_vec(), + new_input, + agg_exec.input_schema(), + )? + .with_limit_options(agg_exec.limit_options()); + + Ok(Arc::new(new_agg)) } fn can_reduce_repartition(repartition_exec: &RepartitionExec) -> bool { @@ -147,6 +182,7 @@ mod tests { use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::repartition::RepartitionExec; + use datafusion::physical_plan::union::UnionExec; use datafusion::physical_plan::{ExecutionPlan, displayable}; use datafusion_common::{DFSchema, Result}; use datafusion_physical_expr::aggregate::AggregateFunctionExpr; @@ -264,7 +300,7 @@ mod tests { assert_eq!( optimize(final_agg)?.trim(), - r#"AggregateExec: mode=Final, gby=[a@0 as a], aggr=[] + r#"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[] CoalescePartitionsExec AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[] RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=1 @@ -288,7 +324,7 @@ mod tests { assert_eq!( optimize(final_agg)?.trim(), - r#"AggregateExec: mode=Single, gby=[a@0 as a], aggr=[] + r#"AggregateExec: mode=SinglePartitioned, gby=[a@0 as a], aggr=[] CoalescePartitionsExec RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=1 DataSourceExec: partitions=1, partition_sizes=[0]"# @@ -349,7 +385,7 @@ mod tests { assert_eq!( optimize(final_agg)?.trim(), - r#"AggregateExec: mode=Final, gby=[b@2 as b, c@0 as c], aggr=[] + r#"AggregateExec: mode=FinalPartitioned, gby=[b@2 as b, c@0 as c], aggr=[] CoalescePartitionsExec AggregateExec: mode=Partial, gby=[c@2 as c, a@0 as a, b@1 as b], aggr=[] RepartitionExec: partitioning=Hash([c@2, a@0, b@1], 8), input_partitions=1 @@ -373,7 +409,7 @@ mod tests { assert_eq!( optimize(final_agg)?.trim(), - r#"AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[] + r#"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[] CoalescePartitionsExec RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 8), input_partitions=1 DataSourceExec: partitions=1, partition_sizes=[0]"# @@ -431,7 +467,9 @@ mod tests { let optimized = optimize(final_agg)?; assert!( - optimized.contains("AggregateExec: mode=Single, gby=[x@0 as x, y@1 as y], aggr=[]") + optimized.contains( + "AggregateExec: mode=SinglePartitioned, gby=[x@0 as x, y@1 as y], aggr=[]" + ) ); assert!(optimized.contains("CoalescePartitionsExec")); assert!(optimized.contains("ProjectionExec: expr=[a@0 as x, b@1 as y, c@2 as z]")); @@ -455,7 +493,9 @@ mod tests { let optimized = optimize(final_agg)?; assert!( - optimized.contains("AggregateExec: mode=Single, gby=[x@1 as x, y@2 as y], aggr=[]"), + optimized.contains( + "AggregateExec: mode=SinglePartitioned, gby=[x@1 as x, y@2 as y], aggr=[]" + ), "{optimized}" ); assert!(optimized.contains("CoalescePartitionsExec"), "{optimized}"); @@ -463,6 +503,38 @@ mod tests { Ok(()) } + #[test] + fn keeps_subset_partitioning_with_branching_descendant() -> Result<()> { + let left = input_exec(); + let right = input_exec(); + let union = UnionExec::try_new(vec![left, right])?; + let finer = repartition(union.clone(), &["a", "b", "c"], &union.schema())?; + let partial = aggregate( + AggregateMode::Partial, + finer, + group_by(&["a", "b", "c"], &union.schema())?, + union.schema(), + vec![], + )?; + let final_repartition = repartition(partial.clone(), &["a", "b"], &partial.schema())?; + let final_agg = aggregate( + AggregateMode::FinalPartitioned, + final_repartition, + group_by(&["a", "b"], &partial.schema())?, + union.schema(), + vec![], + )?; + + let optimized = optimize(final_agg)?; + assert!( + optimized.contains("RepartitionExec: partitioning=Hash([a@0, b@1], 8)"), + "{optimized}" + ); + assert!(optimized.contains("UnionExec"), "{optimized}"); + assert!(!optimized.contains("CoalescePartitionsExec"), "{optimized}"); + Ok(()) + } + #[test] fn keeps_non_subset_repartition() -> Result<()> { let raw = input_exec(); diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 3ded3f77fe..90680565f9 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -193,14 +193,9 @@ impl QueryEngineState { ); Self::insert_physical_optimizer_rule_after( &mut physical_optimizer.rules, - datafusion::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate::new().name(), + datafusion::physical_optimizer::update_aggr_exprs::OptimizeAggregateOrder::new().name(), Arc::new(ReduceAggregateRepartition), ); - Self::insert_physical_optimizer_rule_after( - &mut physical_optimizer.rules, - ReduceAggregateRepartition.name(), - Arc::new(datafusion::physical_optimizer::enforce_distribution::EnforceDistribution {}), - ); // Add rule for windowed sort physical_optimizer .rules diff --git a/tests/cases/standalone/common/tql-explain-analyze/explain.result b/tests/cases/standalone/common/tql-explain-analyze/explain.result index 471d2405f0..b44aa87264 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/explain.result +++ b/tests/cases/standalone/common/tql-explain-analyze/explain.result @@ -181,10 +181,9 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test; | physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_| | physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_| | physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_| -| physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_| -| physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_| | physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_| | physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_| +| physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_| | physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_| | physical_plan after OutputRequirements_| MergeScanExec: REDACTED |_|_| @@ -329,10 +328,9 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test AS series; | physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_| | physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_| | physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_| -| physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_| -| physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_| | physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_| | physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_| +| physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_| | physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_| | physical_plan after OutputRequirements_| MergeScanExec: REDACTED |_|_| diff --git a/tests/cases/standalone/optimizer/reduce_aggregate_repartition.result b/tests/cases/standalone/optimizer/reduce_aggregate_repartition.result index 675b650fcf..1a5245f9b1 100644 --- a/tests/cases/standalone/optimizer/reduce_aggregate_repartition.result +++ b/tests/cases/standalone/optimizer/reduce_aggregate_repartition.result @@ -185,10 +185,9 @@ GROUP BY b, a; |_|_MergeScanExec: REDACTED |_|_| | physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_| -| physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_| -| physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_| | physical_plan after EnforceSorting_| SAME TEXT AS ABOVE_| | physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_| +| physical_plan after ReduceAggregateRepartition_| SAME TEXT AS ABOVE_| | physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_| | physical_plan after OutputRequirements_| ProjectionExec: expr=[a@1 as a, b@0 as b, sum(s.m)@2 as sum(s.m)]_| |_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)]_| @@ -252,7 +251,7 @@ GROUP BY b, a; |_|_|_AggregateExec: mode=Partial, gby=[b@0 as b, a@1 as a], aggr=[sum(s.m)] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_AggregateExec: mode=Final, gby=[b@0 as b, a@1 as a], aggr=[__sum_state(s.m)] REDACTED +| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[__sum_state(s.m)] REDACTED |_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[__sum_state(s.m)] REDACTED |_|_|_ProjectionExec: expr=[a@0 as a, b@1 as b, min(reduce_aggregate_repartition.val)@3 as m] REDACTED @@ -261,7 +260,7 @@ GROUP BY b, a; |_|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED |_|_|_| -| 1_| 1_|_AggregateExec: mode=Final, gby=[b@0 as b, a@1 as a], aggr=[__sum_state(s.m)] REDACTED +| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[__sum_state(s.m)] REDACTED |_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[__sum_state(s.m)] REDACTED |_|_|_ProjectionExec: expr=[a@0 as a, b@1 as b, min(reduce_aggregate_repartition.val)@3 as m] REDACTED @@ -314,7 +313,7 @@ GROUP BY a; |_|_|_CooperativeExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_AggregateExec: mode=Final, gby=[a@0 as a], aggr=[__count_state(s.m)] REDACTED +| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[__count_state(s.m)] REDACTED |_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[__count_state(s.m)] REDACTED |_|_|_ProjectionExec: expr=[a@0 as a, min(reduce_aggregate_repartition.val)@3 as m] REDACTED @@ -323,7 +322,7 @@ GROUP BY a; |_|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(reduce_aggregate_repartition.val)] REDACTED |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED |_|_|_| -| 1_| 1_|_AggregateExec: mode=Final, gby=[a@0 as a], aggr=[__count_state(s.m)] REDACTED +| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[__count_state(s.m)] REDACTED |_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[__count_state(s.m)] REDACTED |_|_|_ProjectionExec: expr=[a@0 as a, min(reduce_aggregate_repartition.val)@3 as m] REDACTED @@ -335,7 +334,8 @@ GROUP BY a; |_|_| Total rows: 2_| +-+-+-+ --- Zero-key reduction should rewrite SinglePartitioned to Single. +-- Zero-key reduction should collapse the redundant repartition while keeping +-- the outer aggregate in SinglePartitioned mode. SELECT sum(m) FROM ( SELECT a, b, c, min(val) AS m @@ -508,8 +508,9 @@ count(count(reduce_aggregate_repartition_metric) by (job)); | 0_| 0_|_CooperativeExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED -|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[count(count(reduce_aggregate_repartition_metric.greptime_value))] REDACTED +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(count(reduce_aggregate_repartition_metric.greptime_value))] REDACTED |_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(reduce_aggregate_repartition_metric.greptime_value))] REDACTED |_|_|_ProjectionExec: expr=[ts@0 as ts] REDACTED @@ -541,8 +542,9 @@ count(count(rate(reduce_aggregate_repartition_metric[5s])) by (job)); | 0_| 0_|_CooperativeExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED -|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[count(count(prom_rate(ts_range,greptime_value,ts,Int64(5000))))] REDACTED +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(count(prom_rate(ts_range,greptime_value,ts,Int64(5000))))] REDACTED |_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(prom_rate(ts_range,greptime_value,ts,Int64(5000))))] REDACTED |_|_|_ProjectionExec: expr=[ts@1 as ts, count(prom_rate(ts_range,greptime_value,ts,Int64(5000)))@2 as count(prom_rate(ts_range,greptime_value,ts,Int64(5000)))] REDACTED diff --git a/tests/cases/standalone/optimizer/reduce_aggregate_repartition.sql b/tests/cases/standalone/optimizer/reduce_aggregate_repartition.sql index 4c756ac703..969143c4cc 100644 --- a/tests/cases/standalone/optimizer/reduce_aggregate_repartition.sql +++ b/tests/cases/standalone/optimizer/reduce_aggregate_repartition.sql @@ -85,7 +85,8 @@ FROM ( ) s GROUP BY a; --- Zero-key reduction should rewrite SinglePartitioned to Single. +-- Zero-key reduction should collapse the redundant repartition while keeping +-- the outer aggregate in SinglePartitioned mode. SELECT sum(m) FROM ( SELECT a, b, c, min(val) AS m diff --git a/tests/cases/standalone/tql-explain-analyze/tsid_column.result b/tests/cases/standalone/tql-explain-analyze/tsid_column.result index 86769d03db..e0c10e9989 100644 --- a/tests/cases/standalone/tql-explain-analyze/tsid_column.result +++ b/tests/cases/standalone/tql-explain-analyze/tsid_column.result @@ -107,8 +107,9 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(count(tsid |_|_|_CooperativeExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED -|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED |_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED |_|_|_ProjectionExec: expr=[ts@0 as ts] REDACTED @@ -157,8 +158,9 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(sum(tsid_m |_|_|_CooperativeExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED -|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED |_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED |_|_|_ProjectionExec: expr=[ts@0 as ts] REDACTED