From 838d3ab04ee1cec7024ad08552bcb02ec9d9504e Mon Sep 17 00:00:00 2001 From: discord9 Date: Sun, 8 Jun 2025 19:19:11 +0800 Subject: [PATCH] feat: steppable aggr fn poc: step aggr query feat: mvp poc stuff test: sqlness chore: import missing feat: support first/last_value fix: check also include first/last value --- src/query/src/dist_plan/analyzer.rs | 34 +- src/query/src/dist_plan/commutativity.rs | 163 ++++++- .../distributed/explain/step_aggr.result | 409 ++++++++++++++++++ tests/cases/distributed/explain/step_aggr.sql | 144 ++++++ .../explain/step_aggr_basic.result | 80 ++++ .../distributed/explain/step_aggr_basic.sql | 34 ++ 6 files changed, 856 insertions(+), 8 deletions(-) create mode 100644 tests/cases/distributed/explain/step_aggr.result create mode 100644 tests/cases/distributed/explain/step_aggr.sql create mode 100644 tests/cases/distributed/explain/step_aggr_basic.result create mode 100644 tests/cases/distributed/explain/step_aggr_basic.sql diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs index 6ebc078eea..72d697ec8d 100644 --- a/src/query/src/dist_plan/analyzer.rs +++ b/src/query/src/dist_plan/analyzer.rs @@ -15,6 +15,7 @@ use std::collections::HashSet; use std::sync::Arc; +use common_telemetry::debug; use datafusion::datasource::DefaultTableSource; use datafusion::error::Result as DfResult; use datafusion_common::config::ConfigOptions; @@ -154,6 +155,7 @@ struct PlanRewriter { /// Partition columns of the table in current pass partition_cols: Option>, column_requirements: HashSet, + expand_on_next_call: bool, } impl PlanRewriter { @@ -174,6 +176,10 @@ impl PlanRewriter { { return true; } + if self.expand_on_next_call { + self.expand_on_next_call = false; + return true; + } match Categorizer::check_plan(plan, self.partition_cols.clone()) { Commutativity::Commutative => {} Commutativity::PartialCommutative => { @@ -190,12 +196,17 @@ impl PlanRewriter { self.stage.push(plan) } } - Commutativity::TransformedCommutative(transformer) => { + Commutativity::TransformedCommutative { + transformer, + expand_on_parent, + } => { if let Some(transformer) = transformer - && let Some(plan) = transformer(plan) + && let Some(changed_plan) = transformer(plan) { - self.update_column_requirements(&plan); - self.stage.push(plan) + debug!("PlanRewriter: transformed plan: {changed_plan} from {plan}"); + self.update_column_requirements(&changed_plan); + self.stage.push(changed_plan); + self.expand_on_next_call = expand_on_parent; } } Commutativity::NonCommutative @@ -391,10 +402,21 @@ impl TreeNodeRewriter for PlanRewriter { return Ok(Transformed::yes(node)); }; + let parent = parent.clone(); + // TODO(ruihang): avoid this clone - if self.should_expand(&parent.clone()) { + if self.should_expand(&parent) { // TODO(ruihang): does this work for nodes with multiple children?; - let node = self.expand(node)?; + debug!("PlanRewriter: should expand child:\n {node}\n Of Parent: {parent}"); + let node = self.expand(node); + debug!( + "PlanRewriter: expanded plan: {}", + match &node { + Ok(n) => n.to_string(), + Err(e) => format!("Error expanding plan: {e}"), + } + ); + let node = node?; self.pop_stack(); return Ok(Transformed::yes(node)); } diff --git a/src/query/src/dist_plan/commutativity.rs b/src/query/src/dist_plan/commutativity.rs index 59871e991c..6a06988889 100644 --- a/src/query/src/dist_plan/commutativity.rs +++ b/src/query/src/dist_plan/commutativity.rs @@ -15,6 +15,9 @@ use std::collections::HashSet; use std::sync::Arc; +use common_function::aggr::{HllState, UddSketchState, HLL_NAME, UDDSKETCH_STATE_NAME}; +use common_telemetry::debug; +use datafusion::functions_aggregate::sum::sum_udaf; use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode}; use promql::extension_plan::{ EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize, @@ -23,12 +26,157 @@ use promql::extension_plan::{ use crate::dist_plan::merge_sort::{merge_sort_transformer, MergeSortLogicalPlan}; use crate::dist_plan::MergeScanLogicalPlan; +/// generate the upper aggregation plan that will execute on the frontend. +pub fn step_aggr_to_upper_aggr(aggr_plan: &LogicalPlan) -> datafusion_common::Result { + let LogicalPlan::Aggregate(aggr) = aggr_plan else { + return Err(datafusion_common::DataFusionError::Plan( + "step_aggr_to_upper_aggr only accepts Aggregate plan".to_string(), + )); + }; + if !is_all_aggr_exprs_steppable(&aggr.aggr_expr) { + return Err(datafusion_common::DataFusionError::NotImplemented( + "Some aggregate expressions are not steppable".to_string(), + )); + } + let mut upper_aggr_expr = vec![]; + for aggr_expr in &aggr.aggr_expr { + let Some(aggr_func) = get_aggr_func(aggr_expr) else { + return Err(datafusion_common::DataFusionError::NotImplemented( + "Aggregate function not found".to_string(), + )); + }; + let col_name = aggr_expr.name_for_alias()?; + let input_column = + Expr::Column(datafusion_common::Column::new_unqualified(col_name.clone())); + let upper_func = match aggr_func.func.name() { + "sum" | "min" | "max" | "last_value" | "first_value" => { + // aggr_calc(aggr_merge(input_column))) as col_name + let mut new_aggr_func = aggr_func.clone(); + new_aggr_func.args = vec![input_column.clone()]; + new_aggr_func + } + "count" => { + // sum(input_column) as col_name + let mut new_aggr_func = aggr_func.clone(); + new_aggr_func.func = sum_udaf(); + new_aggr_func.args = vec![input_column.clone()]; + new_aggr_func + } + UDDSKETCH_STATE_NAME => { + // udd_merge(bucket_size, error_rate input_column) as col_name + let mut new_aggr_func = aggr_func.clone(); + new_aggr_func.func = Arc::new(UddSketchState::merge_udf_impl()); + new_aggr_func.args[2] = input_column.clone(); + new_aggr_func + } + HLL_NAME => { + // hll_merge(input_column) as col_name + let mut new_aggr_func = aggr_func.clone(); + new_aggr_func.func = Arc::new(HllState::merge_udf_impl()); + new_aggr_func.args = vec![input_column.clone()]; + new_aggr_func + } + _ => { + return Err(datafusion_common::DataFusionError::NotImplemented(format!( + "Aggregate function {} is not supported for Step aggregation", + aggr_func.func.name() + ))) + } + }; + + // deal with nested alias case + let mut new_aggr_expr = aggr_expr.clone(); + { + let new_aggr_func = get_aggr_func_mut(&mut new_aggr_expr).unwrap(); + *new_aggr_func = upper_func; + } + + // make the column name the same, so parent can recognize it + upper_aggr_expr.push(new_aggr_expr.alias(col_name)); + } + let mut new_aggr = aggr.clone(); + new_aggr.aggr_expr = upper_aggr_expr; + // group by expr also need alias to avoid duplicated computing + + let mut new_group_expr = new_aggr.group_expr.clone(); + for expr in &mut new_group_expr { + if let Expr::Column(_) = expr { + // already a column, no need to change + continue; + } + let col_name = expr.name_for_alias()?; + let input_column = + Expr::Column(datafusion_common::Column::new_unqualified(col_name.clone())); + *expr = input_column.alias(col_name); + } + new_aggr.group_expr = new_group_expr; + // return the new logical plan + Ok(LogicalPlan::Aggregate(new_aggr)) +} + +/// Check if the given aggregate expression is steppable. +/// As in if it can be split into multiple steps: +/// i.e. on datanode first call `state(input)` then +/// on frontend call `calc(merge(state))` to get the final result. +/// +pub fn is_all_aggr_exprs_steppable(aggr_exprs: &[Expr]) -> bool { + let step_action = HashSet::from([ + "sum", + "count", + "min", + "max", + "first_value", + "last_value", + UDDSKETCH_STATE_NAME, + HLL_NAME, + ]); + aggr_exprs.iter().all(|expr| { + if let Some(aggr_func) = get_aggr_func(expr) { + if aggr_func.distinct { + // Distinct aggregate functions are not steppable(yet). + return false; + } + step_action.contains(aggr_func.func.name()) + } else { + false + } + }) +} + +pub fn get_aggr_func(expr: &Expr) -> Option<&datafusion_expr::expr::AggregateFunction> { + let mut expr_ref = expr; + while let Expr::Alias(alias) = expr_ref { + expr_ref = &alias.expr; + } + if let Expr::AggregateFunction(aggr_func) = expr_ref { + Some(aggr_func) + } else { + None + } +} + +pub fn get_aggr_func_mut(expr: &mut Expr) -> Option<&mut datafusion_expr::expr::AggregateFunction> { + let mut expr_ref = expr; + while let Expr::Alias(alias) = expr_ref { + expr_ref = &mut alias.expr; + } + if let Expr::AggregateFunction(aggr_func) = expr_ref { + Some(aggr_func) + } else { + None + } +} + #[allow(dead_code)] pub enum Commutativity { Commutative, PartialCommutative, ConditionalCommutative(Option), - TransformedCommutative(Option), + TransformedCommutative { + transformer: Option, + /// whether the transformer changes the child to parent + expand_on_parent: bool, + }, NonCommutative, Unimplemented, /// For unrelated plans like DDL @@ -55,7 +203,18 @@ impl Categorizer { LogicalPlan::Filter(filter) => Self::check_expr(&filter.predicate), LogicalPlan::Window(_) => Commutativity::Unimplemented, LogicalPlan::Aggregate(aggr) => { - if !Self::check_partition(&aggr.group_expr, &partition_cols) { + let is_all_steppable = is_all_aggr_exprs_steppable(&aggr.aggr_expr); + let is_partition = Self::check_partition(&aggr.group_expr, &partition_cols); + if !is_partition && is_all_steppable { + debug!("Plan is steppable: {plan}"); + return Commutativity::TransformedCommutative { + transformer: Some(Arc::new(|plan: &LogicalPlan| { + step_aggr_to_upper_aggr(plan).ok() + })), + expand_on_parent: true, + }; + } + if !is_partition { return Commutativity::NonCommutative; } for expr in &aggr.aggr_expr { diff --git a/tests/cases/distributed/explain/step_aggr.result b/tests/cases/distributed/explain/step_aggr.result new file mode 100644 index 0000000000..93d80ae2fb --- /dev/null +++ b/tests/cases/distributed/explain/step_aggr.result @@ -0,0 +1,409 @@ +CREATE TABLE integers( + host STRING, + i BIGINT, + ts TIMESTAMP TIME INDEX +) PARTITION ON COLUMNS (host) ( + host < '550-A', + host >= '550-A' + AND host < '550-W', + host >= '550-W' +); + +Affected Rows: 0 + +INSERT INTO integers (host, i, ts) VALUES + ('550-A', 1, '2023-01-01 00:00:00'), + ('550-A', 2, '2023-01-01 01:00:00'), + ('550-W', 3, '2023-01-01 02:00:00'), + ('550-W', 4, '2023-01-01 03:00:00'); + +Affected Rows: 4 + +-- count +EXPLAIN SELECT + count(i) +FROM + integers; + ++---------------+-------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[count(integers.i)]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[count(integers.i)] | +| | CoalescePartitionsExec | +| | AggregateExec: mode=Partial, gby=[], aggr=[count(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+-------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + ts, + count(i) +FROM + integers +GROUP BY + ts; + ++---------------+---------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+---------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + count(i) +FROM + integers +GROUP BY + ts; + ++---------------+-----------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: count(integers.i) | +| | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] | +| | TableScan: integers] | +| physical_plan | ProjectionExec: expr=[count(integers.i)@1 as count(integers.i)] | +| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+-----------------------------------------------------------------------------------------------------------+ + +-- sum +EXPLAIN SELECT + sum(i) +FROM + integers; + ++---------------+-------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[]], aggr=[[sum(sum(integers.i)) AS sum(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[sum(integers.i)]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[sum(integers.i)] | +| | CoalescePartitionsExec | +| | AggregateExec: mode=Partial, gby=[], aggr=[sum(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+-------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + ts, + sum(i) +FROM + integers +GROUP BY + ts; + ++---------------+---------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(sum(integers.i)) AS sum(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[sum(integers.i)]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+---------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + sum(i) +FROM + integers +GROUP BY + ts; + ++---------------+-----------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: sum(integers.i) | +| | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(sum(integers.i)) AS sum(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[sum(integers.i)]] | +| | TableScan: integers] | +| physical_plan | ProjectionExec: expr=[sum(integers.i)@1 as sum(integers.i)] | +| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+-----------------------------------------------------------------------------------------------------------+ + +-- min +EXPLAIN SELECT + min(i) +FROM + integers; + ++---------------+-------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[]], aggr=[[min(min(integers.i)) AS min(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[min(integers.i)]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[min(integers.i)] | +| | CoalescePartitionsExec | +| | AggregateExec: mode=Partial, gby=[], aggr=[min(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+-------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + ts, + min(i) +FROM + integers +GROUP BY + ts; + ++---------------+---------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[min(min(integers.i)) AS min(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[min(integers.i)]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[min(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[min(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+---------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + min(i) +FROM + integers +GROUP BY + ts; + ++---------------+-----------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: min(integers.i) | +| | Aggregate: groupBy=[[integers.ts]], aggr=[[min(min(integers.i)) AS min(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[min(integers.i)]] | +| | TableScan: integers] | +| physical_plan | ProjectionExec: expr=[min(integers.i)@1 as min(integers.i)] | +| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[min(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[min(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+-----------------------------------------------------------------------------------------------------------+ + +-- max +EXPLAIN SELECT + max(i) +FROM + integers; + ++---------------+-------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[]], aggr=[[max(max(integers.i)) AS max(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[max(integers.i)]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[max(integers.i)] | +| | CoalescePartitionsExec | +| | AggregateExec: mode=Partial, gby=[], aggr=[max(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+-------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + ts, + max(i) +FROM + integers +GROUP BY + ts; + ++---------------+---------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[max(max(integers.i)) AS max(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[max(integers.i)]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[max(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[max(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+---------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + max(i) +FROM + integers +GROUP BY + ts; + ++---------------+-----------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: max(integers.i) | +| | Aggregate: groupBy=[[integers.ts]], aggr=[[max(max(integers.i)) AS max(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[max(integers.i)]] | +| | TableScan: integers] | +| physical_plan | ProjectionExec: expr=[max(integers.i)@1 as max(integers.i)] | +| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[max(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[max(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+-----------------------------------------------------------------------------------------------------------+ + +-- uddsketch_state +EXPLAIN SELECT + uddsketch_state(128, 0.01, i) +FROM + integers; + ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[]], aggr=[[uddsketch_merge(Int64(128), Float64(0.01), uddsketch_state(Int64(128),Float64(0.01),integers.i)) AS uddsketch_state(Int64(128),Float64(0.01),integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[uddsketch_state(Int64(128), Float64(0.01), CAST(integers.i AS Float64))]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] | +| | CoalescePartitionsExec | +| | AggregateExec: mode=Partial, gby=[], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + ts, + uddsketch_state(128, 0.01, i) +FROM + integers +GROUP BY + ts; + ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_merge(Int64(128), Float64(0.01), uddsketch_state(Int64(128),Float64(0.01),integers.i)) AS uddsketch_state(Int64(128),Float64(0.01),integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_state(Int64(128), Float64(0.01), CAST(integers.i AS Float64))]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + uddsketch_state(128, 0.01, i) +FROM + integers +GROUP BY + ts; + ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: uddsketch_state(Int64(128),Float64(0.01),integers.i) | +| | Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_merge(Int64(128), Float64(0.01), uddsketch_state(Int64(128),Float64(0.01),integers.i)) AS uddsketch_state(Int64(128),Float64(0.01),integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_state(Int64(128), Float64(0.01), CAST(integers.i AS Float64))]] | +| | TableScan: integers] | +| physical_plan | ProjectionExec: expr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)@1 as uddsketch_state(Int64(128),Float64(0.01),integers.i)] | +| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + +-- hll +EXPLAIN SELECT + hll(i) +FROM + integers; + ++---------------+----------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+----------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[]], aggr=[[hll_merge(hll(integers.i)) AS hll(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[hll(CAST(integers.i AS Utf8))]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[hll(integers.i)] | +| | CoalescePartitionsExec | +| | AggregateExec: mode=Partial, gby=[], aggr=[hll(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+----------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + ts, + hll(i) +FROM + integers +GROUP BY + ts; + ++---------------+---------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[hll_merge(hll(integers.i)) AS hll(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[hll(CAST(integers.i AS Utf8))]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[hll(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[hll(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+---------------------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + hll(i) +FROM + integers +GROUP BY + ts; + ++---------------+-----------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: hll(integers.i) | +| | Aggregate: groupBy=[[integers.ts]], aggr=[[hll_merge(hll(integers.i)) AS hll(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[hll(CAST(integers.i AS Utf8))]] | +| | TableScan: integers] | +| physical_plan | ProjectionExec: expr=[hll(integers.i)@1 as hll(integers.i)] | +| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[hll(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[hll(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+-----------------------------------------------------------------------------------------------------------------------+ + +DROP TABLE integers; + +Affected Rows: 0 + diff --git a/tests/cases/distributed/explain/step_aggr.sql b/tests/cases/distributed/explain/step_aggr.sql new file mode 100644 index 0000000000..4e20b44861 --- /dev/null +++ b/tests/cases/distributed/explain/step_aggr.sql @@ -0,0 +1,144 @@ +CREATE TABLE integers( + host STRING, + i BIGINT, + ts TIMESTAMP TIME INDEX +) PARTITION ON COLUMNS (host) ( + host < '550-A', + host >= '550-A' + AND host < '550-W', + host >= '550-W' +); + +INSERT INTO integers (host, i, ts) VALUES + ('550-A', 1, '2023-01-01 00:00:00'), + ('550-A', 2, '2023-01-01 01:00:00'), + ('550-W', 3, '2023-01-01 02:00:00'), + ('550-W', 4, '2023-01-01 03:00:00'); + +-- count +EXPLAIN SELECT + count(i) +FROM + integers; + +EXPLAIN SELECT + ts, + count(i) +FROM + integers +GROUP BY + ts; + +EXPLAIN SELECT + count(i) +FROM + integers +GROUP BY + ts; + +-- sum +EXPLAIN SELECT + sum(i) +FROM + integers; + +EXPLAIN SELECT + ts, + sum(i) +FROM + integers +GROUP BY + ts; + +EXPLAIN SELECT + sum(i) +FROM + integers +GROUP BY + ts; + +-- min +EXPLAIN SELECT + min(i) +FROM + integers; + +EXPLAIN SELECT + ts, + min(i) +FROM + integers +GROUP BY + ts; + +EXPLAIN SELECT + min(i) +FROM + integers +GROUP BY + ts; + +-- max +EXPLAIN SELECT + max(i) +FROM + integers; + +EXPLAIN SELECT + ts, + max(i) +FROM + integers +GROUP BY + ts; + +EXPLAIN SELECT + max(i) +FROM + integers +GROUP BY + ts; + +-- uddsketch_state +EXPLAIN SELECT + uddsketch_state(128, 0.01, i) +FROM + integers; + +EXPLAIN SELECT + ts, + uddsketch_state(128, 0.01, i) +FROM + integers +GROUP BY + ts; + +EXPLAIN SELECT + uddsketch_state(128, 0.01, i) +FROM + integers +GROUP BY + ts; + +-- hll +EXPLAIN SELECT + hll(i) +FROM + integers; + +EXPLAIN SELECT + ts, + hll(i) +FROM + integers +GROUP BY + ts; + +EXPLAIN SELECT + hll(i) +FROM + integers +GROUP BY + ts; + +DROP TABLE integers; \ No newline at end of file diff --git a/tests/cases/distributed/explain/step_aggr_basic.result b/tests/cases/distributed/explain/step_aggr_basic.result new file mode 100644 index 0000000000..ee602ad574 --- /dev/null +++ b/tests/cases/distributed/explain/step_aggr_basic.result @@ -0,0 +1,80 @@ +CREATE TABLE integers( + host STRING, + i BIGINT, + ts TIMESTAMP TIME INDEX +) PARTITION ON COLUMNS (host) ( + host < '550-A', + host >= '550-A' + AND host < '550-W', + host >= '550-W' +); + +Affected Rows: 0 + +-- count +EXPLAIN SELECT + count(i) +FROM + integers; + ++---------------+-------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[count(integers.i)]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[count(integers.i)] | +| | CoalescePartitionsExec | +| | AggregateExec: mode=Partial, gby=[], aggr=[count(integers.i)] | +| | MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), 4398046511106(1024, 2), ] | +| | | ++---------------+-------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + ts, + count(i) +FROM + integers +GROUP BY + ts; + ++---------------+---------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(integers.i)] | +| | MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), 4398046511106(1024, 2), ] | +| | | ++---------------+---------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + date_bin('1 hour'::INTERVAL, ts), + count(i) +FROM + integers +GROUP BY + date_bin('1 hour'::INTERVAL, ts); + ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[date_bin(Utf8("1 hour"),integers.ts) AS date_bin(Utf8("1 hour"),integers.ts)]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[date_bin(CAST(Utf8("1 hour") AS Interval(MonthDayNano)), integers.ts)]], aggr=[[count(integers.i)]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[date_bin(Utf8("1 hour"),integers.ts)@0 as date_bin(Utf8("1 hour"),integers.ts)], aggr=[count(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([date_bin(Utf8("1 hour"),integers.ts)@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[date_bin(Utf8("1 hour"),integers.ts)@0 as date_bin(Utf8("1 hour"),integers.ts)], aggr=[count(integers.i)] | +| | MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), 4398046511106(1024, 2), ] | +| | | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + +DROP TABLE integers; + +Affected Rows: 0 + diff --git a/tests/cases/distributed/explain/step_aggr_basic.sql b/tests/cases/distributed/explain/step_aggr_basic.sql new file mode 100644 index 0000000000..287df6a144 --- /dev/null +++ b/tests/cases/distributed/explain/step_aggr_basic.sql @@ -0,0 +1,34 @@ +CREATE TABLE integers( + host STRING, + i BIGINT, + ts TIMESTAMP TIME INDEX +) PARTITION ON COLUMNS (host) ( + host < '550-A', + host >= '550-A' + AND host < '550-W', + host >= '550-W' +); + +-- count +EXPLAIN SELECT + count(i) +FROM + integers; + +EXPLAIN SELECT + ts, + count(i) +FROM + integers +GROUP BY + ts; + +EXPLAIN SELECT + date_bin('1 hour'::INTERVAL, ts), + count(i) +FROM + integers +GROUP BY + date_bin('1 hour'::INTERVAL, ts); + +DROP TABLE integers; \ No newline at end of file