mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-10 15:22:56 +00:00
feat: steppable aggr fn dist push down
Signed-off-by: discord9 <discord9@163.com> poc: step aggr query Signed-off-by: discord9 <discord9@163.com> feat: mvp poc stuff Signed-off-by: discord9 <discord9@163.com> test: sqlness Signed-off-by: discord9 <discord9@163.com> chore: import missing Signed-off-by: discord9 <discord9@163.com> feat: support first/last_value Signed-off-by: discord9 <discord9@163.com> fix: check also include first/last value Signed-off-by: discord9 <discord9@163.com> chore: clean up after rebase feat: optimize yes! fix: alias qualifled test: more testcases chore: qualified column chore: per review
This commit is contained in:
@@ -14,8 +14,8 @@
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
pub(crate) mod hll;
|
||||
mod uddsketch;
|
||||
pub mod hll;
|
||||
pub mod uddsketch;
|
||||
|
||||
pub(crate) struct ApproximateFunction;
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::debug;
|
||||
use datafusion::datasource::DefaultTableSource;
|
||||
use datafusion::error::Result as DfResult;
|
||||
use datafusion_common::config::ConfigOptions;
|
||||
@@ -148,12 +149,13 @@ struct PlanRewriter {
|
||||
level: usize,
|
||||
/// Simulated stack for the `rewrite` recursion
|
||||
stack: Vec<(LogicalPlan, usize)>,
|
||||
/// Stages to be expanded
|
||||
/// Stages to be expanded, will be added as parent node of merge scan one by one
|
||||
stage: Vec<LogicalPlan>,
|
||||
status: RewriterStatus,
|
||||
/// Partition columns of the table in current pass
|
||||
partition_cols: Option<Vec<String>>,
|
||||
column_requirements: HashSet<Column>,
|
||||
expand_on_next_call: bool,
|
||||
}
|
||||
|
||||
impl PlanRewriter {
|
||||
@@ -174,6 +176,10 @@ impl PlanRewriter {
|
||||
{
|
||||
return true;
|
||||
}
|
||||
if self.expand_on_next_call {
|
||||
self.expand_on_next_call = false;
|
||||
return true;
|
||||
}
|
||||
match Categorizer::check_plan(plan, self.partition_cols.clone()) {
|
||||
Commutativity::Commutative => {}
|
||||
Commutativity::PartialCommutative => {
|
||||
@@ -190,12 +196,20 @@ impl PlanRewriter {
|
||||
self.stage.push(plan)
|
||||
}
|
||||
}
|
||||
Commutativity::TransformedCommutative(transformer) => {
|
||||
Commutativity::TransformedCommutative {
|
||||
transformer,
|
||||
expand_on_parent,
|
||||
} => {
|
||||
if let Some(transformer) = transformer
|
||||
&& let Some(plan) = transformer(plan)
|
||||
&& let Some(changed_plan) = transformer(plan)
|
||||
{
|
||||
self.update_column_requirements(&plan);
|
||||
self.stage.push(plan)
|
||||
debug!("PlanRewriter: transformed plan: {changed_plan:#?} from {plan}");
|
||||
if let Some(last_stage) = changed_plan.last() {
|
||||
// update the column requirements from the last stage
|
||||
self.update_column_requirements(last_stage);
|
||||
}
|
||||
self.stage.extend(changed_plan.into_iter().rev());
|
||||
self.expand_on_next_call = expand_on_parent;
|
||||
}
|
||||
}
|
||||
Commutativity::NonCommutative
|
||||
@@ -391,10 +405,21 @@ impl TreeNodeRewriter for PlanRewriter {
|
||||
return Ok(Transformed::yes(node));
|
||||
};
|
||||
|
||||
let parent = parent.clone();
|
||||
|
||||
// TODO(ruihang): avoid this clone
|
||||
if self.should_expand(&parent.clone()) {
|
||||
if self.should_expand(&parent) {
|
||||
// TODO(ruihang): does this work for nodes with multiple children?;
|
||||
let node = self.expand(node)?;
|
||||
debug!("PlanRewriter: should expand child:\n {node}\n Of Parent: {parent}");
|
||||
let node = self.expand(node);
|
||||
debug!(
|
||||
"PlanRewriter: expanded plan: {}",
|
||||
match &node {
|
||||
Ok(n) => n.to_string(),
|
||||
Err(e) => format!("Error expanding plan: {e}"),
|
||||
}
|
||||
);
|
||||
let node = node?;
|
||||
self.pop_stack();
|
||||
return Ok(Transformed::yes(node));
|
||||
}
|
||||
|
||||
@@ -15,7 +15,12 @@
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
|
||||
use common_function::aggrs::approximate::hll::{HllState, HLL_NAME};
|
||||
use common_function::aggrs::approximate::uddsketch::{UddSketchState, UDDSKETCH_STATE_NAME};
|
||||
use common_telemetry::debug;
|
||||
use datafusion::functions_aggregate::sum::sum_udaf;
|
||||
use datafusion_common::Column;
|
||||
use datafusion_expr::{Expr, LogicalPlan, Projection, UserDefinedLogicalNode};
|
||||
use promql::extension_plan::{
|
||||
EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
|
||||
};
|
||||
@@ -23,12 +28,190 @@ use promql::extension_plan::{
|
||||
use crate::dist_plan::merge_sort::{merge_sort_transformer, MergeSortLogicalPlan};
|
||||
use crate::dist_plan::MergeScanLogicalPlan;
|
||||
|
||||
/// generate the upper aggregation plan that will execute on the frontend.
|
||||
/// Basically a logical plan resembling the following:
|
||||
/// Projection:
|
||||
/// Aggregate:
|
||||
///
|
||||
/// from Aggregate
|
||||
///
|
||||
/// The upper Projection exists sole to make sure parent plan can recognize the output
|
||||
/// of the upper aggregation plan.
|
||||
pub fn step_aggr_to_upper_aggr(
|
||||
aggr_plan: &LogicalPlan,
|
||||
) -> datafusion_common::Result<Vec<LogicalPlan>> {
|
||||
let LogicalPlan::Aggregate(input_aggr) = aggr_plan else {
|
||||
return Err(datafusion_common::DataFusionError::Plan(
|
||||
"step_aggr_to_upper_aggr only accepts Aggregate plan".to_string(),
|
||||
));
|
||||
};
|
||||
if !is_all_aggr_exprs_steppable(&input_aggr.aggr_expr) {
|
||||
return Err(datafusion_common::DataFusionError::NotImplemented(
|
||||
"Some aggregate expressions are not steppable".to_string(),
|
||||
));
|
||||
}
|
||||
let mut upper_aggr_expr = vec![];
|
||||
for aggr_expr in &input_aggr.aggr_expr {
|
||||
let Some(aggr_func) = get_aggr_func(aggr_expr) else {
|
||||
return Err(datafusion_common::DataFusionError::NotImplemented(
|
||||
"Aggregate function not found".to_string(),
|
||||
));
|
||||
};
|
||||
let col_name = aggr_expr.qualified_name();
|
||||
let input_column = Expr::Column(datafusion_common::Column::new(col_name.0, col_name.1));
|
||||
let upper_func = match aggr_func.func.name() {
|
||||
"sum" | "min" | "max" | "last_value" | "first_value" => {
|
||||
// aggr_calc(aggr_merge(input_column))) as col_name
|
||||
let mut new_aggr_func = aggr_func.clone();
|
||||
new_aggr_func.args = vec![input_column.clone()];
|
||||
new_aggr_func
|
||||
}
|
||||
"count" => {
|
||||
// sum(input_column) as col_name
|
||||
let mut new_aggr_func = aggr_func.clone();
|
||||
new_aggr_func.func = sum_udaf();
|
||||
new_aggr_func.args = vec![input_column.clone()];
|
||||
new_aggr_func
|
||||
}
|
||||
UDDSKETCH_STATE_NAME => {
|
||||
// udd_merge(bucket_size, error_rate input_column) as col_name
|
||||
let mut new_aggr_func = aggr_func.clone();
|
||||
new_aggr_func.func = Arc::new(UddSketchState::merge_udf_impl());
|
||||
new_aggr_func.args[2] = input_column.clone();
|
||||
new_aggr_func
|
||||
}
|
||||
HLL_NAME => {
|
||||
// hll_merge(input_column) as col_name
|
||||
let mut new_aggr_func = aggr_func.clone();
|
||||
new_aggr_func.func = Arc::new(HllState::merge_udf_impl());
|
||||
new_aggr_func.args = vec![input_column.clone()];
|
||||
new_aggr_func
|
||||
}
|
||||
_ => {
|
||||
return Err(datafusion_common::DataFusionError::NotImplemented(format!(
|
||||
"Aggregate function {} is not supported for Step aggregation",
|
||||
aggr_func.func.name()
|
||||
)))
|
||||
}
|
||||
};
|
||||
|
||||
// deal with nested alias case
|
||||
let mut new_aggr_expr = aggr_expr.clone();
|
||||
{
|
||||
let new_aggr_func = get_aggr_func_mut(&mut new_aggr_expr).unwrap();
|
||||
*new_aggr_func = upper_func;
|
||||
}
|
||||
|
||||
upper_aggr_expr.push(new_aggr_expr);
|
||||
}
|
||||
let mut new_aggr = input_aggr.clone();
|
||||
// use lower aggregate plan as input, this will be replace by merge scan plan later
|
||||
new_aggr.input = Arc::new(LogicalPlan::Aggregate(input_aggr.clone()));
|
||||
|
||||
new_aggr.aggr_expr = upper_aggr_expr;
|
||||
|
||||
// group by expr also need to be all ref by column to avoid duplicated computing
|
||||
let mut new_group_expr = new_aggr.group_expr.clone();
|
||||
for expr in &mut new_group_expr {
|
||||
if let Expr::Column(_) = expr {
|
||||
// already a column, no need to change
|
||||
continue;
|
||||
}
|
||||
let col_name = expr.qualified_name();
|
||||
let input_column = Expr::Column(datafusion_common::Column::new(col_name.0, col_name.1));
|
||||
*expr = input_column;
|
||||
}
|
||||
new_aggr.group_expr = new_group_expr.clone();
|
||||
|
||||
let mut new_projection_exprs = new_group_expr;
|
||||
// the upper aggr expr need to be aliased to the input aggr expr's name,
|
||||
// so that the parent plan can recognize it.
|
||||
for (lower_aggr_expr, upper_aggr_expr) in
|
||||
input_aggr.aggr_expr.iter().zip(new_aggr.aggr_expr.iter())
|
||||
{
|
||||
let lower_col_name = lower_aggr_expr.qualified_name();
|
||||
let (table, col_name) = upper_aggr_expr.qualified_name();
|
||||
let aggr_out_column = Column::new(table, col_name);
|
||||
let aliased_output_aggr_expr =
|
||||
Expr::Column(aggr_out_column).alias_qualified(lower_col_name.0, lower_col_name.1);
|
||||
new_projection_exprs.push(aliased_output_aggr_expr);
|
||||
}
|
||||
let upper_aggr_plan = LogicalPlan::Aggregate(new_aggr);
|
||||
debug!("Before recompute schema: {upper_aggr_plan:?}");
|
||||
let upper_aggr_plan = upper_aggr_plan.recompute_schema()?;
|
||||
debug!("After recompute schema: {upper_aggr_plan:?}");
|
||||
// create a projection on top of the new aggregate plan
|
||||
let new_projection =
|
||||
Projection::try_new(new_projection_exprs, Arc::new(upper_aggr_plan.clone()))?;
|
||||
let projection = LogicalPlan::Projection(new_projection);
|
||||
// return the new logical plan
|
||||
Ok(vec![projection, upper_aggr_plan])
|
||||
}
|
||||
|
||||
/// Check if the given aggregate expression is steppable.
|
||||
/// As in if it can be split into multiple steps:
|
||||
/// i.e. on datanode first call `state(input)` then
|
||||
/// on frontend call `calc(merge(state))` to get the final result.
|
||||
///
|
||||
pub fn is_all_aggr_exprs_steppable(aggr_exprs: &[Expr]) -> bool {
|
||||
let step_action = HashSet::from([
|
||||
"sum",
|
||||
"count",
|
||||
"min",
|
||||
"max",
|
||||
"first_value",
|
||||
"last_value",
|
||||
UDDSKETCH_STATE_NAME,
|
||||
HLL_NAME,
|
||||
]);
|
||||
aggr_exprs.iter().all(|expr| {
|
||||
if let Some(aggr_func) = get_aggr_func(expr) {
|
||||
if aggr_func.distinct {
|
||||
// Distinct aggregate functions are not steppable(yet).
|
||||
return false;
|
||||
}
|
||||
step_action.contains(aggr_func.func.name())
|
||||
} else {
|
||||
false
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_aggr_func(expr: &Expr) -> Option<&datafusion_expr::expr::AggregateFunction> {
|
||||
let mut expr_ref = expr;
|
||||
while let Expr::Alias(alias) = expr_ref {
|
||||
expr_ref = &alias.expr;
|
||||
}
|
||||
if let Expr::AggregateFunction(aggr_func) = expr_ref {
|
||||
Some(aggr_func)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_aggr_func_mut(expr: &mut Expr) -> Option<&mut datafusion_expr::expr::AggregateFunction> {
|
||||
let mut expr_ref = expr;
|
||||
while let Expr::Alias(alias) = expr_ref {
|
||||
expr_ref = &mut alias.expr;
|
||||
}
|
||||
if let Expr::AggregateFunction(aggr_func) = expr_ref {
|
||||
Some(aggr_func)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub enum Commutativity {
|
||||
Commutative,
|
||||
PartialCommutative,
|
||||
ConditionalCommutative(Option<Transformer>),
|
||||
TransformedCommutative(Option<Transformer>),
|
||||
TransformedCommutative {
|
||||
/// Return plans from parent to child order
|
||||
transformer: Option<StageTransformer>,
|
||||
/// whether the transformer changes the child to parent
|
||||
expand_on_parent: bool,
|
||||
},
|
||||
NonCommutative,
|
||||
Unimplemented,
|
||||
/// For unrelated plans like DDL
|
||||
@@ -55,7 +238,21 @@ impl Categorizer {
|
||||
LogicalPlan::Filter(filter) => Self::check_expr(&filter.predicate),
|
||||
LogicalPlan::Window(_) => Commutativity::Unimplemented,
|
||||
LogicalPlan::Aggregate(aggr) => {
|
||||
if !Self::check_partition(&aggr.group_expr, &partition_cols) {
|
||||
let is_all_steppable = is_all_aggr_exprs_steppable(&aggr.aggr_expr);
|
||||
let matches_partition = Self::check_partition(&aggr.group_expr, &partition_cols);
|
||||
if !matches_partition && is_all_steppable {
|
||||
debug!("Plan is steppable: {plan}");
|
||||
return Commutativity::TransformedCommutative {
|
||||
transformer: Some(Arc::new(|plan: &LogicalPlan| {
|
||||
debug!("Before Step optimize: {plan}");
|
||||
let ret = step_aggr_to_upper_aggr(plan);
|
||||
debug!("After Step Optimize: {ret:?}");
|
||||
ret.ok()
|
||||
})),
|
||||
expand_on_parent: true,
|
||||
};
|
||||
}
|
||||
if !matches_partition {
|
||||
return Commutativity::NonCommutative;
|
||||
}
|
||||
for expr in &aggr.aggr_expr {
|
||||
@@ -217,6 +414,8 @@ impl Categorizer {
|
||||
|
||||
pub type Transformer = Arc<dyn Fn(&LogicalPlan) -> Option<LogicalPlan>>;
|
||||
|
||||
pub type StageTransformer = Arc<dyn Fn(&LogicalPlan) -> Option<Vec<LogicalPlan>>>;
|
||||
|
||||
pub fn partial_commutative_transformer(plan: &LogicalPlan) -> Option<LogicalPlan> {
|
||||
Some(plan.clone())
|
||||
}
|
||||
|
||||
59
tests/cases/distributed/explain/how_alias_done.sql
Normal file
59
tests/cases/distributed/explain/how_alias_done.sql
Normal file
@@ -0,0 +1,59 @@
|
||||
CREATE TABLE IF NOT EXISTS base_table (
|
||||
"time" TIMESTAMP(3) NOT NULL,
|
||||
env STRING NULL,
|
||||
service_name STRING NULL,
|
||||
city STRING NULL,
|
||||
page STRING NULL,
|
||||
lcp BIGINT NULL,
|
||||
fmp BIGINT NULL,
|
||||
fcp BIGINT NULL,
|
||||
fp BIGINT NULL,
|
||||
tti BIGINT NULL,
|
||||
fid BIGINT NULL,
|
||||
shard_key BIGINT NULL,
|
||||
TIME INDEX ("time"),
|
||||
PRIMARY KEY (env, service_name)
|
||||
) PARTITION ON COLUMNS (shard_key) (
|
||||
shard_key < 4,
|
||||
shard_key >= 4
|
||||
AND shard_key < 8,
|
||||
shard_key >= 8
|
||||
AND shard_key < 12,
|
||||
shard_key >= 12
|
||||
AND shard_key < 16,
|
||||
shard_key >= 16
|
||||
AND shard_key < 20,
|
||||
shard_key >= 20
|
||||
AND shard_key < 24,
|
||||
shard_key >= 24
|
||||
AND shard_key < 28,
|
||||
shard_key >= 28
|
||||
AND shard_key < 32,
|
||||
shard_key >= 32
|
||||
AND shard_key < 36,
|
||||
shard_key >= 36
|
||||
AND shard_key < 40,
|
||||
shard_key >= 40
|
||||
AND shard_key < 44,
|
||||
shard_key >= 44
|
||||
AND shard_key < 48,
|
||||
shard_key >= 48
|
||||
AND shard_key < 52,
|
||||
shard_key >= 52
|
||||
AND shard_key < 56,
|
||||
shard_key >= 56
|
||||
AND shard_key < 60,
|
||||
shard_key >= 60
|
||||
) ENGINE = mito WITH(
|
||||
'append_mode' = 'true',
|
||||
'compaction.twcs.max_output_file_size' = "2GB",
|
||||
'compaction.twcs.time_window' = "1h",
|
||||
'compaction.type' = "twcs",
|
||||
);
|
||||
|
||||
EXPLAIN SELECT count(*) from base_table where time >= now();
|
||||
|
||||
-- ERROR: Internal error: 1003
|
||||
EXPLAIN ANALYZE SELECT count(*) from base_table where time >= now();
|
||||
|
||||
DROP TABLE base_table;
|
||||
409
tests/cases/distributed/explain/step_aggr.result
Normal file
409
tests/cases/distributed/explain/step_aggr.result
Normal file
@@ -0,0 +1,409 @@
|
||||
CREATE TABLE integers(
|
||||
host STRING,
|
||||
i BIGINT,
|
||||
ts TIMESTAMP TIME INDEX
|
||||
) PARTITION ON COLUMNS (host) (
|
||||
host < '550-A',
|
||||
host >= '550-A'
|
||||
AND host < '550-W',
|
||||
host >= '550-W'
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO integers (host, i, ts) VALUES
|
||||
('550-A', 1, '2023-01-01 00:00:00'),
|
||||
('550-A', 2, '2023-01-01 01:00:00'),
|
||||
('550-W', 3, '2023-01-01 02:00:00'),
|
||||
('550-W', 4, '2023-01-01 03:00:00');
|
||||
|
||||
Affected Rows: 4
|
||||
|
||||
-- count
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[count(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: count(integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[count(integers.i)@1 as count(integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- sum
|
||||
EXPLAIN SELECT
|
||||
sum(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[sum(sum(integers.i)) AS sum(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[sum(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[sum(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[sum(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
sum(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(sum(integers.i)) AS sum(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[sum(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
sum(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: sum(integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(sum(integers.i)) AS sum(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[sum(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[sum(integers.i)@1 as sum(integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- min
|
||||
EXPLAIN SELECT
|
||||
min(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[min(min(integers.i)) AS min(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[min(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[min(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[min(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
min(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[min(min(integers.i)) AS min(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[min(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[min(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[min(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
min(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: min(integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[min(min(integers.i)) AS min(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[min(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[min(integers.i)@1 as min(integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[min(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[min(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- max
|
||||
EXPLAIN SELECT
|
||||
max(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[max(max(integers.i)) AS max(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[max(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[max(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[max(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
max(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[max(max(integers.i)) AS max(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[max(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[max(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[max(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
max(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: max(integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[max(max(integers.i)) AS max(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[max(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[max(integers.i)@1 as max(integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[max(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[max(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- uddsketch_state
|
||||
EXPLAIN SELECT
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[uddsketch_merge(Int64(128), Float64(0.01), uddsketch_state(Int64(128),Float64(0.01),integers.i)) AS uddsketch_state(Int64(128),Float64(0.01),integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[uddsketch_state(Int64(128), Float64(0.01), CAST(integers.i AS Float64))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_merge(Int64(128), Float64(0.01), uddsketch_state(Int64(128),Float64(0.01),integers.i)) AS uddsketch_state(Int64(128),Float64(0.01),integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_state(Int64(128), Float64(0.01), CAST(integers.i AS Float64))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: uddsketch_state(Int64(128),Float64(0.01),integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_merge(Int64(128), Float64(0.01), uddsketch_state(Int64(128),Float64(0.01),integers.i)) AS uddsketch_state(Int64(128),Float64(0.01),integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_state(Int64(128), Float64(0.01), CAST(integers.i AS Float64))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)@1 as uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- hll
|
||||
EXPLAIN SELECT
|
||||
hll(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+----------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+----------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[hll_merge(hll(integers.i)) AS hll(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[hll(CAST(integers.i AS Utf8))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[hll(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[hll(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+----------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
hll(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[hll_merge(hll(integers.i)) AS hll(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[hll(CAST(integers.i AS Utf8))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[hll(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[hll(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
hll(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: hll(integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[hll_merge(hll(integers.i)) AS hll(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[hll(CAST(integers.i AS Utf8))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[hll(integers.i)@1 as hll(integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[hll(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[hll(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
DROP TABLE integers;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
144
tests/cases/distributed/explain/step_aggr.sql
Normal file
144
tests/cases/distributed/explain/step_aggr.sql
Normal file
@@ -0,0 +1,144 @@
|
||||
CREATE TABLE integers(
|
||||
host STRING,
|
||||
i BIGINT,
|
||||
ts TIMESTAMP TIME INDEX
|
||||
) PARTITION ON COLUMNS (host) (
|
||||
host < '550-A',
|
||||
host >= '550-A'
|
||||
AND host < '550-W',
|
||||
host >= '550-W'
|
||||
);
|
||||
|
||||
INSERT INTO integers (host, i, ts) VALUES
|
||||
('550-A', 1, '2023-01-01 00:00:00'),
|
||||
('550-A', 2, '2023-01-01 01:00:00'),
|
||||
('550-W', 3, '2023-01-01 02:00:00'),
|
||||
('550-W', 4, '2023-01-01 03:00:00');
|
||||
|
||||
-- count
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
-- sum
|
||||
EXPLAIN SELECT
|
||||
sum(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
sum(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
sum(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
-- min
|
||||
EXPLAIN SELECT
|
||||
min(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
min(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
min(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
-- max
|
||||
EXPLAIN SELECT
|
||||
max(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
max(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
max(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
-- uddsketch_state
|
||||
EXPLAIN SELECT
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
-- hll
|
||||
EXPLAIN SELECT
|
||||
hll(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
hll(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
hll(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
DROP TABLE integers;
|
||||
80
tests/cases/distributed/explain/step_aggr_basic.result
Normal file
80
tests/cases/distributed/explain/step_aggr_basic.result
Normal file
@@ -0,0 +1,80 @@
|
||||
CREATE TABLE integers(
|
||||
host STRING,
|
||||
i BIGINT,
|
||||
ts TIMESTAMP TIME INDEX
|
||||
) PARTITION ON COLUMNS (host) (
|
||||
host < '550-A',
|
||||
host >= '550-A'
|
||||
AND host < '550-W',
|
||||
host >= '550-W'
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
-- count
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[count(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), 4398046511106(1024, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), 4398046511106(1024, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
date_bin('1 hour'::INTERVAL, ts),
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
date_bin('1 hour'::INTERVAL, ts);
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[date_bin(Utf8("1 hour"),integers.ts) AS date_bin(Utf8("1 hour"),integers.ts)]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[date_bin(CAST(Utf8("1 hour") AS Interval(MonthDayNano)), integers.ts)]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[date_bin(Utf8("1 hour"),integers.ts)@0 as date_bin(Utf8("1 hour"),integers.ts)], aggr=[count(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([date_bin(Utf8("1 hour"),integers.ts)@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[date_bin(Utf8("1 hour"),integers.ts)@0 as date_bin(Utf8("1 hour"),integers.ts)], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), 4398046511106(1024, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
DROP TABLE integers;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
90
tests/cases/distributed/explain/step_aggr_basic.sql
Normal file
90
tests/cases/distributed/explain/step_aggr_basic.sql
Normal file
@@ -0,0 +1,90 @@
|
||||
CREATE TABLE integers(
|
||||
host STRING PRIMARY KEY,
|
||||
i BIGINT,
|
||||
ts TIMESTAMP TIME INDEX
|
||||
) PARTITION ON COLUMNS (host) (
|
||||
host < '550-A',
|
||||
host >= '550-A'
|
||||
AND host < '550-W',
|
||||
host >= '550-W'
|
||||
);
|
||||
|
||||
INSERT INTO integers (host, i, ts) VALUES
|
||||
('550-A', 1, '2023-01-01 00:00:00'),
|
||||
('550-B', 7, '2023-01-01 00:00:00'),
|
||||
('550-A', 2, '2023-01-01 01:00:00'),
|
||||
('550-W', 3, '2023-01-01 02:00:00'),
|
||||
('550-W', 4, '2023-01-01 03:00:00');
|
||||
|
||||
|
||||
-- count
|
||||
SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN
|
||||
SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN ANALYZE
|
||||
SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
SELECT
|
||||
ts,
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN
|
||||
SELECT
|
||||
ts,
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN ANALYZE
|
||||
SELECT
|
||||
ts,
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
SELECT
|
||||
date_bin('1 hour' :: INTERVAL, ts),
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
date_bin('1 hour' :: INTERVAL, ts);
|
||||
|
||||
EXPLAIN
|
||||
SELECT
|
||||
date_bin('1 hour' :: INTERVAL, ts),
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
date_bin('1 hour' :: INTERVAL, ts);
|
||||
|
||||
EXPLAIN ANALYZE
|
||||
SELECT
|
||||
date_bin('1 hour' :: INTERVAL, ts),
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
date_bin('1 hour' :: INTERVAL, ts);
|
||||
|
||||
DROP TABLE integers;
|
||||
604
tests/cases/distributed/explain/try_analyze.sql
Normal file
604
tests/cases/distributed/explain/try_analyze.sql
Normal file
@@ -0,0 +1,604 @@
|
||||
-- base table, massive row insert to this table
|
||||
CREATE TABLE IF NOT EXISTS base_table (
|
||||
"time" TIMESTAMP(3) NOT NULL,
|
||||
env STRING NULL,
|
||||
service_name STRING NULL,
|
||||
city STRING NULL,
|
||||
page STRING NULL,
|
||||
lcp BIGINT NULL,
|
||||
fmp BIGINT NULL,
|
||||
fcp BIGINT NULL,
|
||||
fp BIGINT NULL,
|
||||
tti BIGINT NULL,
|
||||
fid BIGINT NULL,
|
||||
shard_key BIGINT NULL,
|
||||
TIME INDEX ("time"),
|
||||
PRIMARY KEY (env, service_name)
|
||||
) PARTITION ON COLUMNS (shard_key) (
|
||||
shard_key < 4,
|
||||
shard_key >= 4
|
||||
AND shard_key < 8,
|
||||
shard_key >= 8
|
||||
AND shard_key < 12,
|
||||
shard_key >= 12
|
||||
AND shard_key < 16,
|
||||
shard_key >= 16
|
||||
AND shard_key < 20,
|
||||
shard_key >= 20
|
||||
AND shard_key < 24,
|
||||
shard_key >= 24
|
||||
AND shard_key < 28,
|
||||
shard_key >= 28
|
||||
AND shard_key < 32,
|
||||
shard_key >= 32
|
||||
AND shard_key < 36,
|
||||
shard_key >= 36
|
||||
AND shard_key < 40,
|
||||
shard_key >= 40
|
||||
AND shard_key < 44,
|
||||
shard_key >= 44
|
||||
AND shard_key < 48,
|
||||
shard_key >= 48
|
||||
AND shard_key < 52,
|
||||
shard_key >= 52
|
||||
AND shard_key < 56,
|
||||
shard_key >= 56
|
||||
AND shard_key < 60,
|
||||
shard_key >= 60
|
||||
) ENGINE = mito WITH(
|
||||
'append_mode' = 'true',
|
||||
'compaction.twcs.max_output_file_size' = "2GB",
|
||||
'compaction.twcs.time_window' = "1h",
|
||||
'compaction.type' = "twcs",
|
||||
);
|
||||
|
||||
EXPLAIN SELECT
|
||||
env,
|
||||
service_name,
|
||||
city,
|
||||
page,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN lcp > 0
|
||||
AND lcp < 3000000 THEN lcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS lcp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN lcp > 0
|
||||
AND lcp < 3000000 THEN lcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_lcp,
|
||||
min(
|
||||
CASE
|
||||
WHEN lcp > 0
|
||||
AND lcp < 3000000 THEN lcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_lcp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fmp > 0
|
||||
AND fmp < 3000000 THEN fmp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fmp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fmp > 0
|
||||
AND fmp < 3000000 THEN fmp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fmp,
|
||||
min(
|
||||
CASE
|
||||
WHEN fmp > 0
|
||||
AND fmp < 3000000 THEN fmp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fmp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fcp > 0
|
||||
AND fcp < 3000000 THEN fcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fcp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fcp > 0
|
||||
AND fcp < 3000000 THEN fcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fcp,
|
||||
min(
|
||||
CASE
|
||||
WHEN fcp > 0
|
||||
AND fcp < 3000000 THEN fcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fcp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fp > 0
|
||||
AND fp < 3000000 THEN fp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fp > 0
|
||||
AND fp < 3000000 THEN fp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fp,
|
||||
min(
|
||||
CASE
|
||||
WHEN fp > 0
|
||||
AND fp < 3000000 THEN fp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN tti > 0
|
||||
AND tti < 3000000 THEN tti
|
||||
ELSE NULL
|
||||
END
|
||||
) AS tti_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN tti > 0
|
||||
AND tti < 3000000 THEN tti
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_tti,
|
||||
min(
|
||||
CASE
|
||||
WHEN tti > 0
|
||||
AND tti < 3000000 THEN tti
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_tti,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fid > 0
|
||||
AND fid < 3000000 THEN fid
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fid_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fid > 0
|
||||
AND fid < 3000000 THEN fid
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fid,
|
||||
min(
|
||||
CASE
|
||||
WHEN fid > 0
|
||||
AND fid < 3000000 THEN fid
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fid,
|
||||
max(shard_key) AS shard_key,
|
||||
date_bin('60 seconds' :: INTERVAL, time) :: TIMESTAMP(0)
|
||||
FROM
|
||||
base_table
|
||||
WHERE
|
||||
(
|
||||
(
|
||||
lcp > 0
|
||||
AND lcp < 3000000
|
||||
)
|
||||
OR (
|
||||
fmp > 0
|
||||
AND fmp < 3000000
|
||||
)
|
||||
OR (
|
||||
fcp > 0
|
||||
AND fcp < 3000000
|
||||
)
|
||||
OR (
|
||||
fp > 0
|
||||
AND fp < 3000000
|
||||
)
|
||||
OR (
|
||||
tti > 0
|
||||
AND tti < 3000000
|
||||
)
|
||||
OR (
|
||||
fid > 0
|
||||
AND fid < 3000000
|
||||
)
|
||||
) AND time >= now()
|
||||
GROUP BY
|
||||
env,
|
||||
service_name,
|
||||
city,
|
||||
page,
|
||||
date_bin('60 seconds' :: INTERVAL, time) :: TIMESTAMP(0);
|
||||
|
||||
|
||||
EXPLAIN ANALYZE SELECT
|
||||
env,
|
||||
service_name,
|
||||
city,
|
||||
page,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN lcp > 0
|
||||
AND lcp < 3000000 THEN lcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS lcp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN lcp > 0
|
||||
AND lcp < 3000000 THEN lcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_lcp,
|
||||
min(
|
||||
CASE
|
||||
WHEN lcp > 0
|
||||
AND lcp < 3000000 THEN lcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_lcp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fmp > 0
|
||||
AND fmp < 3000000 THEN fmp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fmp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fmp > 0
|
||||
AND fmp < 3000000 THEN fmp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fmp,
|
||||
min(
|
||||
CASE
|
||||
WHEN fmp > 0
|
||||
AND fmp < 3000000 THEN fmp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fmp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fcp > 0
|
||||
AND fcp < 3000000 THEN fcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fcp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fcp > 0
|
||||
AND fcp < 3000000 THEN fcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fcp,
|
||||
min(
|
||||
CASE
|
||||
WHEN fcp > 0
|
||||
AND fcp < 3000000 THEN fcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fcp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fp > 0
|
||||
AND fp < 3000000 THEN fp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fp > 0
|
||||
AND fp < 3000000 THEN fp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fp,
|
||||
min(
|
||||
CASE
|
||||
WHEN fp > 0
|
||||
AND fp < 3000000 THEN fp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN tti > 0
|
||||
AND tti < 3000000 THEN tti
|
||||
ELSE NULL
|
||||
END
|
||||
) AS tti_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN tti > 0
|
||||
AND tti < 3000000 THEN tti
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_tti,
|
||||
min(
|
||||
CASE
|
||||
WHEN tti > 0
|
||||
AND tti < 3000000 THEN tti
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_tti,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fid > 0
|
||||
AND fid < 3000000 THEN fid
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fid_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fid > 0
|
||||
AND fid < 3000000 THEN fid
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fid,
|
||||
min(
|
||||
CASE
|
||||
WHEN fid > 0
|
||||
AND fid < 3000000 THEN fid
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fid,
|
||||
max(shard_key) AS shard_key,
|
||||
date_bin('60 seconds' :: INTERVAL, time) :: TIMESTAMP(0)
|
||||
FROM
|
||||
base_table
|
||||
WHERE
|
||||
(
|
||||
(
|
||||
lcp > 0
|
||||
AND lcp < 3000000
|
||||
)
|
||||
OR (
|
||||
fmp > 0
|
||||
AND fmp < 3000000
|
||||
)
|
||||
OR (
|
||||
fcp > 0
|
||||
AND fcp < 3000000
|
||||
)
|
||||
OR (
|
||||
fp > 0
|
||||
AND fp < 3000000
|
||||
)
|
||||
OR (
|
||||
tti > 0
|
||||
AND tti < 3000000
|
||||
)
|
||||
OR (
|
||||
fid > 0
|
||||
AND fid < 3000000
|
||||
)
|
||||
) AND time >= now()
|
||||
GROUP BY
|
||||
env,
|
||||
service_name,
|
||||
city,
|
||||
page,
|
||||
date_bin('60 seconds' :: INTERVAL, time) :: TIMESTAMP(0);
|
||||
|
||||
EXPLAIN SELECT
|
||||
env,
|
||||
service_name,
|
||||
city,
|
||||
page,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN lcp > 0
|
||||
AND lcp < 3000000 THEN lcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS lcp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN lcp > 0
|
||||
AND lcp < 3000000 THEN lcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_lcp,
|
||||
min(
|
||||
CASE
|
||||
WHEN lcp > 0
|
||||
AND lcp < 3000000 THEN lcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_lcp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fmp > 0
|
||||
AND fmp < 3000000 THEN fmp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fmp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fmp > 0
|
||||
AND fmp < 3000000 THEN fmp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fmp,
|
||||
min(
|
||||
CASE
|
||||
WHEN fmp > 0
|
||||
AND fmp < 3000000 THEN fmp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fmp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fcp > 0
|
||||
AND fcp < 3000000 THEN fcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fcp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fcp > 0
|
||||
AND fcp < 3000000 THEN fcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fcp,
|
||||
min(
|
||||
CASE
|
||||
WHEN fcp > 0
|
||||
AND fcp < 3000000 THEN fcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fcp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fp > 0
|
||||
AND fp < 3000000 THEN fp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fp > 0
|
||||
AND fp < 3000000 THEN fp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fp,
|
||||
min(
|
||||
CASE
|
||||
WHEN fp > 0
|
||||
AND fp < 3000000 THEN fp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN tti > 0
|
||||
AND tti < 3000000 THEN tti
|
||||
ELSE NULL
|
||||
END
|
||||
) AS tti_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN tti > 0
|
||||
AND tti < 3000000 THEN tti
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_tti,
|
||||
min(
|
||||
CASE
|
||||
WHEN tti > 0
|
||||
AND tti < 3000000 THEN tti
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_tti,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fid > 0
|
||||
AND fid < 3000000 THEN fid
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fid_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fid > 0
|
||||
AND fid < 3000000 THEN fid
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fid,
|
||||
min(
|
||||
CASE
|
||||
WHEN fid > 0
|
||||
AND fid < 3000000 THEN fid
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fid,
|
||||
max(shard_key) AS shard_key,
|
||||
date_bin('60 seconds' :: INTERVAL, time) :: TIMESTAMP(0)
|
||||
FROM
|
||||
base_table
|
||||
WHERE
|
||||
(
|
||||
(
|
||||
lcp > 0
|
||||
AND lcp < 3000000
|
||||
)
|
||||
OR (
|
||||
fmp > 0
|
||||
AND fmp < 3000000
|
||||
)
|
||||
OR (
|
||||
fcp > 0
|
||||
AND fcp < 3000000
|
||||
)
|
||||
OR (
|
||||
fp > 0
|
||||
AND fp < 3000000
|
||||
)
|
||||
OR (
|
||||
tti > 0
|
||||
AND tti < 3000000
|
||||
)
|
||||
OR (
|
||||
fid > 0
|
||||
AND fid < 3000000
|
||||
)
|
||||
) AND time >= now()
|
||||
GROUP BY
|
||||
env,
|
||||
service_name,
|
||||
city,
|
||||
page,
|
||||
date_bin('60 seconds' :: INTERVAL, time) :: TIMESTAMP(0);
|
||||
|
||||
EXPLAIN SELECT count(*) from base_table where time >= now();
|
||||
|
||||
-- ERROR: Internal error: 1003
|
||||
EXPLAIN ANALYZE SELECT count(*) from base_table where time >= now();
|
||||
|
||||
DROP TABLE base_table;
|
||||
Reference in New Issue
Block a user