From 403958f5bae0cb538fdb211e20613851d3ce997f Mon Sep 17 00:00:00 2001
From: discord9
Date: Sun, 8 Jun 2025 19:19:11 +0800
Subject: [PATCH] feat: steppable aggr fn dist push down

Signed-off-by: discord9

poc: step aggr query

Signed-off-by: discord9

feat: mvp poc stuff

Signed-off-by: discord9

test: sqlness

Signed-off-by: discord9

chore: import missing

Signed-off-by: discord9

feat: support first/last_value

Signed-off-by: discord9

fix: check also includes first/last value

chore: clean up after rebase

feat: optimize

fix: alias qualified

test: more testcases

chore: qualified column

chore: per review
---
 src/common/function/src/aggrs/approximate.rs  |   4 +-
 src/query/src/dist_plan/analyzer.rs           |  39 +-
 src/query/src/dist_plan/commutativity.rs      | 205 +++++-
 .../distributed/explain/how_alias_done.sql    |  59 ++
 .../distributed/explain/step_aggr.result      | 409 ++++++++++++
 tests/cases/distributed/explain/step_aggr.sql | 144 +++++
 .../explain/step_aggr_basic.result            |  80 +++
 .../distributed/explain/step_aggr_basic.sql   |  90 +++
 .../cases/distributed/explain/try_analyze.sql | 604 ++++++++++++++++++
 9 files changed, 1622 insertions(+), 12 deletions(-)
 create mode 100644 tests/cases/distributed/explain/how_alias_done.sql
 create mode 100644 tests/cases/distributed/explain/step_aggr.result
 create mode 100644 tests/cases/distributed/explain/step_aggr.sql
 create mode 100644 tests/cases/distributed/explain/step_aggr_basic.result
 create mode 100644 tests/cases/distributed/explain/step_aggr_basic.sql
 create mode 100644 tests/cases/distributed/explain/try_analyze.sql

diff --git a/src/common/function/src/aggrs/approximate.rs b/src/common/function/src/aggrs/approximate.rs
index 25044da27a..11b906b971 100644
--- a/src/common/function/src/aggrs/approximate.rs
+++ b/src/common/function/src/aggrs/approximate.rs
@@ -14,8 +14,8 @@

 use crate::function_registry::FunctionRegistry;

-pub(crate) mod hll;
-mod uddsketch;
+pub mod hll;
+pub mod uddsketch;

 pub(crate) struct ApproximateFunction;

diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs
index 6ebc078eea..fe2a3e7ee3 100644
--- a/src/query/src/dist_plan/analyzer.rs
+++ b/src/query/src/dist_plan/analyzer.rs
@@ -15,6 +15,7 @@
 use std::collections::HashSet;
 use std::sync::Arc;

+use common_telemetry::debug;
 use datafusion::datasource::DefaultTableSource;
 use datafusion::error::Result as DfResult;
 use datafusion_common::config::ConfigOptions;
@@ -148,12 +149,13 @@ struct PlanRewriter {
     level: usize,
     /// Simulated stack for the `rewrite` recursion
     stack: Vec<(LogicalPlan, usize)>,
-    /// Stages to be expanded
+    /// Stages to be expanded; each will be added as a parent node of the merge scan, one by one
     stage: Vec<LogicalPlan>,
     status: RewriterStatus,
     /// Partition columns of the table in current pass
     partition_cols: Option<Vec<String>>,
     column_requirements: HashSet<Column>,
+    expand_on_next_call: bool,
 }

 impl PlanRewriter {
@@ -174,6 +176,10 @@ impl PlanRewriter {
         {
             return true;
         }
+        if self.expand_on_next_call {
+            self.expand_on_next_call = false;
+            return true;
+        }
         match Categorizer::check_plan(plan, self.partition_cols.clone()) {
             Commutativity::Commutative => {}
             Commutativity::PartialCommutative => {
@@ -190,12 +196,20 @@
                     self.stage.push(plan)
                 }
             }
-            Commutativity::TransformedCommutative(transformer) => {
+            Commutativity::TransformedCommutative {
+                transformer,
+                expand_on_parent,
+            } => {
                 if let Some(transformer) = transformer
-                    && let Some(plan) = transformer(plan)
+                    && let Some(changed_plan) = transformer(plan)
                 {
-                    self.update_column_requirements(&plan);
-                    self.stage.push(plan)
+                    debug!("PlanRewriter: transformed plan: {changed_plan:#?} from {plan}");
+                    if let Some(last_stage) = changed_plan.last() {
+                        // update the column requirements from the last stage
+                        self.update_column_requirements(last_stage);
+                    }
+                    self.stage.extend(changed_plan.into_iter().rev());
+                    self.expand_on_next_call = expand_on_parent;
                 }
             }
             Commutativity::NonCommutative
@@ -391,10 +405,21 @@ impl TreeNodeRewriter for PlanRewriter {
             return Ok(Transformed::yes(node));
         };

+        let parent = parent.clone();
+
         // TODO(ruihang): avoid this clone
-        if self.should_expand(&parent.clone()) {
+        if self.should_expand(&parent) {
             // TODO(ruihang): does this work for nodes with multiple children?
-            let node = self.expand(node)?;
+            debug!("PlanRewriter: should expand child:\n {node}\n Of Parent: {parent}");
+            let node = self.expand(node);
+            debug!(
+                "PlanRewriter: expanded plan: {}",
+                match &node {
+                    Ok(n) => n.to_string(),
+                    Err(e) => format!("Error expanding plan: {e}"),
+                }
+            );
+            let node = node?;
             self.pop_stack();
             return Ok(Transformed::yes(node));
         }

diff --git a/src/query/src/dist_plan/commutativity.rs b/src/query/src/dist_plan/commutativity.rs
index 59871e991c..87651c7fe0 100644
--- a/src/query/src/dist_plan/commutativity.rs
+++ b/src/query/src/dist_plan/commutativity.rs
@@ -15,7 +15,12 @@
 use std::collections::HashSet;
 use std::sync::Arc;

-use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
+use common_function::aggrs::approximate::hll::{HllState, HLL_NAME};
+use common_function::aggrs::approximate::uddsketch::{UddSketchState, UDDSKETCH_STATE_NAME};
+use common_telemetry::debug;
+use datafusion::functions_aggregate::sum::sum_udaf;
+use datafusion_common::Column;
+use datafusion_expr::{Expr, LogicalPlan, Projection, UserDefinedLogicalNode};
 use promql::extension_plan::{
     EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
 };
@@ -23,12 +28,190 @@
 use crate::dist_plan::merge_sort::{merge_sort_transformer, MergeSortLogicalPlan};
 use crate::dist_plan::MergeScanLogicalPlan;

+/// Generate the upper aggregation plan that will execute on the frontend.
+/// Basically, build a logical plan resembling the following:
+///
+/// Projection:
+///     Aggregate:
+///
+/// from the original Aggregate.
+///
+/// The upper Projection exists solely to make sure the parent plan can recognize the output
+/// of the upper aggregation plan.
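+///
+/// As an illustrative sketch (borrowing the `integers` table from the sqlness
+/// cases added below; the plan text is paraphrased, not verbatim optimizer
+/// output), a lower plan like
+///
+/// Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]]
+///   TableScan: integers
+///
+/// is rewritten into
+///
+/// Projection: integers.ts, count(integers.i)
+///   Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]]
+///     Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] -- replaced by a merge scan later
+///       TableScan: integers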
+pub fn step_aggr_to_upper_aggr(
+    aggr_plan: &LogicalPlan,
+) -> datafusion_common::Result<Vec<LogicalPlan>> {
+    let LogicalPlan::Aggregate(input_aggr) = aggr_plan else {
+        return Err(datafusion_common::DataFusionError::Plan(
+            "step_aggr_to_upper_aggr only accepts Aggregate plan".to_string(),
+        ));
+    };
+    if !is_all_aggr_exprs_steppable(&input_aggr.aggr_expr) {
+        return Err(datafusion_common::DataFusionError::NotImplemented(
+            "Some aggregate expressions are not steppable".to_string(),
+        ));
+    }
+    let mut upper_aggr_expr = vec![];
+    for aggr_expr in &input_aggr.aggr_expr {
+        let Some(aggr_func) = get_aggr_func(aggr_expr) else {
+            return Err(datafusion_common::DataFusionError::NotImplemented(
+                "Aggregate function not found".to_string(),
+            ));
+        };
+        let col_name = aggr_expr.qualified_name();
+        let input_column = Expr::Column(datafusion_common::Column::new(col_name.0, col_name.1));
+        let upper_func = match aggr_func.func.name() {
+            "sum" | "min" | "max" | "last_value" | "first_value" => {
+                // aggr_calc(aggr_merge(input_column)) as col_name
+                let mut new_aggr_func = aggr_func.clone();
+                new_aggr_func.args = vec![input_column.clone()];
+                new_aggr_func
+            }
+            "count" => {
+                // sum(input_column) as col_name
+                let mut new_aggr_func = aggr_func.clone();
+                new_aggr_func.func = sum_udaf();
+                new_aggr_func.args = vec![input_column.clone()];
+                new_aggr_func
+            }
+            UDDSKETCH_STATE_NAME => {
+                // udd_merge(bucket_size, error_rate, input_column) as col_name
+                let mut new_aggr_func = aggr_func.clone();
+                new_aggr_func.func = Arc::new(UddSketchState::merge_udf_impl());
+                new_aggr_func.args[2] = input_column.clone();
+                new_aggr_func
+            }
+            HLL_NAME => {
+                // hll_merge(input_column) as col_name
+                let mut new_aggr_func = aggr_func.clone();
+                new_aggr_func.func = Arc::new(HllState::merge_udf_impl());
+                new_aggr_func.args = vec![input_column.clone()];
+                new_aggr_func
+            }
+            _ => {
+                return Err(datafusion_common::DataFusionError::NotImplemented(format!(
+                    "Aggregate function {} is not supported for step aggregation",
+                    aggr_func.func.name()
+                )))
+            }
+        };
+
+        // deal with the nested alias case
+        let mut new_aggr_expr = aggr_expr.clone();
+        {
+            let new_aggr_func = get_aggr_func_mut(&mut new_aggr_expr).unwrap();
+            *new_aggr_func = upper_func;
+        }
+
+        upper_aggr_expr.push(new_aggr_expr);
+    }
+    let mut new_aggr = input_aggr.clone();
+    // use the lower aggregate plan as input; it will be replaced by the merge scan plan later
+    new_aggr.input = Arc::new(LogicalPlan::Aggregate(input_aggr.clone()));

+    new_aggr.aggr_expr = upper_aggr_expr;

+    // group by exprs also need to be plain column references, to avoid computing them twice
+    let mut new_group_expr = new_aggr.group_expr.clone();
+    for expr in &mut new_group_expr {
+        if let Expr::Column(_) = expr {
+            // already a column, no need to change
+            continue;
+        }
+        let col_name = expr.qualified_name();
+        let input_column = Expr::Column(datafusion_common::Column::new(col_name.0, col_name.1));
+        *expr = input_column;
+    }
+    new_aggr.group_expr = new_group_expr.clone();

+    let mut new_projection_exprs = new_group_expr;
+    // each upper aggr expr needs to be aliased to the corresponding input aggr expr's name,
+    // so that the parent plan can recognize it.
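+    // For example (an illustrative sketch, not verbatim plan text): if the lower
+    // aggregate emits a column named `count(t.i)` and the upper aggregate computes
+    // `sum(count(t.i))`, the projection re-exports the upper output under the
+    // lower expression's qualified name, so parent plans resolve columns unchanged.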
+    for (lower_aggr_expr, upper_aggr_expr) in
+        input_aggr.aggr_expr.iter().zip(new_aggr.aggr_expr.iter())
+    {
+        let lower_col_name = lower_aggr_expr.qualified_name();
+        let (table, col_name) = upper_aggr_expr.qualified_name();
+        let aggr_out_column = Column::new(table, col_name);
+        let aliased_output_aggr_expr =
+            Expr::Column(aggr_out_column).alias_qualified(lower_col_name.0, lower_col_name.1);
+        new_projection_exprs.push(aliased_output_aggr_expr);
+    }
+    let upper_aggr_plan = LogicalPlan::Aggregate(new_aggr);
+    debug!("Before recompute schema: {upper_aggr_plan:?}");
+    let upper_aggr_plan = upper_aggr_plan.recompute_schema()?;
+    debug!("After recompute schema: {upper_aggr_plan:?}");
+    // create a projection on top of the new aggregate plan
+    let new_projection =
+        Projection::try_new(new_projection_exprs, Arc::new(upper_aggr_plan.clone()))?;
+    let projection = LogicalPlan::Projection(new_projection);
+    // return the new logical plan
+    Ok(vec![projection, upper_aggr_plan])
+}
+
+/// Check whether all the given aggregate expressions are steppable, i.e. whether
+/// each can be split into multiple steps: first call `state(input)` on the
+/// datanodes, then call `calc(merge(state))` on the frontend to get the final result.
+pub fn is_all_aggr_exprs_steppable(aggr_exprs: &[Expr]) -> bool {
+    let step_action = HashSet::from([
+        "sum",
+        "count",
+        "min",
+        "max",
+        "first_value",
+        "last_value",
+        UDDSKETCH_STATE_NAME,
+        HLL_NAME,
+    ]);
+    aggr_exprs.iter().all(|expr| {
+        if let Some(aggr_func) = get_aggr_func(expr) {
+            if aggr_func.distinct {
+                // Distinct aggregate functions are not steppable (yet).
+                return false;
+            }
+            step_action.contains(aggr_func.func.name())
+        } else {
+            false
+        }
+    })
+}
+
+pub fn get_aggr_func(expr: &Expr) -> Option<&datafusion_expr::expr::AggregateFunction> {
+    let mut expr_ref = expr;
+    while let Expr::Alias(alias) = expr_ref {
+        expr_ref = &alias.expr;
+    }
+    if let Expr::AggregateFunction(aggr_func) = expr_ref {
+        Some(aggr_func)
+    } else {
+        None
+    }
+}
+
+pub fn get_aggr_func_mut(expr: &mut Expr) -> Option<&mut datafusion_expr::expr::AggregateFunction> {
+    let mut expr_ref = expr;
+    while let Expr::Alias(alias) = expr_ref {
+        expr_ref = &mut alias.expr;
+    }
+    if let Expr::AggregateFunction(aggr_func) = expr_ref {
+        Some(aggr_func)
+    } else {
+        None
+    }
+}
+
 #[allow(dead_code)]
 pub enum Commutativity {
     Commutative,
     PartialCommutative,
     ConditionalCommutative(Option<Transformer>),
-    TransformedCommutative(Option<Transformer>),
+    TransformedCommutative {
+        /// Returns the transformed plans, ordered from parent to child
+        transformer: Option<StageTransformer>,
+        /// Whether to also expand the parent plan on the next `should_expand` call
+        expand_on_parent: bool,
+    },
     NonCommutative,
     Unimplemented,
     /// For unrelated plans like DDL
@@ -55,7 +238,21 @@ impl Categorizer {
             LogicalPlan::Filter(filter) => Self::check_expr(&filter.predicate),
             LogicalPlan::Window(_) => Commutativity::Unimplemented,
             LogicalPlan::Aggregate(aggr) => {
-                if !Self::check_partition(&aggr.group_expr, &partition_cols) {
+                let is_all_steppable = is_all_aggr_exprs_steppable(&aggr.aggr_expr);
+                let matches_partition = Self::check_partition(&aggr.group_expr, &partition_cols);
+                if !matches_partition && is_all_steppable {
+                    debug!("Plan is steppable: {plan}");
+                    return Commutativity::TransformedCommutative {
+                        transformer: Some(Arc::new(|plan: &LogicalPlan| {
+                            debug!("Before step optimize: {plan}");
+                            let ret = step_aggr_to_upper_aggr(plan);
+                            debug!("After step optimize: {ret:?}");
+                            ret.ok()
+                        })),
+                        expand_on_parent: true,
+                    };
+                }
+                if !matches_partition {
                     return Commutativity::NonCommutative;
                 }
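+                // Worked example (hypothetical, mirroring the sqlness cases added
+                // below): `SELECT count(i) FROM integers GROUP BY ts` on a table
+                // partitioned by `host` does not group by the partition column,
+                // but `count` is steppable, so the steppable branch above splits
+                // it into a per-datanode partial `count` plus a frontend
+                // `sum(count(...))` merge instead of declaring the whole aggregate
+                // non-commutative.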
                 for expr in &aggr.aggr_expr {
@@ -217,6 +414,8 @@ impl Categorizer {

 pub type Transformer = Arc<dyn Fn(&LogicalPlan) -> Option<LogicalPlan>>;

+pub type StageTransformer = Arc<dyn Fn(&LogicalPlan) -> Option<Vec<LogicalPlan>>>;
+
 pub fn partial_commutative_transformer(plan: &LogicalPlan) -> Option<LogicalPlan> {
     Some(plan.clone())
 }
diff --git a/tests/cases/distributed/explain/how_alias_done.sql b/tests/cases/distributed/explain/how_alias_done.sql
new file mode 100644
index 0000000000..15b4316a10
--- /dev/null
+++ b/tests/cases/distributed/explain/how_alias_done.sql
@@ -0,0 +1,59 @@
+CREATE TABLE IF NOT EXISTS base_table (
+    "time" TIMESTAMP(3) NOT NULL,
+    env STRING NULL,
+    service_name STRING NULL,
+    city STRING NULL,
+    page STRING NULL,
+    lcp BIGINT NULL,
+    fmp BIGINT NULL,
+    fcp BIGINT NULL,
+    fp BIGINT NULL,
+    tti BIGINT NULL,
+    fid BIGINT NULL,
+    shard_key BIGINT NULL,
+    TIME INDEX ("time"),
+    PRIMARY KEY (env, service_name)
+) PARTITION ON COLUMNS (shard_key) (
+    shard_key < 4,
+    shard_key >= 4
+    AND shard_key < 8,
+    shard_key >= 8
+    AND shard_key < 12,
+    shard_key >= 12
+    AND shard_key < 16,
+    shard_key >= 16
+    AND shard_key < 20,
+    shard_key >= 20
+    AND shard_key < 24,
+    shard_key >= 24
+    AND shard_key < 28,
+    shard_key >= 28
+    AND shard_key < 32,
+    shard_key >= 32
+    AND shard_key < 36,
+    shard_key >= 36
+    AND shard_key < 40,
+    shard_key >= 40
+    AND shard_key < 44,
+    shard_key >= 44
+    AND shard_key < 48,
+    shard_key >= 48
+    AND shard_key < 52,
+    shard_key >= 52
+    AND shard_key < 56,
+    shard_key >= 56
+    AND shard_key < 60,
+    shard_key >= 60
+) ENGINE = mito WITH(
+    'append_mode' = 'true',
+    'compaction.twcs.max_output_file_size' = "2GB",
+    'compaction.twcs.time_window' = "1h",
+    'compaction.type' = "twcs",
+);
+
+EXPLAIN SELECT count(*) from base_table where time >= now();
+
+-- ERROR: Internal error: 1003
+EXPLAIN ANALYZE SELECT count(*) from base_table where time >= now();
+
+DROP TABLE base_table;
diff --git a/tests/cases/distributed/explain/step_aggr.result b/tests/cases/distributed/explain/step_aggr.result
new file mode 100644
index 0000000000..93d80ae2fb
--- /dev/null
+++ b/tests/cases/distributed/explain/step_aggr.result
@@ -0,0 +1,409 @@
+CREATE TABLE integers(
+    host STRING,
+    i BIGINT,
+    ts TIMESTAMP TIME INDEX
+) PARTITION ON COLUMNS (host) (
+    host < '550-A',
+    host >= '550-A'
+    AND host < '550-W',
+    host >= '550-W'
+);
+
+Affected Rows: 0
+
+INSERT INTO integers (host, i, ts) VALUES
+    ('550-A', 1, '2023-01-01 00:00:00'),
+    ('550-A', 2, '2023-01-01 01:00:00'),
+    ('550-W', 3, '2023-01-01 02:00:00'),
+    ('550-W', 4, '2023-01-01 03:00:00');
+
+Affected Rows: 4
+
+-- count
+EXPLAIN SELECT
+    count(i)
+FROM
+    integers;
+
++---------------+-------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                  |
++---------------+-------------------------------------------------------------------------------------------------------+
+| logical_plan  | Aggregate: groupBy=[[]], aggr=[[sum(count(integers.i)) AS count(integers.i)]]                         |
+|               |   MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[count(integers.i)]]          |
+|               | TableScan: integers]                                                                                  |
+| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[count(integers.i)]                                           |
+|               |   CoalescePartitionsExec                                                                              |
+|               |     AggregateExec: mode=Partial, gby=[], aggr=[count(integers.i)]                                     |
+|               |       MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
+|               |                                                                                                       |
++---------------+-------------------------------------------------------------------------------------------------------+
+
+EXPLAIN SELECT
+    ts,
+    count(i)
+FROM
+    integers
+GROUP
BY + ts; + ++---------------+---------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+---------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + count(i) +FROM + integers +GROUP BY + ts; + ++---------------+-----------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: count(integers.i) | +| | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] | +| | TableScan: integers] | +| physical_plan | ProjectionExec: expr=[count(integers.i)@1 as count(integers.i)] | +| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+-----------------------------------------------------------------------------------------------------------+ + +-- sum +EXPLAIN SELECT + sum(i) +FROM + integers; + ++---------------+-------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[]], aggr=[[sum(sum(integers.i)) AS sum(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[sum(integers.i)]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[sum(integers.i)] | +| | CoalescePartitionsExec | +| | AggregateExec: mode=Partial, gby=[], aggr=[sum(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+-------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + ts, + sum(i) +FROM + integers +GROUP BY + ts; + ++---------------+---------------------------------------------------------------------------------------------------------+ +| plan_type | plan | 
++---------------+---------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(sum(integers.i)) AS sum(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[sum(integers.i)]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+---------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + sum(i) +FROM + integers +GROUP BY + ts; + ++---------------+-----------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: sum(integers.i) | +| | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(sum(integers.i)) AS sum(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[sum(integers.i)]] | +| | TableScan: integers] | +| physical_plan | ProjectionExec: expr=[sum(integers.i)@1 as sum(integers.i)] | +| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+-----------------------------------------------------------------------------------------------------------+ + +-- min +EXPLAIN SELECT + min(i) +FROM + integers; + ++---------------+-------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[]], aggr=[[min(min(integers.i)) AS min(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[min(integers.i)]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[min(integers.i)] | +| | CoalescePartitionsExec | +| | AggregateExec: mode=Partial, gby=[], aggr=[min(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+-------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + ts, + min(i) +FROM + integers +GROUP BY + ts; + ++---------------+---------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[min(min(integers.i)) AS min(integers.i)]] | +| | MergeScan 
[is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[min(integers.i)]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[min(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[min(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+---------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + min(i) +FROM + integers +GROUP BY + ts; + ++---------------+-----------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: min(integers.i) | +| | Aggregate: groupBy=[[integers.ts]], aggr=[[min(min(integers.i)) AS min(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[min(integers.i)]] | +| | TableScan: integers] | +| physical_plan | ProjectionExec: expr=[min(integers.i)@1 as min(integers.i)] | +| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[min(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[min(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+-----------------------------------------------------------------------------------------------------------+ + +-- max +EXPLAIN SELECT + max(i) +FROM + integers; + ++---------------+-------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[]], aggr=[[max(max(integers.i)) AS max(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[max(integers.i)]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[max(integers.i)] | +| | CoalescePartitionsExec | +| | AggregateExec: mode=Partial, gby=[], aggr=[max(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+-------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + ts, + max(i) +FROM + integers +GROUP BY + ts; + ++---------------+---------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[max(max(integers.i)) AS max(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[max(integers.i)]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[max(integers.i)] | +| | CoalesceBatchesExec: 
target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[max(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+---------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + max(i) +FROM + integers +GROUP BY + ts; + ++---------------+-----------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: max(integers.i) | +| | Aggregate: groupBy=[[integers.ts]], aggr=[[max(max(integers.i)) AS max(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[max(integers.i)]] | +| | TableScan: integers] | +| physical_plan | ProjectionExec: expr=[max(integers.i)@1 as max(integers.i)] | +| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[max(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[max(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+-----------------------------------------------------------------------------------------------------------+ + +-- uddsketch_state +EXPLAIN SELECT + uddsketch_state(128, 0.01, i) +FROM + integers; + ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[]], aggr=[[uddsketch_merge(Int64(128), Float64(0.01), uddsketch_state(Int64(128),Float64(0.01),integers.i)) AS uddsketch_state(Int64(128),Float64(0.01),integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[uddsketch_state(Int64(128), Float64(0.01), CAST(integers.i AS Float64))]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] | +| | CoalescePartitionsExec | +| | AggregateExec: mode=Partial, gby=[], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + ts, + uddsketch_state(128, 0.01, i) +FROM + integers +GROUP BY + ts; + ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | 
++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_merge(Int64(128), Float64(0.01), uddsketch_state(Int64(128),Float64(0.01),integers.i)) AS uddsketch_state(Int64(128),Float64(0.01),integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_state(Int64(128), Float64(0.01), CAST(integers.i AS Float64))]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + uddsketch_state(128, 0.01, i) +FROM + integers +GROUP BY + ts; + ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: uddsketch_state(Int64(128),Float64(0.01),integers.i) | +| | Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_merge(Int64(128), Float64(0.01), uddsketch_state(Int64(128),Float64(0.01),integers.i)) AS uddsketch_state(Int64(128),Float64(0.01),integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_state(Int64(128), Float64(0.01), CAST(integers.i AS Float64))]] | +| | TableScan: integers] | +| physical_plan | ProjectionExec: expr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)@1 as uddsketch_state(Int64(128),Float64(0.01),integers.i)] | +| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + +-- hll +EXPLAIN SELECT + hll(i) +FROM + integers; + ++---------------+----------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+----------------------------------------------------------------------------------------------------------+ +| 
logical_plan | Aggregate: groupBy=[[]], aggr=[[hll_merge(hll(integers.i)) AS hll(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[hll(CAST(integers.i AS Utf8))]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[hll(integers.i)] | +| | CoalescePartitionsExec | +| | AggregateExec: mode=Partial, gby=[], aggr=[hll(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+----------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + ts, + hll(i) +FROM + integers +GROUP BY + ts; + ++---------------+---------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[hll_merge(hll(integers.i)) AS hll(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[hll(CAST(integers.i AS Utf8))]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[hll(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[hll(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+---------------------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + hll(i) +FROM + integers +GROUP BY + ts; + ++---------------+-----------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: hll(integers.i) | +| | Aggregate: groupBy=[[integers.ts]], aggr=[[hll_merge(hll(integers.i)) AS hll(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[hll(CAST(integers.i AS Utf8))]] | +| | TableScan: integers] | +| physical_plan | ProjectionExec: expr=[hll(integers.i)@1 as hll(integers.i)] | +| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[hll(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[hll(integers.i)] | +| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] | +| | | ++---------------+-----------------------------------------------------------------------------------------------------------------------+ + +DROP TABLE integers; + +Affected Rows: 0 + diff --git a/tests/cases/distributed/explain/step_aggr.sql b/tests/cases/distributed/explain/step_aggr.sql new file mode 100644 index 0000000000..4e20b44861 --- /dev/null +++ b/tests/cases/distributed/explain/step_aggr.sql @@ -0,0 +1,144 @@ +CREATE TABLE integers( + host STRING, + i BIGINT, + ts TIMESTAMP TIME INDEX +) PARTITION ON COLUMNS (host) ( + host < '550-A', + host >= '550-A' + AND host < '550-W', + 
host >= '550-W' +); + +INSERT INTO integers (host, i, ts) VALUES + ('550-A', 1, '2023-01-01 00:00:00'), + ('550-A', 2, '2023-01-01 01:00:00'), + ('550-W', 3, '2023-01-01 02:00:00'), + ('550-W', 4, '2023-01-01 03:00:00'); + +-- count +EXPLAIN SELECT + count(i) +FROM + integers; + +EXPLAIN SELECT + ts, + count(i) +FROM + integers +GROUP BY + ts; + +EXPLAIN SELECT + count(i) +FROM + integers +GROUP BY + ts; + +-- sum +EXPLAIN SELECT + sum(i) +FROM + integers; + +EXPLAIN SELECT + ts, + sum(i) +FROM + integers +GROUP BY + ts; + +EXPLAIN SELECT + sum(i) +FROM + integers +GROUP BY + ts; + +-- min +EXPLAIN SELECT + min(i) +FROM + integers; + +EXPLAIN SELECT + ts, + min(i) +FROM + integers +GROUP BY + ts; + +EXPLAIN SELECT + min(i) +FROM + integers +GROUP BY + ts; + +-- max +EXPLAIN SELECT + max(i) +FROM + integers; + +EXPLAIN SELECT + ts, + max(i) +FROM + integers +GROUP BY + ts; + +EXPLAIN SELECT + max(i) +FROM + integers +GROUP BY + ts; + +-- uddsketch_state +EXPLAIN SELECT + uddsketch_state(128, 0.01, i) +FROM + integers; + +EXPLAIN SELECT + ts, + uddsketch_state(128, 0.01, i) +FROM + integers +GROUP BY + ts; + +EXPLAIN SELECT + uddsketch_state(128, 0.01, i) +FROM + integers +GROUP BY + ts; + +-- hll +EXPLAIN SELECT + hll(i) +FROM + integers; + +EXPLAIN SELECT + ts, + hll(i) +FROM + integers +GROUP BY + ts; + +EXPLAIN SELECT + hll(i) +FROM + integers +GROUP BY + ts; + +DROP TABLE integers; \ No newline at end of file diff --git a/tests/cases/distributed/explain/step_aggr_basic.result b/tests/cases/distributed/explain/step_aggr_basic.result new file mode 100644 index 0000000000..ee602ad574 --- /dev/null +++ b/tests/cases/distributed/explain/step_aggr_basic.result @@ -0,0 +1,80 @@ +CREATE TABLE integers( + host STRING, + i BIGINT, + ts TIMESTAMP TIME INDEX +) PARTITION ON COLUMNS (host) ( + host < '550-A', + host >= '550-A' + AND host < '550-W', + host >= '550-W' +); + +Affected Rows: 0 + +-- count +EXPLAIN SELECT + count(i) +FROM + integers; + ++---------------+-------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[count(integers.i)]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[count(integers.i)] | +| | CoalescePartitionsExec | +| | AggregateExec: mode=Partial, gby=[], aggr=[count(integers.i)] | +| | MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), 4398046511106(1024, 2), ] | +| | | ++---------------+-------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + ts, + count(i) +FROM + integers +GROUP BY + ts; + ++---------------+---------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], 
aggr=[count(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(integers.i)] | +| | MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), 4398046511106(1024, 2), ] | +| | | ++---------------+---------------------------------------------------------------------------------------------------------+ + +EXPLAIN SELECT + date_bin('1 hour'::INTERVAL, ts), + count(i) +FROM + integers +GROUP BY + date_bin('1 hour'::INTERVAL, ts); + ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Aggregate: groupBy=[[date_bin(Utf8("1 hour"),integers.ts) AS date_bin(Utf8("1 hour"),integers.ts)]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] | +| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[date_bin(CAST(Utf8("1 hour") AS Interval(MonthDayNano)), integers.ts)]], aggr=[[count(integers.i)]] | +| | TableScan: integers] | +| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[date_bin(Utf8("1 hour"),integers.ts)@0 as date_bin(Utf8("1 hour"),integers.ts)], aggr=[count(integers.i)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([date_bin(Utf8("1 hour"),integers.ts)@0], 20), input_partitions=20 | +| | AggregateExec: mode=Partial, gby=[date_bin(Utf8("1 hour"),integers.ts)@0 as date_bin(Utf8("1 hour"),integers.ts)], aggr=[count(integers.i)] | +| | MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), 4398046511106(1024, 2), ] | +| | | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + +DROP TABLE integers; + +Affected Rows: 0 + diff --git a/tests/cases/distributed/explain/step_aggr_basic.sql b/tests/cases/distributed/explain/step_aggr_basic.sql new file mode 100644 index 0000000000..0ed6059d54 --- /dev/null +++ b/tests/cases/distributed/explain/step_aggr_basic.sql @@ -0,0 +1,90 @@ +CREATE TABLE integers( + host STRING PRIMARY KEY, + i BIGINT, + ts TIMESTAMP TIME INDEX +) PARTITION ON COLUMNS (host) ( + host < '550-A', + host >= '550-A' + AND host < '550-W', + host >= '550-W' +); + +INSERT INTO integers (host, i, ts) VALUES + ('550-A', 1, '2023-01-01 00:00:00'), + ('550-B', 7, '2023-01-01 00:00:00'), + ('550-A', 2, '2023-01-01 01:00:00'), + ('550-W', 3, '2023-01-01 02:00:00'), + ('550-W', 4, '2023-01-01 03:00:00'); + + +-- count +SELECT + count(i) +FROM + integers; + +EXPLAIN +SELECT + count(i) +FROM + integers; + +EXPLAIN ANALYZE +SELECT + count(i) +FROM + integers; + +SELECT + ts, + count(i) +FROM + integers +GROUP BY + ts; + +EXPLAIN +SELECT + ts, + count(i) +FROM + integers +GROUP BY + ts; + +EXPLAIN ANALYZE +SELECT + ts, + count(i) +FROM + integers +GROUP BY + ts; + +SELECT + date_bin('1 hour' :: INTERVAL, ts), + count(i) +FROM + integers +GROUP BY + date_bin('1 hour' :: INTERVAL, ts); + +EXPLAIN +SELECT + date_bin('1 hour' :: INTERVAL, ts), + count(i) +FROM + integers +GROUP BY + date_bin('1 hour' :: INTERVAL, ts); + +EXPLAIN ANALYZE +SELECT + date_bin('1 hour' :: INTERVAL, ts), + 
count(i) +FROM + integers +GROUP BY + date_bin('1 hour' :: INTERVAL, ts); + +DROP TABLE integers; \ No newline at end of file diff --git a/tests/cases/distributed/explain/try_analyze.sql b/tests/cases/distributed/explain/try_analyze.sql new file mode 100644 index 0000000000..64f5afdd70 --- /dev/null +++ b/tests/cases/distributed/explain/try_analyze.sql @@ -0,0 +1,604 @@ +-- base table, massive row insert to this table +CREATE TABLE IF NOT EXISTS base_table ( + "time" TIMESTAMP(3) NOT NULL, + env STRING NULL, + service_name STRING NULL, + city STRING NULL, + page STRING NULL, + lcp BIGINT NULL, + fmp BIGINT NULL, + fcp BIGINT NULL, + fp BIGINT NULL, + tti BIGINT NULL, + fid BIGINT NULL, + shard_key BIGINT NULL, + TIME INDEX ("time"), + PRIMARY KEY (env, service_name) +) PARTITION ON COLUMNS (shard_key) ( + shard_key < 4, + shard_key >= 4 + AND shard_key < 8, + shard_key >= 8 + AND shard_key < 12, + shard_key >= 12 + AND shard_key < 16, + shard_key >= 16 + AND shard_key < 20, + shard_key >= 20 + AND shard_key < 24, + shard_key >= 24 + AND shard_key < 28, + shard_key >= 28 + AND shard_key < 32, + shard_key >= 32 + AND shard_key < 36, + shard_key >= 36 + AND shard_key < 40, + shard_key >= 40 + AND shard_key < 44, + shard_key >= 44 + AND shard_key < 48, + shard_key >= 48 + AND shard_key < 52, + shard_key >= 52 + AND shard_key < 56, + shard_key >= 56 + AND shard_key < 60, + shard_key >= 60 +) ENGINE = mito WITH( + 'append_mode' = 'true', + 'compaction.twcs.max_output_file_size' = "2GB", + 'compaction.twcs.time_window' = "1h", + 'compaction.type' = "twcs", +); + +EXPLAIN SELECT + env, + service_name, + city, + page, + uddsketch_state( + 128, + 0.01, + CASE + WHEN lcp > 0 + AND lcp < 3000000 THEN lcp + ELSE NULL + END + ) AS lcp_state, + max( + CASE + WHEN lcp > 0 + AND lcp < 3000000 THEN lcp + ELSE NULL + END + ) AS max_lcp, + min( + CASE + WHEN lcp > 0 + AND lcp < 3000000 THEN lcp + ELSE NULL + END + ) AS min_lcp, + uddsketch_state( + 128, + 0.01, + CASE + WHEN fmp > 0 + AND fmp < 3000000 THEN fmp + ELSE NULL + END + ) AS fmp_state, + max( + CASE + WHEN fmp > 0 + AND fmp < 3000000 THEN fmp + ELSE NULL + END + ) AS max_fmp, + min( + CASE + WHEN fmp > 0 + AND fmp < 3000000 THEN fmp + ELSE NULL + END + ) AS min_fmp, + uddsketch_state( + 128, + 0.01, + CASE + WHEN fcp > 0 + AND fcp < 3000000 THEN fcp + ELSE NULL + END + ) AS fcp_state, + max( + CASE + WHEN fcp > 0 + AND fcp < 3000000 THEN fcp + ELSE NULL + END + ) AS max_fcp, + min( + CASE + WHEN fcp > 0 + AND fcp < 3000000 THEN fcp + ELSE NULL + END + ) AS min_fcp, + uddsketch_state( + 128, + 0.01, + CASE + WHEN fp > 0 + AND fp < 3000000 THEN fp + ELSE NULL + END + ) AS fp_state, + max( + CASE + WHEN fp > 0 + AND fp < 3000000 THEN fp + ELSE NULL + END + ) AS max_fp, + min( + CASE + WHEN fp > 0 + AND fp < 3000000 THEN fp + ELSE NULL + END + ) AS min_fp, + uddsketch_state( + 128, + 0.01, + CASE + WHEN tti > 0 + AND tti < 3000000 THEN tti + ELSE NULL + END + ) AS tti_state, + max( + CASE + WHEN tti > 0 + AND tti < 3000000 THEN tti + ELSE NULL + END + ) AS max_tti, + min( + CASE + WHEN tti > 0 + AND tti < 3000000 THEN tti + ELSE NULL + END + ) AS min_tti, + uddsketch_state( + 128, + 0.01, + CASE + WHEN fid > 0 + AND fid < 3000000 THEN fid + ELSE NULL + END + ) AS fid_state, + max( + CASE + WHEN fid > 0 + AND fid < 3000000 THEN fid + ELSE NULL + END + ) AS max_fid, + min( + CASE + WHEN fid > 0 + AND fid < 3000000 THEN fid + ELSE NULL + END + ) AS min_fid, + max(shard_key) AS shard_key, + date_bin('60 seconds' :: INTERVAL, time) :: TIMESTAMP(0) +FROM + 
base_table +WHERE + ( + ( + lcp > 0 + AND lcp < 3000000 + ) + OR ( + fmp > 0 + AND fmp < 3000000 + ) + OR ( + fcp > 0 + AND fcp < 3000000 + ) + OR ( + fp > 0 + AND fp < 3000000 + ) + OR ( + tti > 0 + AND tti < 3000000 + ) + OR ( + fid > 0 + AND fid < 3000000 + ) + ) AND time >= now() +GROUP BY + env, + service_name, + city, + page, + date_bin('60 seconds' :: INTERVAL, time) :: TIMESTAMP(0); + + +EXPLAIN ANALYZE SELECT + env, + service_name, + city, + page, + uddsketch_state( + 128, + 0.01, + CASE + WHEN lcp > 0 + AND lcp < 3000000 THEN lcp + ELSE NULL + END + ) AS lcp_state, + max( + CASE + WHEN lcp > 0 + AND lcp < 3000000 THEN lcp + ELSE NULL + END + ) AS max_lcp, + min( + CASE + WHEN lcp > 0 + AND lcp < 3000000 THEN lcp + ELSE NULL + END + ) AS min_lcp, + uddsketch_state( + 128, + 0.01, + CASE + WHEN fmp > 0 + AND fmp < 3000000 THEN fmp + ELSE NULL + END + ) AS fmp_state, + max( + CASE + WHEN fmp > 0 + AND fmp < 3000000 THEN fmp + ELSE NULL + END + ) AS max_fmp, + min( + CASE + WHEN fmp > 0 + AND fmp < 3000000 THEN fmp + ELSE NULL + END + ) AS min_fmp, + uddsketch_state( + 128, + 0.01, + CASE + WHEN fcp > 0 + AND fcp < 3000000 THEN fcp + ELSE NULL + END + ) AS fcp_state, + max( + CASE + WHEN fcp > 0 + AND fcp < 3000000 THEN fcp + ELSE NULL + END + ) AS max_fcp, + min( + CASE + WHEN fcp > 0 + AND fcp < 3000000 THEN fcp + ELSE NULL + END + ) AS min_fcp, + uddsketch_state( + 128, + 0.01, + CASE + WHEN fp > 0 + AND fp < 3000000 THEN fp + ELSE NULL + END + ) AS fp_state, + max( + CASE + WHEN fp > 0 + AND fp < 3000000 THEN fp + ELSE NULL + END + ) AS max_fp, + min( + CASE + WHEN fp > 0 + AND fp < 3000000 THEN fp + ELSE NULL + END + ) AS min_fp, + uddsketch_state( + 128, + 0.01, + CASE + WHEN tti > 0 + AND tti < 3000000 THEN tti + ELSE NULL + END + ) AS tti_state, + max( + CASE + WHEN tti > 0 + AND tti < 3000000 THEN tti + ELSE NULL + END + ) AS max_tti, + min( + CASE + WHEN tti > 0 + AND tti < 3000000 THEN tti + ELSE NULL + END + ) AS min_tti, + uddsketch_state( + 128, + 0.01, + CASE + WHEN fid > 0 + AND fid < 3000000 THEN fid + ELSE NULL + END + ) AS fid_state, + max( + CASE + WHEN fid > 0 + AND fid < 3000000 THEN fid + ELSE NULL + END + ) AS max_fid, + min( + CASE + WHEN fid > 0 + AND fid < 3000000 THEN fid + ELSE NULL + END + ) AS min_fid, + max(shard_key) AS shard_key, + date_bin('60 seconds' :: INTERVAL, time) :: TIMESTAMP(0) +FROM + base_table +WHERE + ( + ( + lcp > 0 + AND lcp < 3000000 + ) + OR ( + fmp > 0 + AND fmp < 3000000 + ) + OR ( + fcp > 0 + AND fcp < 3000000 + ) + OR ( + fp > 0 + AND fp < 3000000 + ) + OR ( + tti > 0 + AND tti < 3000000 + ) + OR ( + fid > 0 + AND fid < 3000000 + ) + ) AND time >= now() +GROUP BY + env, + service_name, + city, + page, + date_bin('60 seconds' :: INTERVAL, time) :: TIMESTAMP(0); + +EXPLAIN SELECT + env, + service_name, + city, + page, + uddsketch_state( + 128, + 0.01, + CASE + WHEN lcp > 0 + AND lcp < 3000000 THEN lcp + ELSE NULL + END + ) AS lcp_state, + max( + CASE + WHEN lcp > 0 + AND lcp < 3000000 THEN lcp + ELSE NULL + END + ) AS max_lcp, + min( + CASE + WHEN lcp > 0 + AND lcp < 3000000 THEN lcp + ELSE NULL + END + ) AS min_lcp, + uddsketch_state( + 128, + 0.01, + CASE + WHEN fmp > 0 + AND fmp < 3000000 THEN fmp + ELSE NULL + END + ) AS fmp_state, + max( + CASE + WHEN fmp > 0 + AND fmp < 3000000 THEN fmp + ELSE NULL + END + ) AS max_fmp, + min( + CASE + WHEN fmp > 0 + AND fmp < 3000000 THEN fmp + ELSE NULL + END + ) AS min_fmp, + uddsketch_state( + 128, + 0.01, + CASE + WHEN fcp > 0 + AND fcp < 3000000 THEN fcp + ELSE NULL + END + ) AS 
fcp_state, + max( + CASE + WHEN fcp > 0 + AND fcp < 3000000 THEN fcp + ELSE NULL + END + ) AS max_fcp, + min( + CASE + WHEN fcp > 0 + AND fcp < 3000000 THEN fcp + ELSE NULL + END + ) AS min_fcp, + uddsketch_state( + 128, + 0.01, + CASE + WHEN fp > 0 + AND fp < 3000000 THEN fp + ELSE NULL + END + ) AS fp_state, + max( + CASE + WHEN fp > 0 + AND fp < 3000000 THEN fp + ELSE NULL + END + ) AS max_fp, + min( + CASE + WHEN fp > 0 + AND fp < 3000000 THEN fp + ELSE NULL + END + ) AS min_fp, + uddsketch_state( + 128, + 0.01, + CASE + WHEN tti > 0 + AND tti < 3000000 THEN tti + ELSE NULL + END + ) AS tti_state, + max( + CASE + WHEN tti > 0 + AND tti < 3000000 THEN tti + ELSE NULL + END + ) AS max_tti, + min( + CASE + WHEN tti > 0 + AND tti < 3000000 THEN tti + ELSE NULL + END + ) AS min_tti, + uddsketch_state( + 128, + 0.01, + CASE + WHEN fid > 0 + AND fid < 3000000 THEN fid + ELSE NULL + END + ) AS fid_state, + max( + CASE + WHEN fid > 0 + AND fid < 3000000 THEN fid + ELSE NULL + END + ) AS max_fid, + min( + CASE + WHEN fid > 0 + AND fid < 3000000 THEN fid + ELSE NULL + END + ) AS min_fid, + max(shard_key) AS shard_key, + date_bin('60 seconds' :: INTERVAL, time) :: TIMESTAMP(0) +FROM + base_table +WHERE + ( + ( + lcp > 0 + AND lcp < 3000000 + ) + OR ( + fmp > 0 + AND fmp < 3000000 + ) + OR ( + fcp > 0 + AND fcp < 3000000 + ) + OR ( + fp > 0 + AND fp < 3000000 + ) + OR ( + tti > 0 + AND tti < 3000000 + ) + OR ( + fid > 0 + AND fid < 3000000 + ) + ) AND time >= now() +GROUP BY + env, + service_name, + city, + page, + date_bin('60 seconds' :: INTERVAL, time) :: TIMESTAMP(0); + +EXPLAIN SELECT count(*) from base_table where time >= now(); + +-- ERROR: Internal error: 1003 +EXPLAIN ANALYZE SELECT count(*) from base_table where time >= now(); + +DROP TABLE base_table;