fix: aggr group by all partition cols use partial commutative (#6534)

* fix: aggr group by all partition cols use partial commutative

Signed-off-by: discord9 <discord9@163.com>

* test: bugged case

Signed-off-by: discord9 <discord9@163.com>

* test: sqlness fix

Signed-off-by: discord9 <discord9@163.com>

* test: more redacted

Signed-off-by: discord9 <discord9@163.com>

* more cases

Signed-off-by: discord9 <discord9@163.com>

* even more test cases

Signed-off-by: discord9 <discord9@163.com>

* join testcase

Signed-off-by: discord9 <discord9@163.com>

* fix: column requirement added in correct location

Signed-off-by: discord9 <discord9@163.com>

* fix test

Signed-off-by: discord9 <discord9@163.com>

* chore: clippy

Signed-off-by: discord9 <discord9@163.com>

* track col reqs per stack

Signed-off-by: discord9 <discord9@163.com>

* fix: continue

Signed-off-by: discord9 <discord9@163.com>

* chore: clippy

Signed-off-by: discord9 <discord9@163.com>

* refactor: test mod

Signed-off-by: discord9 <discord9@163.com>

* test utils

Signed-off-by: discord9 <discord9@163.com>

* test: better test

Signed-off-by: discord9 <discord9@163.com>

* more testcases

Signed-off-by: discord9 <discord9@163.com>

* test limit push down

Signed-off-by: discord9 <discord9@163.com>

* more testcases

Signed-off-by: discord9 <discord9@163.com>

* more testcase

Signed-off-by: discord9 <discord9@163.com>

* more test

Signed-off-by: discord9 <discord9@163.com>

* chore: update sqlness

Signed-off-by: discord9 <discord9@163.com>

* chore: update comments

Signed-off-by: discord9 <discord9@163.com>

* fix: check col reqs from bottom to upper

Signed-off-by: discord9 <discord9@163.com>

* chore: more comment

Signed-off-by: discord9 <discord9@163.com>

* docs: more todo

Signed-off-by: discord9 <discord9@163.com>

* chore: comments

Signed-off-by: discord9 <discord9@163.com>

* test: a new failing test that should be fixed

Signed-off-by: discord9 <discord9@163.com>

* fix: part col alias tracking

Signed-off-by: discord9 <discord9@163.com>

* chore: unused

Signed-off-by: discord9 <discord9@163.com>

* chore: clippy

Signed-off-by: discord9 <discord9@163.com>

* docs: comment

Signed-off-by: discord9 <discord9@163.com>

* more testcases

Signed-off-by: discord9 <discord9@163.com>

* more testcase for step/part aggr combine

Signed-off-by: discord9 <discord9@163.com>

* FIXME: a new bug

Signed-off-by: discord9 <discord9@163.com>

* literally unfixable

Signed-off-by: discord9 <discord9@163.com>

* chore: remove some debug print

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
discord9
2025-07-23 17:05:34 +08:00
committed by GitHub
parent 2298227e0c
commit 8d71213ea6
7 changed files with 3151 additions and 226 deletions


@@ -237,7 +237,8 @@ fn create_output_batch(
for (node, metric) in sub_stage_metrics.into_iter().enumerate() {
builder.append_metric(1, node as _, metrics_to_string(metric, format)?);
}
return Ok(TreeNodeRecursion::Stop);
// might have multiple merge scans, so continue
return Ok(TreeNodeRecursion::Continue);
}
Ok(TreeNodeRecursion::Continue)
})?;
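
The hunk above swaps `TreeNodeRecursion::Stop` for `Continue` so the metrics collector keeps walking the plan after the first merge scan. A minimal, self-contained sketch (toy tree types, not the actual DataFusion `TreeNode` API) of why that matters when a plan contains more than one merge scan:

```rust
#[derive(PartialEq)]
enum TreeNodeRecursion {
    Continue,
    Stop,
}

struct Node {
    name: &'static str,
    children: Vec<Node>,
}

/// Pre-order traversal mirroring the visitor semantics used above:
/// returning `Stop` aborts the remaining traversal entirely.
fn apply(node: &Node, f: &mut impl FnMut(&Node) -> TreeNodeRecursion) -> TreeNodeRecursion {
    if f(node) == TreeNodeRecursion::Stop {
        return TreeNodeRecursion::Stop;
    }
    for child in &node.children {
        if apply(child, f) == TreeNodeRecursion::Stop {
            return TreeNodeRecursion::Stop;
        }
    }
    TreeNodeRecursion::Continue
}

fn main() {
    // a plan with two merge scans, e.g. the two sides of a join
    let plan = Node {
        name: "Join",
        children: vec![
            Node { name: "MergeScan", children: vec![] },
            Node { name: "MergeScan", children: vec![] },
        ],
    };
    let mut seen = 0;
    apply(&plan, &mut |n| {
        if n.name == "MergeScan" {
            seen += 1;
            // with `return TreeNodeRecursion::Stop` here, `seen` would stay 1
            // and the second merge scan's metrics would never be appended
        }
        TreeNodeRecursion::Continue
    });
    assert_eq!(seen, 2);
}
```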


@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use common_telemetry::debug;
@@ -38,6 +38,13 @@ use crate::dist_plan::merge_scan::MergeScanLogicalPlan;
use crate::plan::ExtractExpr;
use crate::query_engine::DefaultSerializer;
#[cfg(test)]
mod test;
mod utils;
pub(crate) use utils::{AliasMapping, AliasTracker};
#[derive(Debug)]
pub struct DistPlannerAnalyzer;
@@ -154,7 +161,33 @@ struct PlanRewriter {
status: RewriterStatus,
/// Partition columns of the table in current pass
partition_cols: Option<Vec<String>>,
column_requirements: HashSet<Column>,
alias_tracker: Option<AliasTracker>,
/// use stack depth as the scope to determine whether a column requirement is needed or not
/// i.e for a logical plan like:
/// ```ignore
/// 1: Projection: t.number
/// 2: Sort: t.pk1+t.pk2
/// 3. Projection: t.number, t.pk1, t.pk2
/// ```
/// `Sort` will make a column requirement for `t.pk1` at level 2,
/// which makes the `Projection` at level 1 need to add a ref to `t.pk1` as well.
/// So that the expanded plan will be
/// ```ignore
/// Projection: t.number
/// MergeSort: t.pk1
/// MergeScan: remote_input=
/// Projection: t.number, "t.pk1+t.pk2" <--- the original `Projection` at level 1 gets `t.pk1+t.pk2` added
/// Sort: t.pk1+t.pk2
/// Projection: t.number, t.pk1, t.pk2
/// ```
/// making sure `MergeSort` can have `t.pk1` as input.
/// Meanwhile the `Projection` at level 3 doesn't need to add any new column, because 3 > 2
/// and the col requirements at level 2 are not applicable for level 3.
///
/// see more details in test `expand_proj_step_aggr` and `expand_proj_sort_proj`
///
/// TODO(discord9): a simpler solution to track column requirements for merge scan
column_requirements: Vec<(HashSet<Column>, usize)>,
/// Whether to expand on next call
/// This is used to handle the case where a plan is transformed, but needs to be expanded from its
/// parent node. For example an Aggregate plan is split into two parts in frontend and datanode, and need
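
A minimal sketch of the scoping rule described in the comment above, with plain strings standing in for `datafusion_common::Column`: a requirement recorded at level `l` applies only to nodes at levels `<= l`, which is exactly the `level >= cur_level` filter used later in `EnforceDistRequirementRewriter::f_up`.

```rust
use std::collections::HashSet;

/// Merge all column requirements that are in scope at `cur_level`.
fn applicable_requirements(
    column_requirements: &[(HashSet<String>, usize)],
    cur_level: usize,
) -> HashSet<String> {
    column_requirements
        .iter()
        .filter(|(_, level)| *level >= cur_level)
        .flat_map(|(cols, _)| cols.iter().cloned())
        .collect()
}

fn main() {
    // `Sort` at level 2 requires `t.pk1`
    let reqs = vec![(HashSet::from(["t.pk1".to_string()]), 2)];
    // the `Projection` at level 1 is in scope (2 >= 1), so it must add `t.pk1`
    assert!(applicable_requirements(&reqs, 1).contains("t.pk1"));
    // the `Projection` at level 3 is out of scope (2 < 3), nothing to add
    assert!(applicable_requirements(&reqs, 3).is_empty());
}
```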
@@ -164,7 +197,7 @@ struct PlanRewriter {
/// This is used to handle the case where a plan is transformed, but still
/// need to push down as many node as possible before next partial/conditional/transformed commutative
/// plan. I.e.
/// ```
/// ```ignore
/// Limit:
/// Sort:
/// ```
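
For `PartialCommutative` plans such as `Limit`, the node is pushed down to the leaf but also kept above the merge, so correctness relies on "top-k of per-partition top-k" equaling the global top-k. A minimal sketch of that equivalence (plain vectors standing in for partitions, a hypothetical `top_k` helper modeling `Sort` + `Limit fetch=k`):

```rust
/// sort ascending and keep the first k rows, modeling `Sort` + `Limit fetch=k`
fn top_k(mut v: Vec<i32>, k: usize) -> Vec<i32> {
    v.sort();
    v.truncate(k);
    v
}

fn main() {
    let partitions = vec![vec![5, 1, 9], vec![2, 8, 3]];
    let k = 2;
    // pushed down: `Limit` applied per partition, then again after the merge sort
    let merged: Vec<i32> = partitions
        .iter()
        .flat_map(|p| top_k(p.clone(), k))
        .collect();
    let pushed_down = top_k(merged, k);
    // not pushed down: one global sort + limit
    let all: Vec<i32> = partitions.into_iter().flatten().collect();
    assert_eq!(pushed_down, top_k(all, k));
}
```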
@@ -187,6 +220,15 @@ impl PlanRewriter {
/// Return true if should stop and expand. The input plan is the parent node of current node
fn should_expand(&mut self, plan: &LogicalPlan) -> bool {
debug!(
"Check should_expand at level: {} with Stack:\n{}, ",
self.level,
self.stack
.iter()
.map(|(p, l)| format!("{l}:{}{}", " ".repeat(l - 1), p.display()))
.collect::<Vec<String>>()
.join("\n"),
);
if DFLogicalSubstraitConvertor
.encode(plan, DefaultSerializer)
.is_err()
@@ -200,18 +242,21 @@ impl PlanRewriter {
}
if self.expand_on_next_part_cond_trans_commutative {
let comm = Categorizer::check_plan(plan, self.partition_cols.clone());
let comm = Categorizer::check_plan(plan, self.get_aliased_partition_columns());
match comm {
Commutativity::PartialCommutative => {
// a small difference is that for partial commutative, we still need to
// expand on next call(so `Limit` can be pushed down)
// push it down (so `Limit` can be pushed down)
// notice how `Limit` needs to be expanded as well to make sure the query is correct,
// i.e. `Limit fetch=10` needs to be pushed down to the leaf node
self.expand_on_next_part_cond_trans_commutative = false;
self.expand_on_next_call = true;
}
Commutativity::ConditionalCommutative(_)
| Commutativity::TransformedCommutative { .. } => {
// for conditional commutative and transformed commutative, we can
// expand now
// again a new node that can be pushed down; we should just
// do the push down now and avoid further expansion
self.expand_on_next_part_cond_trans_commutative = false;
return true;
}
@@ -219,11 +264,12 @@ impl PlanRewriter {
}
}
match Categorizer::check_plan(plan, self.partition_cols.clone()) {
match Categorizer::check_plan(plan, self.get_aliased_partition_columns()) {
Commutativity::Commutative => {}
Commutativity::PartialCommutative => {
if let Some(plan) = partial_commutative_transformer(plan) {
self.update_column_requirements(&plan);
// notice this plan is the parent of the current node, so use `self.level - 1` when updating column requirements
self.update_column_requirements(&plan, self.level - 1);
self.expand_on_next_part_cond_trans_commutative = true;
self.stage.push(plan)
}
@@ -232,7 +278,8 @@ impl PlanRewriter {
if let Some(transformer) = transformer
&& let Some(plan) = transformer(plan)
{
self.update_column_requirements(&plan);
// notice this plan is the parent of the current node, so use `self.level - 1` when updating column requirements
self.update_column_requirements(&plan, self.level - 1);
self.expand_on_next_part_cond_trans_commutative = true;
self.stage.push(plan)
}
@@ -242,12 +289,22 @@ impl PlanRewriter {
&& let Some(transformer_actions) = transformer(plan)
{
debug!(
"PlanRewriter: transformed plan: {:?}\n from {plan}",
transformer_actions.extra_parent_plans
"PlanRewriter: transformed plan: {}\n from {plan}",
transformer_actions
.extra_parent_plans
.iter()
.enumerate()
.map(|(i, p)| format!(
"Extra {i}-th parent plan from parent to child = {}",
p.display()
))
.collect::<Vec<_>>()
.join("\n")
);
if let Some(last_stage) = transformer_actions.extra_parent_plans.last() {
// update the column requirements from the last stage
self.update_column_requirements(last_stage);
// notice the current plan's parent plan is where we need to apply the column requirements
self.update_column_requirements(last_stage, self.level - 1);
}
self.stage
.extend(transformer_actions.extra_parent_plans.into_iter().rev());
@@ -265,9 +322,12 @@ impl PlanRewriter {
false
}
fn update_column_requirements(&mut self, plan: &LogicalPlan) {
/// Update the column requirements for the current plan. `plan_level` is the level of the plan
/// in the stack, which is used to determine if the column requirements are applicable
/// for other plans in the stack.
fn update_column_requirements(&mut self, plan: &LogicalPlan, plan_level: usize) {
debug!(
"PlanRewriter: update column requirements for plan: {plan}\n withcolumn_requirements: {:?}",
"PlanRewriter: update column requirements for plan: {plan}\n with old column_requirements: {:?}",
self.column_requirements
);
let mut container = HashSet::new();
@@ -276,9 +336,7 @@ impl PlanRewriter {
let _ = expr_to_columns(&expr, &mut container);
}
for col in container {
self.column_requirements.insert(col);
}
self.column_requirements.push((container, plan_level));
debug!(
"PlanRewriter: updated column requirements: {:?}",
self.column_requirements
@@ -297,6 +355,45 @@ impl PlanRewriter {
self.status = RewriterStatus::Unexpanded;
}
/// Maybe update alias for original table columns in the plan
fn maybe_update_alias(&mut self, node: &LogicalPlan) {
if let Some(alias_tracker) = &mut self.alias_tracker {
alias_tracker.update_alias(node);
debug!(
"Current partition columns are: {:?}",
self.get_aliased_partition_columns()
);
} else if let LogicalPlan::TableScan(table_scan) = node {
self.alias_tracker = AliasTracker::new(table_scan);
debug!(
"Initialize partition columns: {:?} with table={}",
self.get_aliased_partition_columns(),
table_scan.table_name
);
}
}
fn get_aliased_partition_columns(&self) -> Option<AliasMapping> {
if let Some(part_cols) = self.partition_cols.as_ref() {
let Some(alias_tracker) = &self.alias_tracker else {
// no alias tracker meaning no table scan encountered
return None;
};
let mut aliased = HashMap::new();
for part_col in part_cols {
let all_alias = alias_tracker
.get_all_alias_for_col(part_col)
.cloned()
.unwrap_or_default();
aliased.insert(part_col.clone(), all_alias);
}
Some(aliased)
} else {
None
}
}
fn maybe_set_partitions(&mut self, plan: &LogicalPlan) {
if self.partition_cols.is_some() {
// only need to set once
@@ -342,10 +439,15 @@ impl PlanRewriter {
}
// store schema before expand
let schema = on_node.schema().clone();
let mut rewriter = EnforceDistRequirementRewriter {
column_requirements: std::mem::take(&mut self.column_requirements),
};
let mut rewriter = EnforceDistRequirementRewriter::new(
std::mem::take(&mut self.column_requirements),
self.level,
);
debug!("PlanRewriter: enforce column requirements for node: {on_node} with rewriter: {rewriter:?}");
on_node = on_node.rewrite(&mut rewriter)?.data;
debug!(
"PlanRewriter: after enforced column requirements for node: {on_node} with rewriter: {rewriter:?}"
);
// add merge scan as the new root
let mut node = MergeScanLogicalPlan::new(
@@ -364,7 +466,8 @@ impl PlanRewriter {
}
self.set_expanded();
// recover the schema
// recover the schema; this makes sure that after expansion the schema is the same as the old node's,
// because after expansion the raw top node might have extra columns, i.e. sorting columns for a `Sort` node
let node = LogicalPlanBuilder::from(node)
.project(schema.iter().map(|(qualifier, field)| {
Expr::Column(Column::new(qualifier.cloned(), field.name()))
@@ -381,42 +484,96 @@ impl PlanRewriter {
/// Requirements enforced by this rewriter:
/// - Enforce column requirements for `LogicalPlan::Projection` nodes. Makes sure the
/// required columns are available in the sub plan.
///
#[derive(Debug)]
struct EnforceDistRequirementRewriter {
column_requirements: HashSet<Column>,
/// only enforce column requirements after the expanding node in question,
/// meaning only nodes with `cur_level` <= `level` will consider adding those column requirements
/// TODO(discord9): a simpler solution to track column requirements for merge scan
column_requirements: Vec<(HashSet<Column>, usize)>,
/// only apply column requirements >= `cur_level`
/// this is used to avoid applying column requirements that are not needed
/// for the current node, i.e. the node is not in the scope of the column requirements
/// e.g., for this plan:
/// ```ignore
/// Aggregate: min(t.number)
/// Projection: t.number
/// ```
/// when on `Projection` node, we don't need to apply the column requirements of `Aggregate` node
/// because the `Projection` node is not in the scope of the `Aggregate` node
cur_level: usize,
}
impl EnforceDistRequirementRewriter {
fn new(column_requirements: Vec<(HashSet<Column>, usize)>, cur_level: usize) -> Self {
Self {
column_requirements,
cur_level,
}
}
}
impl TreeNodeRewriter for EnforceDistRequirementRewriter {
type Node = LogicalPlan;
fn f_down(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
if let LogicalPlan::Projection(ref projection) = node {
let mut column_requirements = std::mem::take(&mut self.column_requirements);
if column_requirements.is_empty() {
return Ok(Transformed::no(node));
}
for expr in &projection.expr {
let (qualifier, name) = expr.qualified_name();
let column = Column::new(qualifier, name);
column_requirements.remove(&column);
}
if column_requirements.is_empty() {
return Ok(Transformed::no(node));
}
let mut new_exprs = projection.expr.clone();
for col in &column_requirements {
new_exprs.push(Expr::Column(col.clone()));
}
let new_node =
node.with_new_exprs(new_exprs, node.inputs().into_iter().cloned().collect())?;
return Ok(Transformed::yes(new_node));
// check that node doesn't have multiple children, i.e. join/subquery
if node.inputs().len() > 1 {
return Err(datafusion_common::DataFusionError::Internal(
"EnforceDistRequirementRewriter: node with multiple inputs is not supported"
.to_string(),
));
}
self.cur_level += 1;
Ok(Transformed::no(node))
}
fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
self.cur_level -= 1;
// first get all applicable column requirements
let mut applicable_column_requirements = self
.column_requirements
.iter()
.filter(|(_, level)| *level >= self.cur_level)
.map(|(cols, _)| cols.clone())
.reduce(|mut acc, cols| {
acc.extend(cols);
acc
})
.unwrap_or_default();
debug!(
"EnforceDistRequirementRewriter: applicable column requirements at level {} = {:?} for node {}",
self.cur_level,
applicable_column_requirements,
node.display()
);
// make sure all projection applicable scope has the required columns
if let LogicalPlan::Projection(ref projection) = node {
for expr in &projection.expr {
let (qualifier, name) = expr.qualified_name();
let column = Column::new(qualifier, name);
applicable_column_requirements.remove(&column);
}
if applicable_column_requirements.is_empty() {
return Ok(Transformed::no(node));
}
let mut new_exprs = projection.expr.clone();
for col in &applicable_column_requirements {
new_exprs.push(Expr::Column(col.clone()));
}
let new_node =
node.with_new_exprs(new_exprs, node.inputs().into_iter().cloned().collect())?;
debug!(
"EnforceDistRequirementRewriter: added missing columns {:?} to projection node from old node: \n{node}\n Making new node: \n{new_node}",
applicable_column_requirements
);
// still need to continue for next projection if applicable
return Ok(Transformed::yes(new_node));
}
Ok(Transformed::no(node))
}
}
@@ -432,6 +589,7 @@ impl TreeNodeRewriter for PlanRewriter {
self.stage.clear();
self.set_unexpanded();
self.partition_cols = None;
self.alias_tracker = None;
Ok(Transformed::no(node))
}
@@ -454,8 +612,19 @@ impl TreeNodeRewriter for PlanRewriter {
self.maybe_set_partitions(&node);
self.maybe_update_alias(&node);
let Some(parent) = self.get_parent() else {
let node = self.expand(node)?;
debug!("Plan Rewriter: expand now for no parent found for node: {node}");
let node = self.expand(node);
debug!(
"PlanRewriter: expanded plan: {}",
match &node {
Ok(n) => n.to_string(),
Err(e) => format!("Error expanding plan: {e}"),
}
);
let node = node?;
self.pop_stack();
return Ok(Transformed::yes(node));
};
@@ -483,168 +652,3 @@ impl TreeNodeRewriter for PlanRewriter {
Ok(Transformed::no(node))
}
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use datafusion::datasource::DefaultTableSource;
use datafusion::functions_aggregate::expr_fn::avg;
use datafusion_common::JoinType;
use datafusion_expr::{col, lit, Expr, LogicalPlanBuilder};
use table::table::adapter::DfTableProviderAdapter;
use table::table::numbers::NumbersTable;
use super::*;
#[ignore = "Projection is disabled for https://github.com/apache/arrow-datafusion/issues/6489"]
#[test]
fn transform_simple_projection_filter() {
let numbers_table = NumbersTable::table(0);
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(numbers_table),
)));
let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
.unwrap()
.filter(col("number").lt(lit(10)))
.unwrap()
.project(vec![col("number")])
.unwrap()
.distinct()
.unwrap()
.build()
.unwrap();
let config = ConfigOptions::default();
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
let expected = [
"Distinct:",
" MergeScan [is_placeholder=false]",
" Distinct:",
" Projection: t.number",
" Filter: t.number < Int32(10)",
" TableScan: t",
]
.join("\n");
assert_eq!(expected, result.to_string());
}
#[test]
fn transform_aggregator() {
let numbers_table = NumbersTable::table(0);
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(numbers_table),
)));
let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
.unwrap()
.aggregate(Vec::<Expr>::new(), vec![avg(col("number"))])
.unwrap()
.build()
.unwrap();
let config = ConfigOptions::default();
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
let expected = "Projection: avg(t.number)\
\n MergeScan [is_placeholder=false, remote_input=[\nAggregate: groupBy=[[]], aggr=[[avg(t.number)]]\n TableScan: t\n]]";
assert_eq!(expected, result.to_string());
}
#[test]
fn transform_distinct_order() {
let numbers_table = NumbersTable::table(0);
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(numbers_table),
)));
let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
.unwrap()
.distinct()
.unwrap()
.sort(vec![col("number").sort(true, false)])
.unwrap()
.build()
.unwrap();
let config = ConfigOptions::default();
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
let expected = [
"Projection: t.number",
" MergeScan [is_placeholder=false, remote_input=[",
"Sort: t.number ASC NULLS LAST",
" Distinct:",
" TableScan: t",
"]]",
]
.join("\n");
assert_eq!(expected, result.to_string());
}
#[test]
fn transform_single_limit() {
let numbers_table = NumbersTable::table(0);
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(numbers_table),
)));
let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
.unwrap()
.limit(0, Some(1))
.unwrap()
.build()
.unwrap();
let config = ConfigOptions::default();
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
let expected = "Projection: t.number\
\n MergeScan [is_placeholder=false, remote_input=[\nLimit: skip=0, fetch=1\n TableScan: t\n]]";
assert_eq!(expected, result.to_string());
}
#[test]
fn transform_unalighed_join_with_alias() {
let left = NumbersTable::table(0);
let right = NumbersTable::table(1);
let left_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(left),
)));
let right_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(right),
)));
let right_plan = LogicalPlanBuilder::scan_with_filters("t", right_source, None, vec![])
.unwrap()
.alias("right")
.unwrap()
.build()
.unwrap();
let plan = LogicalPlanBuilder::scan_with_filters("t", left_source, None, vec![])
.unwrap()
.join_on(
right_plan,
JoinType::LeftSemi,
vec![col("t.number").eq(col("right.number"))],
)
.unwrap()
.limit(0, Some(1))
.unwrap()
.build()
.unwrap();
let config = ConfigOptions::default();
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
let expected = [
"Limit: skip=0, fetch=1",
" LeftSemi Join: Filter: t.number = right.number",
" Projection: t.number",
" MergeScan [is_placeholder=false, remote_input=[\nTableScan: t\n]]",
" SubqueryAlias: right",
" Projection: t.number",
" MergeScan [is_placeholder=false, remote_input=[\nTableScan: t\n]]",
]
.join("\n");
assert_eq!(expected, result.to_string());
}
}

File diff suppressed because it is too large


@@ -0,0 +1,318 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use datafusion::datasource::DefaultTableSource;
use datafusion_common::Column;
use datafusion_expr::{Expr, LogicalPlan, TableScan};
use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
/// Mapping of an original column in the table to all its aliases at the current node
pub type AliasMapping = HashMap<String, HashSet<Column>>;
/// Tracks aliases for the source table columns in the plan
#[derive(Debug, Clone)]
pub struct AliasTracker {
/// mapping from the original column name to the aliases used in the plan
/// notice how one column might have multiple aliases in the plan
///
pub mapping: AliasMapping,
}
impl AliasTracker {
pub fn new(table_scan: &TableScan) -> Option<Self> {
if let Some(source) = table_scan
.source
.as_any()
.downcast_ref::<DefaultTableSource>()
{
if let Some(provider) = source
.table_provider
.as_any()
.downcast_ref::<DfTableProviderAdapter>()
{
if provider.table().table_type() == TableType::Base {
let info = provider.table().table_info();
let schema = info.meta.schema.clone();
let col_schema = schema.column_schemas();
let mapping = col_schema
.iter()
.map(|col| {
(
col.name.clone(),
HashSet::from_iter(std::iter::once(Column::new_unqualified(
col.name.clone(),
))),
)
})
.collect();
return Some(Self { mapping });
}
}
}
None
}
/// update alias for original columns
///
/// only handle `Alias` with column in `Projection` node
pub fn update_alias(&mut self, node: &LogicalPlan) {
if let LogicalPlan::Projection(projection) = node {
// first collect all the alias mappings, i.e. `col_a AS b AS c AS d` becomes `a -> d`
// notice one column might have multiple aliases
let mut alias_mapping: AliasMapping = HashMap::new();
for expr in &projection.expr {
if let Expr::Alias(alias) = expr {
let outer_alias = alias.clone();
let mut cur_alias = alias.clone();
while let Expr::Alias(alias) = *cur_alias.expr {
cur_alias = alias;
}
if let Expr::Column(column) = *cur_alias.expr {
alias_mapping
.entry(column.name.clone())
.or_default()
.insert(Column::new(outer_alias.relation, outer_alias.name));
}
} else if let Expr::Column(column) = expr {
// identity mapping
alias_mapping
.entry(column.name.clone())
.or_default()
.insert(column.clone());
}
}
// update mapping using `alias_mapping`
let mut new_mapping = HashMap::new();
for (table_col_name, cur_columns) in std::mem::take(&mut self.mapping) {
let new_aliases = {
let mut new_aliases = HashSet::new();
for cur_column in &cur_columns {
let new_alias_for_cur_column = alias_mapping
.get(cur_column.name())
.cloned()
.unwrap_or_default();
for new_alias in new_alias_for_cur_column {
let is_table_ref_eq = match (&new_alias.relation, &cur_column.relation)
{
(Some(o), Some(c)) => o.resolved_eq(c),
_ => true,
};
// it is the same column only if both the name and the table ref are equal
if is_table_ref_eq {
new_aliases.insert(new_alias.clone());
}
}
}
new_aliases
};
new_mapping.insert(table_col_name, new_aliases);
}
self.mapping = new_mapping;
common_telemetry::debug!(
"Updating alias tracker to {:?} using node: \n{node}",
self.mapping
);
}
}
pub fn get_all_alias_for_col(&self, col_name: &str) -> Option<&HashSet<Column>> {
self.mapping.get(col_name)
}
#[allow(unused)]
pub fn is_alias_for(&self, original_col: &str, cur_col: &Column) -> bool {
self.mapping
.get(original_col)
.map(|cols| cols.contains(cur_col))
.unwrap_or(false)
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_telemetry::init_default_ut_logging;
use datafusion::error::Result as DfResult;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_expr::{col, LogicalPlanBuilder};
use super::*;
use crate::dist_plan::analyzer::test::TestTable;
#[derive(Debug)]
struct TrackerTester {
alias_tracker: Option<AliasTracker>,
mapping_at_each_level: Vec<AliasMapping>,
}
impl TreeNodeVisitor<'_> for TrackerTester {
type Node = LogicalPlan;
fn f_up(&mut self, node: &LogicalPlan) -> DfResult<TreeNodeRecursion> {
if let Some(alias_tracker) = &mut self.alias_tracker {
alias_tracker.update_alias(node);
self.mapping_at_each_level.push(
self.alias_tracker
.as_ref()
.map(|a| a.mapping.clone())
.unwrap_or_default()
.clone(),
);
} else if let LogicalPlan::TableScan(table_scan) = node {
self.alias_tracker = AliasTracker::new(table_scan);
self.mapping_at_each_level.push(
self.alias_tracker
.as_ref()
.map(|a| a.mapping.clone())
.unwrap_or_default()
.clone(),
);
}
Ok(TreeNodeRecursion::Continue)
}
}
#[test]
fn proj_alias_tracker() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
.unwrap()
.project(vec![
col("number"),
col("pk3").alias("pk1"),
col("pk2").alias("pk3"),
])
.unwrap()
.project(vec![
col("number"),
col("pk1").alias("pk2"),
col("pk3").alias("pk1"),
])
.unwrap()
.build()
.unwrap();
let mut tracker_tester = TrackerTester {
alias_tracker: None,
mapping_at_each_level: Vec::new(),
};
plan.visit(&mut tracker_tester).unwrap();
assert_eq!(
tracker_tester.mapping_at_each_level,
vec![
HashMap::from([
("number".to_string(), HashSet::from(["number".into()])),
("pk1".to_string(), HashSet::from(["pk1".into()])),
("pk2".to_string(), HashSet::from(["pk2".into()])),
("pk3".to_string(), HashSet::from(["pk3".into()])),
("ts".to_string(), HashSet::from(["ts".into()]))
]),
HashMap::from([
("number".to_string(), HashSet::from(["t.number".into()])),
("pk1".to_string(), HashSet::from([])),
("pk2".to_string(), HashSet::from(["pk3".into()])),
("pk3".to_string(), HashSet::from(["pk1".into()])),
("ts".to_string(), HashSet::from([]))
]),
HashMap::from([
("number".to_string(), HashSet::from(["t.number".into()])),
("pk1".to_string(), HashSet::from([])),
("pk2".to_string(), HashSet::from(["pk1".into()])),
("pk3".to_string(), HashSet::from(["pk2".into()])),
("ts".to_string(), HashSet::from([]))
])
]
);
}
#[test]
fn proj_multi_alias_tracker() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
.unwrap()
.project(vec![
col("number"),
col("pk3").alias("pk1"),
col("pk3").alias("pk2"),
])
.unwrap()
.project(vec![
col("number"),
col("pk2").alias("pk4"),
col("pk1").alias("pk5"),
])
.unwrap()
.build()
.unwrap();
let mut tracker_tester = TrackerTester {
alias_tracker: None,
mapping_at_each_level: Vec::new(),
};
plan.visit(&mut tracker_tester).unwrap();
assert_eq!(
tracker_tester.mapping_at_each_level,
vec![
HashMap::from([
("number".to_string(), HashSet::from(["number".into()])),
("pk1".to_string(), HashSet::from(["pk1".into()])),
("pk2".to_string(), HashSet::from(["pk2".into()])),
("pk3".to_string(), HashSet::from(["pk3".into()])),
("ts".to_string(), HashSet::from(["ts".into()]))
]),
HashMap::from([
("number".to_string(), HashSet::from(["t.number".into()])),
("pk1".to_string(), HashSet::from([])),
("pk2".to_string(), HashSet::from([])),
(
"pk3".to_string(),
HashSet::from(["pk1".into(), "pk2".into()])
),
("ts".to_string(), HashSet::from([]))
]),
HashMap::from([
("number".to_string(), HashSet::from(["t.number".into()])),
("pk1".to_string(), HashSet::from([])),
("pk2".to_string(), HashSet::from([])),
(
"pk3".to_string(),
HashSet::from(["pk4".into(), "pk5".into()])
),
("ts".to_string(), HashSet::from([]))
])
]
);
}
}
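
The two tests above exercise exactly this composition. A minimal sketch of the same idea outside the planner (plain strings, one `(input, output)` pair per alias expression, hypothetical `compose` helper): each projection maps the current name of a column to its new names, and the tracker rewrites its table-column-to-aliases map through it.

```rust
use std::collections::{HashMap, HashSet};

/// Rewrite a `table column -> current aliases` map through one projection.
fn compose(
    mapping: HashMap<String, HashSet<String>>,
    projection: &[(String, String)], // (input column name, output name)
) -> HashMap<String, HashSet<String>> {
    // aliases introduced by this projection: input name -> all output names
    let mut step: HashMap<&str, HashSet<&str>> = HashMap::new();
    for (input, output) in projection {
        step.entry(input.as_str()).or_default().insert(output.as_str());
    }
    mapping
        .into_iter()
        .map(|(table_col, cur_aliases)| {
            // aliases with no mapping in this projection are dropped,
            // matching how unreferenced columns vanish from a projection
            let new_aliases = cur_aliases
                .iter()
                .flat_map(|cur| step.get(cur.as_str()).into_iter().flatten())
                .map(|s| s.to_string())
                .collect();
            (table_col, new_aliases)
        })
        .collect()
}

fn main() {
    // identity mapping for the table column `pk3`
    let mapping = HashMap::from([("pk3".to_string(), HashSet::from(["pk3".to_string()]))]);
    // Projection: pk3 AS pk1, pk3 AS pk2  => one column, two aliases
    let mapping = compose(
        mapping,
        &[("pk3".into(), "pk1".into()), ("pk3".into(), "pk2".into())],
    );
    assert_eq!(
        mapping["pk3"],
        HashSet::from(["pk1".to_string(), "pk2".to_string()])
    );
    // Projection: pk2 AS pk4, pk1 AS pk5  => pk3 is now known as pk4/pk5
    let mapping = compose(
        mapping,
        &[("pk2".into(), "pk4".into()), ("pk1".into(), "pk5".into())],
    );
    assert_eq!(
        mapping["pk3"],
        HashSet::from(["pk4".to_string(), "pk5".to_string()])
    );
}
```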


@@ -27,6 +27,7 @@ use promql::extension_plan::{
EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
};
use crate::dist_plan::analyzer::AliasMapping;
use crate::dist_plan::merge_sort::{merge_sort_transformer, MergeSortLogicalPlan};
use crate::dist_plan::MergeScanLogicalPlan;
@@ -139,9 +140,7 @@ pub fn step_aggr_to_upper_aggr(
new_projection_exprs.push(aliased_output_aggr_expr);
}
let upper_aggr_plan = LogicalPlan::Aggregate(new_aggr);
debug!("Before recompute schema: {upper_aggr_plan:?}");
let upper_aggr_plan = upper_aggr_plan.recompute_schema()?;
debug!("After recompute schema: {upper_aggr_plan:?}");
// create a projection on top of the new aggregate plan
let new_projection =
Projection::try_new(new_projection_exprs, Arc::new(upper_aggr_plan.clone()))?;
@@ -222,7 +221,7 @@ pub enum Commutativity {
pub struct Categorizer {}
impl Categorizer {
pub fn check_plan(plan: &LogicalPlan, partition_cols: Option<Vec<String>>) -> Commutativity {
pub fn check_plan(plan: &LogicalPlan, partition_cols: Option<AliasMapping>) -> Commutativity {
let partition_cols = partition_cols.unwrap_or_default();
match plan {
@@ -247,7 +246,6 @@ impl Categorizer {
transformer: Some(Arc::new(|plan: &LogicalPlan| {
debug!("Before Step optimize: {plan}");
let ret = step_aggr_to_upper_aggr(plan);
debug!("After Step Optimize: {ret:?}");
ret.ok().map(|s| TransformerAction {
extra_parent_plans: s.to_vec(),
new_child_plan: None,
@@ -264,7 +262,11 @@ impl Categorizer {
return commutativity;
}
}
Commutativity::Commutative
// when all group by expressions are partition columns, the aggregation can be pushed down, unless
// another push down (including `Limit` or `Sort`) is already in progress (which will then prevent the next cond commutative node from being pushed down).
// TODO(discord9): This is a temporary solution (that works); a better description of
// commutativity is needed for this situation.
Commutativity::ConditionalCommutative(None)
}
LogicalPlan::Sort(_) => {
if partition_cols.is_empty() {
@@ -322,17 +324,20 @@ impl Categorizer {
pub fn check_extension_plan(
plan: &dyn UserDefinedLogicalNode,
partition_cols: &[String],
partition_cols: &AliasMapping,
) -> Commutativity {
match plan.name() {
name if name == SeriesDivide::name() => {
let series_divide = plan.as_any().downcast_ref::<SeriesDivide>().unwrap();
let tags = series_divide.tags().iter().collect::<HashSet<_>>();
for partition_col in partition_cols {
if !tags.contains(partition_col) {
for all_alias in partition_cols.values() {
let all_alias = all_alias.iter().map(|c| &c.name).collect::<HashSet<_>>();
if tags.intersection(&all_alias).count() == 0 {
return Commutativity::NonCommutative;
}
}
Commutativity::Commutative
}
name if name == SeriesNormalize::name()
@@ -396,7 +401,7 @@ impl Categorizer {
/// Return true if the given expr and partition cols satisfied the rule.
/// In this case the plan can be treated as fully commutative.
fn check_partition(exprs: &[Expr], partition_cols: &[String]) -> bool {
fn check_partition(exprs: &[Expr], partition_cols: &AliasMapping) -> bool {
let mut ref_cols = HashSet::new();
for expr in exprs {
expr.add_column_refs(&mut ref_cols);
@@ -405,8 +410,14 @@ impl Categorizer {
.into_iter()
.map(|c| c.name.clone())
.collect::<HashSet<_>>();
for col in partition_cols {
if !ref_cols.contains(col) {
for all_alias in partition_cols.values() {
let all_alias = all_alias
.iter()
.map(|c| c.name.clone())
.collect::<HashSet<_>>();
// check if the intersection of the ref columns with all aliases of a partition column
// is empty; if it's empty, not all partition columns show up in `exprs`
if ref_cols.intersection(&all_alias).count() == 0 {
return false;
}
}
@@ -424,7 +435,7 @@ pub type StageTransformer = Arc<dyn Fn(&LogicalPlan) -> Option<TransformerAction
pub struct TransformerAction {
/// list of plans that need to be applied to parent plans, in the order of parent to child.
/// i.e. if this returns `[Projection, Aggregate]`, then the parent plan should be transformed to
/// ```
/// ```ignore
/// Original Parent Plan:
/// Projection:
/// Aggregate:
@@ -453,7 +464,7 @@ mod test {
fetch: None,
});
assert!(matches!(
Categorizer::check_plan(&plan, Some(vec![])),
Categorizer::check_plan(&plan, Some(Default::default())),
Commutativity::Commutative
));
}


@@ -0,0 +1,974 @@
CREATE TABLE IF NOT EXISTS aggr_optimize_not (
a STRING NULL,
b STRING NULL,
c STRING NULL,
d STRING NULL,
greptime_timestamp TIMESTAMP(3) NOT NULL,
greptime_value DOUBLE NULL,
TIME INDEX (greptime_timestamp),
PRIMARY KEY (a, b, c, d)
) PARTITION ON COLUMNS (a, b, c) (a < 'b', a >= 'b',);
Affected Rows: 0
-- Case 0: group by columns are the same as partition columns.
-- This query shouldn't push down the aggregation even though the group by columns are partition columns,
-- because a sort is already pushed down.
-- If it did, it would produce a wrong result.
-- explain at 0s, 5s and 10s. No point at 0s.
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
tql explain (1752591864, 1752592164, '30s') max by (a, b, c) (max_over_time(aggr_optimize_not [2m]));
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: aggr_optimize_not.a ASC NULLS LAST, aggr_optimize_not.b ASC NULLS LAST, aggr_optimize_not.c ASC NULLS LAST, aggr_optimize_not.greptime_timestamp ASC NULLS LAST |
| | Aggregate: groupBy=[[aggr_optimize_not.a, aggr_optimize_not.b, aggr_optimize_not.c, aggr_optimize_not.greptime_timestamp]], aggr=[[max(prom_max_over_time(greptime_timestamp_range,greptime_value))]] |
| | Projection: aggr_optimize_not.greptime_timestamp, prom_max_over_time(greptime_timestamp_range,greptime_value), aggr_optimize_not.a, aggr_optimize_not.b, aggr_optimize_not.c |
| | MergeSort: aggr_optimize_not.a ASC NULLS FIRST, aggr_optimize_not.b ASC NULLS FIRST, aggr_optimize_not.c ASC NULLS FIRST, aggr_optimize_not.d ASC NULLS FIRST, aggr_optimize_not.greptime_timestamp ASC NULLS FIRST |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Filter: prom_max_over_time(greptime_timestamp_range,greptime_value) IS NOT NULL |
| | Projection: aggr_optimize_not.greptime_timestamp, prom_max_over_time(greptime_timestamp_range, greptime_value) AS prom_max_over_time(greptime_timestamp_range,greptime_value), aggr_optimize_not.a, aggr_optimize_not.b, aggr_optimize_not.c, aggr_optimize_not.d |
| | PromRangeManipulate: req range=[0..0], interval=[300000], eval range=[120000], time index=[greptime_timestamp], values=["greptime_value"] |
| | PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [true] |
| | PromSeriesDivide: tags=["a", "b", "c", "d"] |
| | Sort: aggr_optimize_not.a ASC NULLS FIRST, aggr_optimize_not.b ASC NULLS FIRST, aggr_optimize_not.c ASC NULLS FIRST, aggr_optimize_not.d ASC NULLS FIRST, aggr_optimize_not.greptime_timestamp ASC NULLS FIRST |
| | Filter: aggr_optimize_not.greptime_timestamp >= TimestampMillisecond(-420000, None) AND aggr_optimize_not.greptime_timestamp <= TimestampMillisecond(300000, None) |
| | TableScan: aggr_optimize_not |
| | ]] |
| physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST] |
| | SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[true] |
| | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))], ordering_mode=PartiallySorted([0, 1, 2]) |
| | SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], preserve_partitioning=[true] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=REDACTED
| | AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))], ordering_mode=PartiallySorted([0, 1, 2]) |
| | ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range,greptime_value)@1 as prom_max_over_time(greptime_timestamp_range,greptime_value), a@2 as a, b@3 as b, c@4 as c] |
| | SortExec: expr=[a@2 ASC, b@3 ASC, c@4 ASC, d@5 ASC, greptime_timestamp@0 ASC], preserve_partitioning=[true] |
| | MergeScanExec: REDACTED
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (1752591864, 1752592164, '30s') max by (a, b, c) (max_over_time(aggr_optimize_not [2m]));
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))], ordering_mode=PartiallySorted([0, 1, 2]) REDACTED
|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))], ordering_mode=PartiallySorted([0, 1, 2]) REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range,greptime_value)@1 as prom_max_over_time(greptime_timestamp_range,greptime_value), a@2 as a, b@3 as b, c@4 as c] REDACTED
|_|_|_SortExec: expr=[a@2 ASC, b@3 ASC, c@4 ASC, d@5 ASC, greptime_timestamp@0 ASC], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b, c@2 as c, d@3 as d] REDACTED
|_|_|_PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[greptime_timestamp], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 1_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b, c@2 as c, d@3 as d] REDACTED
|_|_|_PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[greptime_timestamp], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
-- Case 1: group by columns are a prefix of partition columns.
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
tql explain (1752591864, 1752592164, '30s') sum by (a, b) (max_over_time(aggr_optimize_not [2m]));
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: aggr_optimize_not.a ASC NULLS LAST, aggr_optimize_not.b ASC NULLS LAST, aggr_optimize_not.greptime_timestamp ASC NULLS LAST |
| | Aggregate: groupBy=[[aggr_optimize_not.a, aggr_optimize_not.b, aggr_optimize_not.greptime_timestamp]], aggr=[[sum(prom_max_over_time(greptime_timestamp_range,greptime_value))]] |
| | Projection: aggr_optimize_not.greptime_timestamp, prom_max_over_time(greptime_timestamp_range,greptime_value), aggr_optimize_not.a, aggr_optimize_not.b |
| | MergeSort: aggr_optimize_not.a ASC NULLS FIRST, aggr_optimize_not.b ASC NULLS FIRST, aggr_optimize_not.c ASC NULLS FIRST, aggr_optimize_not.d ASC NULLS FIRST, aggr_optimize_not.greptime_timestamp ASC NULLS FIRST |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Filter: prom_max_over_time(greptime_timestamp_range,greptime_value) IS NOT NULL |
| | Projection: aggr_optimize_not.greptime_timestamp, prom_max_over_time(greptime_timestamp_range, greptime_value) AS prom_max_over_time(greptime_timestamp_range,greptime_value), aggr_optimize_not.a, aggr_optimize_not.b, aggr_optimize_not.c, aggr_optimize_not.d |
| | PromRangeManipulate: req range=[0..0], interval=[300000], eval range=[120000], time index=[greptime_timestamp], values=["greptime_value"] |
| | PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [true] |
| | PromSeriesDivide: tags=["a", "b", "c", "d"] |
| | Sort: aggr_optimize_not.a ASC NULLS FIRST, aggr_optimize_not.b ASC NULLS FIRST, aggr_optimize_not.c ASC NULLS FIRST, aggr_optimize_not.d ASC NULLS FIRST, aggr_optimize_not.greptime_timestamp ASC NULLS FIRST |
| | Filter: aggr_optimize_not.greptime_timestamp >= TimestampMillisecond(-420000, None) AND aggr_optimize_not.greptime_timestamp <= TimestampMillisecond(300000, None) |
| | TableScan: aggr_optimize_not |
| | ]] |
| physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, greptime_timestamp@2 ASC NULLS LAST] |
| | SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, greptime_timestamp@2 ASC NULLS LAST], preserve_partitioning=[true] |
| | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, greptime_timestamp@2 as greptime_timestamp], aggr=[sum(prom_max_over_time(greptime_timestamp_range,greptime_value))], ordering_mode=PartiallySorted([0, 1]) |
| | SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], preserve_partitioning=[true] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=REDACTED
| | AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, greptime_timestamp@0 as greptime_timestamp], aggr=[sum(prom_max_over_time(greptime_timestamp_range,greptime_value))], ordering_mode=PartiallySorted([0, 1]) |
| | ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range,greptime_value)@1 as prom_max_over_time(greptime_timestamp_range,greptime_value), a@2 as a, b@3 as b] |
| | SortExec: expr=[a@2 ASC, b@3 ASC, c@4 ASC, d@5 ASC, greptime_timestamp@0 ASC], preserve_partitioning=[true] |
| | MergeScanExec: REDACTED
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (1752591864, 1752592164, '30s') sum by (a, b) (max_over_time(aggr_optimize_not [2m]));
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, greptime_timestamp@2 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, greptime_timestamp@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, greptime_timestamp@2 as greptime_timestamp], aggr=[sum(prom_max_over_time(greptime_timestamp_range,greptime_value))], ordering_mode=PartiallySorted([0, 1]) REDACTED
|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, greptime_timestamp@0 as greptime_timestamp], aggr=[sum(prom_max_over_time(greptime_timestamp_range,greptime_value))], ordering_mode=PartiallySorted([0, 1]) REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range,greptime_value)@1 as prom_max_over_time(greptime_timestamp_range,greptime_value), a@2 as a, b@3 as b] REDACTED
|_|_|_SortExec: expr=[a@2 ASC, b@3 ASC, c@4 ASC, d@5 ASC, greptime_timestamp@0 ASC], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b, c@2 as c, d@3 as d] REDACTED
|_|_|_PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[greptime_timestamp], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 1_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b, c@2 as c, d@3 as d] REDACTED
|_|_|_PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[greptime_timestamp], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
-- Case 2: group by columns are a prefix of partition columns.
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
tql explain (1752591864, 1752592164, '30s') avg by (a) (max_over_time(aggr_optimize_not [2m]));
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: aggr_optimize_not.a ASC NULLS LAST, aggr_optimize_not.greptime_timestamp ASC NULLS LAST |
| | Aggregate: groupBy=[[aggr_optimize_not.a, aggr_optimize_not.greptime_timestamp]], aggr=[[avg(prom_max_over_time(greptime_timestamp_range,greptime_value))]] |
| | Projection: aggr_optimize_not.greptime_timestamp, prom_max_over_time(greptime_timestamp_range,greptime_value), aggr_optimize_not.a |
| | MergeSort: aggr_optimize_not.a ASC NULLS FIRST, aggr_optimize_not.b ASC NULLS FIRST, aggr_optimize_not.c ASC NULLS FIRST, aggr_optimize_not.d ASC NULLS FIRST, aggr_optimize_not.greptime_timestamp ASC NULLS FIRST |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Filter: prom_max_over_time(greptime_timestamp_range,greptime_value) IS NOT NULL |
| | Projection: aggr_optimize_not.greptime_timestamp, prom_max_over_time(greptime_timestamp_range, greptime_value) AS prom_max_over_time(greptime_timestamp_range,greptime_value), aggr_optimize_not.a, aggr_optimize_not.b, aggr_optimize_not.c, aggr_optimize_not.d |
| | PromRangeManipulate: req range=[0..0], interval=[300000], eval range=[120000], time index=[greptime_timestamp], values=["greptime_value"] |
| | PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [true] |
| | PromSeriesDivide: tags=["a", "b", "c", "d"] |
| | Sort: aggr_optimize_not.a ASC NULLS FIRST, aggr_optimize_not.b ASC NULLS FIRST, aggr_optimize_not.c ASC NULLS FIRST, aggr_optimize_not.d ASC NULLS FIRST, aggr_optimize_not.greptime_timestamp ASC NULLS FIRST |
| | Filter: aggr_optimize_not.greptime_timestamp >= TimestampMillisecond(-420000, None) AND aggr_optimize_not.greptime_timestamp <= TimestampMillisecond(300000, None) |
| | TableScan: aggr_optimize_not |
| | ]] |
| physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST, greptime_timestamp@1 ASC NULLS LAST] |
| | SortExec: expr=[a@0 ASC NULLS LAST, greptime_timestamp@1 ASC NULLS LAST], preserve_partitioning=[true] |
| | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, greptime_timestamp@1 as greptime_timestamp], aggr=[avg(prom_max_over_time(greptime_timestamp_range,greptime_value))], ordering_mode=PartiallySorted([0]) |
| | SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=REDACTED
| | AggregateExec: mode=Partial, gby=[a@2 as a, greptime_timestamp@0 as greptime_timestamp], aggr=[avg(prom_max_over_time(greptime_timestamp_range,greptime_value))], ordering_mode=PartiallySorted([0]) |
| | ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range,greptime_value)@1 as prom_max_over_time(greptime_timestamp_range,greptime_value), a@2 as a] |
| | SortExec: expr=[a@2 ASC, b@3 ASC, c@4 ASC, d@5 ASC, greptime_timestamp@0 ASC], preserve_partitioning=[true] |
| | MergeScanExec: REDACTED
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (1752591864, 1752592164, '30s') avg by (a) (max_over_time(aggr_optimize_not [2m]));
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_SortPreservingMergeExec: [a@0 ASC NULLS LAST, greptime_timestamp@1 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, greptime_timestamp@1 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, greptime_timestamp@1 as greptime_timestamp], aggr=[avg(prom_max_over_time(greptime_timestamp_range,greptime_value))], ordering_mode=PartiallySorted([0]) REDACTED
|_|_|_SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, greptime_timestamp@0 as greptime_timestamp], aggr=[avg(prom_max_over_time(greptime_timestamp_range,greptime_value))], ordering_mode=PartiallySorted([0]) REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range,greptime_value)@1 as prom_max_over_time(greptime_timestamp_range,greptime_value), a@2 as a] REDACTED
|_|_|_SortExec: expr=[a@2 ASC, b@3 ASC, c@4 ASC, d@5 ASC, greptime_timestamp@0 ASC], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b, c@2 as c, d@3 as d] REDACTED
|_|_|_PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[greptime_timestamp], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 1_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b, c@2 as c, d@3 as d] REDACTED
|_|_|_PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[greptime_timestamp], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
-- Case 3: group by columns are a superset of the partition columns.
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
tql explain (1752591864, 1752592164, '30s') count by (a, b, c, d) (max_over_time(aggr_optimize_not [2m]));
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: aggr_optimize_not.a ASC NULLS LAST, aggr_optimize_not.b ASC NULLS LAST, aggr_optimize_not.c ASC NULLS LAST, aggr_optimize_not.d ASC NULLS LAST, aggr_optimize_not.greptime_timestamp ASC NULLS LAST |
| | Aggregate: groupBy=[[aggr_optimize_not.a, aggr_optimize_not.b, aggr_optimize_not.c, aggr_optimize_not.d, aggr_optimize_not.greptime_timestamp]], aggr=[[count(prom_max_over_time(greptime_timestamp_range,greptime_value))]] |
| | MergeSort: aggr_optimize_not.a ASC NULLS FIRST, aggr_optimize_not.b ASC NULLS FIRST, aggr_optimize_not.c ASC NULLS FIRST, aggr_optimize_not.d ASC NULLS FIRST, aggr_optimize_not.greptime_timestamp ASC NULLS FIRST |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Filter: prom_max_over_time(greptime_timestamp_range,greptime_value) IS NOT NULL |
| | Projection: aggr_optimize_not.greptime_timestamp, prom_max_over_time(greptime_timestamp_range, greptime_value) AS prom_max_over_time(greptime_timestamp_range,greptime_value), aggr_optimize_not.a, aggr_optimize_not.b, aggr_optimize_not.c, aggr_optimize_not.d |
| | PromRangeManipulate: req range=[0..0], interval=[300000], eval range=[120000], time index=[greptime_timestamp], values=["greptime_value"] |
| | PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [true] |
| | PromSeriesDivide: tags=["a", "b", "c", "d"] |
| | Sort: aggr_optimize_not.a ASC NULLS FIRST, aggr_optimize_not.b ASC NULLS FIRST, aggr_optimize_not.c ASC NULLS FIRST, aggr_optimize_not.d ASC NULLS FIRST, aggr_optimize_not.greptime_timestamp ASC NULLS FIRST |
| | Filter: aggr_optimize_not.greptime_timestamp >= TimestampMillisecond(-420000, None) AND aggr_optimize_not.greptime_timestamp <= TimestampMillisecond(300000, None) |
| | TableScan: aggr_optimize_not |
| | ]] |
| physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, d@3 ASC NULLS LAST, greptime_timestamp@4 ASC NULLS LAST] |
| | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c, d@3 as d, greptime_timestamp@4 as greptime_timestamp], aggr=[count(prom_max_over_time(greptime_timestamp_range,greptime_value))], ordering_mode=Sorted |
| | SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, d@3 ASC NULLS LAST, greptime_timestamp@4 ASC NULLS LAST], preserve_partitioning=[true] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=REDACTED
| | AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, d@5 as d, greptime_timestamp@0 as greptime_timestamp], aggr=[count(prom_max_over_time(greptime_timestamp_range,greptime_value))], ordering_mode=Sorted |
| | SortExec: expr=[a@2 ASC, b@3 ASC, c@4 ASC, d@5 ASC, greptime_timestamp@0 ASC], preserve_partitioning=[true] |
| | MergeScanExec: REDACTED
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (1752591864, 1752592164, '30s') count by (a, b, c, d) (max_over_time(aggr_optimize_not [2m]));
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, d@3 ASC NULLS LAST, greptime_timestamp@4 ASC NULLS LAST] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c, d@3 as d, greptime_timestamp@4 as greptime_timestamp], aggr=[count(prom_max_over_time(greptime_timestamp_range,greptime_value))], ordering_mode=Sorted REDACTED
|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, d@3 ASC NULLS LAST, greptime_timestamp@4 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, d@5 as d, greptime_timestamp@0 as greptime_timestamp], aggr=[count(prom_max_over_time(greptime_timestamp_range,greptime_value))], ordering_mode=Sorted REDACTED
|_|_|_SortExec: expr=[a@2 ASC, b@3 ASC, c@4 ASC, d@5 ASC, greptime_timestamp@0 ASC], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b, c@2 as c, d@3 as d] REDACTED
|_|_|_PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[greptime_timestamp], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 1_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b, c@2 as c, d@3 as d] REDACTED
|_|_|_PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[greptime_timestamp], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
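-- Note for Case 3: even though (a, b, c, d) is a superset of the partition
-- columns (a, b, c), the aggregate stays above MergeScanExec for the same reason
-- as Case 0: the sort (MergeSort) is already pushed down, so pushing the
-- aggregate below it could produce wrong results.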
-- Case 4: group by columns are not a prefix of the partition columns.
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
tql explain (1752591864, 1752592164, '30s') min by (b, c, d) (max_over_time(aggr_optimize_not [2m]));
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: aggr_optimize_not.b ASC NULLS LAST, aggr_optimize_not.c ASC NULLS LAST, aggr_optimize_not.d ASC NULLS LAST, aggr_optimize_not.greptime_timestamp ASC NULLS LAST |
| | Aggregate: groupBy=[[aggr_optimize_not.b, aggr_optimize_not.c, aggr_optimize_not.d, aggr_optimize_not.greptime_timestamp]], aggr=[[min(prom_max_over_time(greptime_timestamp_range,greptime_value))]] |
| | Projection: aggr_optimize_not.greptime_timestamp, prom_max_over_time(greptime_timestamp_range,greptime_value), aggr_optimize_not.b, aggr_optimize_not.c, aggr_optimize_not.d |
| | MergeSort: aggr_optimize_not.a ASC NULLS FIRST, aggr_optimize_not.b ASC NULLS FIRST, aggr_optimize_not.c ASC NULLS FIRST, aggr_optimize_not.d ASC NULLS FIRST, aggr_optimize_not.greptime_timestamp ASC NULLS FIRST |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Filter: prom_max_over_time(greptime_timestamp_range,greptime_value) IS NOT NULL |
| | Projection: aggr_optimize_not.greptime_timestamp, prom_max_over_time(greptime_timestamp_range, greptime_value) AS prom_max_over_time(greptime_timestamp_range,greptime_value), aggr_optimize_not.a, aggr_optimize_not.b, aggr_optimize_not.c, aggr_optimize_not.d |
| | PromRangeManipulate: req range=[0..0], interval=[300000], eval range=[120000], time index=[greptime_timestamp], values=["greptime_value"] |
| | PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [true] |
| | PromSeriesDivide: tags=["a", "b", "c", "d"] |
| | Sort: aggr_optimize_not.a ASC NULLS FIRST, aggr_optimize_not.b ASC NULLS FIRST, aggr_optimize_not.c ASC NULLS FIRST, aggr_optimize_not.d ASC NULLS FIRST, aggr_optimize_not.greptime_timestamp ASC NULLS FIRST |
| | Filter: aggr_optimize_not.greptime_timestamp >= TimestampMillisecond(-420000, None) AND aggr_optimize_not.greptime_timestamp <= TimestampMillisecond(300000, None) |
| | TableScan: aggr_optimize_not |
| | ]] |
| physical_plan | SortPreservingMergeExec: [b@0 ASC NULLS LAST, c@1 ASC NULLS LAST, d@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST] |
| | SortExec: expr=[b@0 ASC NULLS LAST, c@1 ASC NULLS LAST, d@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[true] |
| | AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, c@1 as c, d@2 as d, greptime_timestamp@3 as greptime_timestamp], aggr=[min(prom_max_over_time(greptime_timestamp_range,greptime_value))] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=REDACTED
| | AggregateExec: mode=Partial, gby=[b@2 as b, c@3 as c, d@4 as d, greptime_timestamp@0 as greptime_timestamp], aggr=[min(prom_max_over_time(greptime_timestamp_range,greptime_value))] |
| | ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range,greptime_value)@1 as prom_max_over_time(greptime_timestamp_range,greptime_value), b@3 as b, c@4 as c, d@5 as d] |
| | MergeScanExec: REDACTED
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (1752591864, 1752592164, '30s') min by (b, c, d) (max_over_time(aggr_optimize_not [2m]));
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_SortPreservingMergeExec: [b@0 ASC NULLS LAST, c@1 ASC NULLS LAST, d@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[b@0 ASC NULLS LAST, c@1 ASC NULLS LAST, d@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, c@1 as c, d@2 as d, greptime_timestamp@3 as greptime_timestamp], aggr=[min(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[b@2 as b, c@3 as c, d@4 as d, greptime_timestamp@0 as greptime_timestamp], aggr=[min(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range,greptime_value)@1 as prom_max_over_time(greptime_timestamp_range,greptime_value), b@3 as b, c@4 as c, d@5 as d] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b, c@2 as c, d@3 as d] REDACTED
|_|_|_PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[greptime_timestamp], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 1_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b, c@2 as c, d@3 as d] REDACTED
|_|_|_PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[greptime_timestamp], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
-- Case 5: a simple sum
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
tql explain sum(aggr_optimize_not);
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: aggr_optimize_not.greptime_timestamp ASC NULLS LAST |
| | Aggregate: groupBy=[[aggr_optimize_not.greptime_timestamp]], aggr=[[sum(aggr_optimize_not.greptime_value)]] |
| | Projection: aggr_optimize_not.greptime_timestamp, aggr_optimize_not.greptime_value |
| | MergeSort: aggr_optimize_not.a ASC NULLS FIRST, aggr_optimize_not.b ASC NULLS FIRST, aggr_optimize_not.c ASC NULLS FIRST, aggr_optimize_not.d ASC NULLS FIRST, aggr_optimize_not.greptime_timestamp ASC NULLS FIRST |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] |
| | PromSeriesDivide: tags=["a", "b", "c", "d"] |
| | Sort: aggr_optimize_not.a ASC NULLS FIRST, aggr_optimize_not.b ASC NULLS FIRST, aggr_optimize_not.c ASC NULLS FIRST, aggr_optimize_not.d ASC NULLS FIRST, aggr_optimize_not.greptime_timestamp ASC NULLS FIRST |
| | Filter: aggr_optimize_not.greptime_timestamp >= TimestampMillisecond(-300000, None) AND aggr_optimize_not.greptime_timestamp <= TimestampMillisecond(300000, None) |
| | TableScan: aggr_optimize_not |
| | ]] |
| physical_plan | SortPreservingMergeExec: [greptime_timestamp@0 ASC NULLS LAST] |
| | SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[true] |
| | AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(aggr_optimize_not.greptime_value)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=REDACTED
| | AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(aggr_optimize_not.greptime_value)] |
| | ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, greptime_value@5 as greptime_value] |
| | MergeScanExec: REDACTED
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze sum(aggr_optimize_not);
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_SortPreservingMergeExec: [greptime_timestamp@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, greptime_value@5 as greptime_value] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 1_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
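-- Note for Case 5: as in Case 0, the pushed-down MergeSort keeps the aggregate on
-- the frontend; the Partial/FinalPartitioned pair above is DataFusion's usual
-- two-phase aggregation, not a per-region split. sum is partial-commutative, so a
-- two-step split would be possible in principle (sketch, not executed here):
-- SELECT greptime_timestamp, sum(partial_sum) AS total
-- FROM (
--     SELECT greptime_timestamp, sum(greptime_value) AS partial_sum
--     FROM aggr_optimize_not
--     GROUP BY greptime_timestamp
-- ) AS per_region_partials  -- hypothetical alias for the per-region partials
-- GROUP BY greptime_timestamp;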
-- TODO(discord9): more cases for aggr push down interacting with partitioning & TQL
CREATE TABLE IF NOT EXISTS aggr_optimize_not_count (
a STRING NULL,
b STRING NULL,
c STRING NULL,
d STRING NULL,
greptime_timestamp TIMESTAMP(3) NOT NULL,
greptime_value DOUBLE NULL,
TIME INDEX (greptime_timestamp),
PRIMARY KEY (a, b, c, d)
) PARTITION ON COLUMNS (a, b, c) (a < 'b', a >= 'b',);
Affected Rows: 0
-- Case 6: test average rate (sum/count-like division).
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
tql explain (1752591864, 1752592164, '30s') sum by (a, b, c) (rate(aggr_optimize_not [2m])) / sum by (a, b, c) (rate(aggr_optimize_not_count [2m]));
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: aggr_optimize_not_count.a, aggr_optimize_not_count.b, aggr_optimize_not_count.c, aggr_optimize_not_count.greptime_timestamp, aggr_optimize_not.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))) / aggr_optimize_not_count.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))) AS aggr_optimize_not.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))) / aggr_optimize_not_count.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))) |
| | Inner Join: aggr_optimize_not.a = aggr_optimize_not_count.a, aggr_optimize_not.b = aggr_optimize_not_count.b, aggr_optimize_not.c = aggr_optimize_not_count.c, aggr_optimize_not.greptime_timestamp = aggr_optimize_not_count.greptime_timestamp |
| | SubqueryAlias: aggr_optimize_not |
| | Sort: aggr_optimize_not.a ASC NULLS LAST, aggr_optimize_not.b ASC NULLS LAST, aggr_optimize_not.c ASC NULLS LAST, aggr_optimize_not.greptime_timestamp ASC NULLS LAST |
| | Aggregate: groupBy=[[aggr_optimize_not.a, aggr_optimize_not.b, aggr_optimize_not.c, aggr_optimize_not.greptime_timestamp]], aggr=[[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))]] |
| | Projection: aggr_optimize_not.greptime_timestamp, prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)), aggr_optimize_not.a, aggr_optimize_not.b, aggr_optimize_not.c |
| | MergeSort: aggr_optimize_not.a ASC NULLS FIRST, aggr_optimize_not.b ASC NULLS FIRST, aggr_optimize_not.c ASC NULLS FIRST, aggr_optimize_not.d ASC NULLS FIRST, aggr_optimize_not.greptime_timestamp ASC NULLS FIRST |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Filter: prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)) IS NOT NULL |
| | Projection: aggr_optimize_not.greptime_timestamp, prom_rate(greptime_timestamp_range, greptime_value, aggr_optimize_not.greptime_timestamp, Int64(120000)) AS prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)), aggr_optimize_not.a, aggr_optimize_not.b, aggr_optimize_not.c, aggr_optimize_not.d |
| | PromRangeManipulate: req range=[0..0], interval=[300000], eval range=[120000], time index=[greptime_timestamp], values=["greptime_value"] |
| | PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [true] |
| | PromSeriesDivide: tags=["a", "b", "c", "d"] |
| | Sort: aggr_optimize_not.a ASC NULLS FIRST, aggr_optimize_not.b ASC NULLS FIRST, aggr_optimize_not.c ASC NULLS FIRST, aggr_optimize_not.d ASC NULLS FIRST, aggr_optimize_not.greptime_timestamp ASC NULLS FIRST |
| | Filter: aggr_optimize_not.greptime_timestamp >= TimestampMillisecond(-420000, None) AND aggr_optimize_not.greptime_timestamp <= TimestampMillisecond(300000, None) |
| | TableScan: aggr_optimize_not |
| | ]] |
| | SubqueryAlias: aggr_optimize_not_count |
| | Sort: aggr_optimize_not_count.a ASC NULLS LAST, aggr_optimize_not_count.b ASC NULLS LAST, aggr_optimize_not_count.c ASC NULLS LAST, aggr_optimize_not_count.greptime_timestamp ASC NULLS LAST |
| | Aggregate: groupBy=[[aggr_optimize_not_count.a, aggr_optimize_not_count.b, aggr_optimize_not_count.c, aggr_optimize_not_count.greptime_timestamp]], aggr=[[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))]] |
| | Projection: aggr_optimize_not_count.greptime_timestamp, prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)), aggr_optimize_not_count.a, aggr_optimize_not_count.b, aggr_optimize_not_count.c |
| | MergeSort: aggr_optimize_not_count.a ASC NULLS FIRST, aggr_optimize_not_count.b ASC NULLS FIRST, aggr_optimize_not_count.c ASC NULLS FIRST, aggr_optimize_not_count.d ASC NULLS FIRST, aggr_optimize_not_count.greptime_timestamp ASC NULLS FIRST |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Filter: prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)) IS NOT NULL |
| | Projection: aggr_optimize_not_count.greptime_timestamp, prom_rate(greptime_timestamp_range, greptime_value, aggr_optimize_not_count.greptime_timestamp, Int64(120000)) AS prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)), aggr_optimize_not_count.a, aggr_optimize_not_count.b, aggr_optimize_not_count.c, aggr_optimize_not_count.d |
| | PromRangeManipulate: req range=[0..0], interval=[300000], eval range=[120000], time index=[greptime_timestamp], values=["greptime_value"] |
| | PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [true] |
| | PromSeriesDivide: tags=["a", "b", "c", "d"] |
| | Sort: aggr_optimize_not_count.a ASC NULLS FIRST, aggr_optimize_not_count.b ASC NULLS FIRST, aggr_optimize_not_count.c ASC NULLS FIRST, aggr_optimize_not_count.d ASC NULLS FIRST, aggr_optimize_not_count.greptime_timestamp ASC NULLS FIRST |
| | Filter: aggr_optimize_not_count.greptime_timestamp >= TimestampMillisecond(-420000, None) AND aggr_optimize_not_count.greptime_timestamp <= TimestampMillisecond(300000, None) |
| | TableScan: aggr_optimize_not_count |
| | ]] |
| physical_plan | ProjectionExec: expr=[a@1 as a, b@2 as b, c@3 as c, greptime_timestamp@4 as greptime_timestamp, sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))@0 / sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))@5 as aggr_optimize_not.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))) / aggr_optimize_not_count.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | REDACTED
| | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))], ordering_mode=PartiallySorted([0, 1, 2]) |
| | SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], preserve_partitioning=[true] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=REDACTED
| | AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))], ordering_mode=PartiallySorted([0, 1, 2]) |
| | ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))@1 as prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)), a@2 as a, b@3 as b, c@4 as c] |
| | SortExec: expr=[a@2 ASC, b@3 ASC, c@4 ASC, d@5 ASC, greptime_timestamp@0 ASC], preserve_partitioning=[true] |
| | MergeScanExec: REDACTED
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=REDACTED
| | CoalescePartitionsExec |
| | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))], ordering_mode=PartiallySorted([0, 1, 2]) |
| | SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], preserve_partitioning=[true] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=REDACTED
| | AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))], ordering_mode=PartiallySorted([0, 1, 2]) |
| | ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))@1 as prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)), a@2 as a, b@3 as b, c@4 as c] |
| | SortExec: expr=[a@2 ASC, b@3 ASC, c@4 ASC, d@5 ASC, greptime_timestamp@0 ASC], preserve_partitioning=[true] |
| | MergeScanExec: REDACTED
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (1752591864, 1752592164, '30s') sum by (a, b, c) (rate(aggr_optimize_not [2m])) / sum by (a, b, c) (rate(aggr_optimize_not_count [2m]));
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_ProjectionExec: expr=[a@1 as a, b@2 as b, c@3 as c, greptime_timestamp@4 as greptime_timestamp, sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))@0 / sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))@5 as aggr_optimize_not.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))) / aggr_optimize_not_count.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))], ordering_mode=PartiallySorted([0, 1, 2]) REDACTED
|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))], ordering_mode=PartiallySorted([0, 1, 2]) REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))@1 as prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)), a@2 as a, b@3 as b, c@4 as c] REDACTED
|_|_|_SortExec: expr=[a@2 ASC, b@3 ASC, c@4 ASC, d@5 ASC, greptime_timestamp@0 ASC], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))], ordering_mode=PartiallySorted([0, 1, 2]) REDACTED
|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))], ordering_mode=PartiallySorted([0, 1, 2]) REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))@1 as prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)), a@2 as a, b@3 as b, c@4 as c] REDACTED
|_|_|_SortExec: expr=[a@2 ASC, b@3 ASC, c@4 ASC, d@5 ASC, greptime_timestamp@0 ASC], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000))@1 as prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)), a@2 as a, b@3 as b, c@4 as c, d@5 as d] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000))@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_rate(greptime_timestamp_range@6, greptime_value@5, greptime_timestamp@4, 120000) as prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)), a@0 as a, b@1 as b, c@2 as c, d@3 as d] REDACTED
|_|_|_PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[greptime_timestamp], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 1_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000))@1 as prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)), a@2 as a, b@3 as b, c@4 as c, d@5 as d] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000))@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_rate(greptime_timestamp_range@6, greptime_value@5, greptime_timestamp@4, 120000) as prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)), a@0 as a, b@1 as b, c@2 as c, d@3 as d] REDACTED
|_|_|_PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[greptime_timestamp], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 0_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not_count.greptime_timestamp,Int64(120000))@1 as prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)), a@2 as a, b@3 as b, c@4 as c, d@5 as d] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not_count.greptime_timestamp,Int64(120000))@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_rate(greptime_timestamp_range@6, greptime_value@5, greptime_timestamp@4, 120000) as prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not_count.greptime_timestamp,Int64(120000)), a@0 as a, b@1 as b, c@2 as c, d@3 as d] REDACTED
|_|_|_PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[greptime_timestamp], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 1_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not_count.greptime_timestamp,Int64(120000))@1 as prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)), a@2 as a, b@3 as b, c@4 as c, d@5 as d] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not_count.greptime_timestamp,Int64(120000))@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_rate(greptime_timestamp_range@6, greptime_value@5, greptime_timestamp@4, 120000) as prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not_count.greptime_timestamp,Int64(120000)), a@0 as a, b@1 as b, c@2 as c, d@3 as d] REDACTED
|_|_|_PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[greptime_timestamp], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
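-- Note for Case 6: each side of the division keeps its aggregate above its own
-- MergeScanExec (partial below the repartition, final above), and the Inner Join
-- on (a, b, c, greptime_timestamp) repartitions both inputs (the join operator
-- itself is hidden by the Hash.* redaction) before the final projection computes
-- the sum/sum division.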
-- Case 7: aggregate without sort should be pushed down. This one pushes down because the group by columns include all partition columns.
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
EXPLAIN
SELECT
min(greptime_value)
FROM
aggr_optimize_not
GROUP BY
a,
b,
c;
+---------------+----------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: min(aggr_optimize_not.greptime_value) |
| | Aggregate: groupBy=[[aggr_optimize_not.a, aggr_optimize_not.b, aggr_optimize_not.c]], aggr=[[min(aggr_optimize_not.greptime_value)]] |
| | TableScan: aggr_optimize_not |
| | ]] |
| physical_plan | MergeScanExec: REDACTED
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE
SELECT
min(greptime_value)
FROM
aggr_optimize_not
GROUP BY
a,
b,
c;
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_ProjectionExec: expr=[min(aggr_optimize_not.greptime_value)@3 as min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 1_|_ProjectionExec: expr=[min(aggr_optimize_not.greptime_value)@3 as min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
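-- Note for Case 7: the group by key (a, b, c) equals the partition columns, so
-- every group lives entirely in one region and the whole aggregate, final step
-- included, runs inside MergeScan. Conceptually (sketch, not executed here;
-- region_0_rows/region_1_rows are hypothetical per-region inputs) the result is
-- just a concatenation:
-- SELECT min(greptime_value) FROM region_0_rows GROUP BY a, b, c
-- UNION ALL
-- SELECT min(greptime_value) FROM region_1_rows GROUP BY a, b, c;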
-- Case 8: aggregate without sort should be pushed down. This one pushes down because the group by columns include all partition columns plus one more (d).
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
EXPLAIN
SELECT
min(greptime_value)
FROM
aggr_optimize_not
GROUP BY
a,
b,
c,
d;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: min(aggr_optimize_not.greptime_value) |
| | Aggregate: groupBy=[[aggr_optimize_not.a, aggr_optimize_not.b, aggr_optimize_not.c, aggr_optimize_not.d]], aggr=[[min(aggr_optimize_not.greptime_value)]] |
| | TableScan: aggr_optimize_not |
| | ]] |
| physical_plan | MergeScanExec: REDACTED
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE
SELECT
min(greptime_value)
FROM
aggr_optimize_not
GROUP BY
a,
b,
c,
d;
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_ProjectionExec: expr=[min(aggr_optimize_not.greptime_value)@4 as min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c, d@3 as d], aggr=[min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c, d@3 as d], aggr=[min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 1_|_ProjectionExec: expr=[min(aggr_optimize_not.greptime_value)@4 as min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c, d@3 as d], aggr=[min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c, d@3 as d], aggr=[min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
-- Case 9: aggregate without sort should be pushed down. This one exercises step aggr push down (per-region partial aggregate, re-aggregated on the frontend).
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
EXPLAIN
SELECT
min(greptime_value)
FROM
aggr_optimize_not
GROUP BY
a,
b;
+---------------+------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: min(min(aggr_optimize_not.greptime_value)) AS min(aggr_optimize_not.greptime_value) |
| | Aggregate: groupBy=[[aggr_optimize_not.a, aggr_optimize_not.b]], aggr=[[min(min(aggr_optimize_not.greptime_value))]] |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Aggregate: groupBy=[[aggr_optimize_not.a, aggr_optimize_not.b]], aggr=[[min(aggr_optimize_not.greptime_value)]] |
| | TableScan: aggr_optimize_not |
| | ]] |
| physical_plan | ProjectionExec: expr=[min(min(aggr_optimize_not.greptime_value))@2 as min(aggr_optimize_not.greptime_value)] |
| | AggregateExec: mode=SinglePartitioned, gby=[a@0 as a, b@1 as b], aggr=[min(min(aggr_optimize_not.greptime_value))] |
| | MergeScanExec: REDACTED
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE
SELECT
min(greptime_value)
FROM
aggr_optimize_not
GROUP BY
a,
b;
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_ProjectionExec: expr=[min(min(aggr_optimize_not.greptime_value))@2 as min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_AggregateExec: mode=SinglePartitioned, gby=[a@0 as a, b@1 as b], aggr=[min(min(aggr_optimize_not.greptime_value))] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
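-- Note for Case 9: min is partial-commutative, so the plan splits into a
-- per-region min pushed below MergeScan and a re-aggregating min(min(...)) on the
-- frontend. A hand-written equivalent (sketch, not executed here):
-- SELECT a, b, min(partial_min)
-- FROM (
--     SELECT a, b, min(greptime_value) AS partial_min  -- runs inside each region
--     FROM aggr_optimize_not
--     GROUP BY a, b
-- ) AS per_region_partials
-- GROUP BY a, b;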
-- Case 10: aggregate without sort should be pushed down. This one exercises step aggr push down with a composite aggregate expression.
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
EXPLAIN
SELECT
min(greptime_value) + max(greptime_value)
FROM
aggr_optimize_not
GROUP BY
a,
b;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: min(min(aggr_optimize_not.greptime_value)) + max(max(aggr_optimize_not.greptime_value)) AS min(aggr_optimize_not.greptime_value) + max(aggr_optimize_not.greptime_value) |
| | Aggregate: groupBy=[[aggr_optimize_not.a, aggr_optimize_not.b]], aggr=[[min(min(aggr_optimize_not.greptime_value)), max(max(aggr_optimize_not.greptime_value))]] |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Aggregate: groupBy=[[aggr_optimize_not.a, aggr_optimize_not.b]], aggr=[[min(aggr_optimize_not.greptime_value), max(aggr_optimize_not.greptime_value)]] |
| | TableScan: aggr_optimize_not |
| | ]] |
| physical_plan | ProjectionExec: expr=[min(min(aggr_optimize_not.greptime_value))@2 + max(max(aggr_optimize_not.greptime_value))@3 as min(aggr_optimize_not.greptime_value) + max(aggr_optimize_not.greptime_value)] |
| | AggregateExec: mode=SinglePartitioned, gby=[a@0 as a, b@1 as b], aggr=[min(min(aggr_optimize_not.greptime_value)), max(max(aggr_optimize_not.greptime_value))] |
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE
SELECT
min(greptime_value) + max(greptime_value)
FROM
aggr_optimize_not
GROUP BY
a,
b;
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_ProjectionExec: expr=[min(min(aggr_optimize_not.greptime_value))@2 + max(max(aggr_optimize_not.greptime_value))@3 as min(aggr_optimize_not.greptime_value) + max(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_AggregateExec: mode=SinglePartitioned, gby=[a@0 as a, b@1 as b], aggr=[min(min(aggr_optimize_not.greptime_value)), max(max(aggr_optimize_not.greptime_value))] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[min(aggr_optimize_not.greptime_value), max(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[min(aggr_optimize_not.greptime_value), max(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[min(aggr_optimize_not.greptime_value), max(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[min(aggr_optimize_not.greptime_value), max(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
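-- Note for Case 10: each aggregate in the expression is split independently
-- (min becomes min(min(...)), max becomes max(max(...))); the addition happens
-- only in the frontend projection after re-aggregation. A hand-written
-- equivalent (sketch, not executed here):
-- SELECT a, b, min(partial_min) + max(partial_max)
-- FROM (
--     SELECT a, b,
--            min(greptime_value) AS partial_min,  -- per-region partials
--            max(greptime_value) AS partial_max
--     FROM aggr_optimize_not
--     GROUP BY a, b
-- ) AS per_region_partials
-- GROUP BY a, b;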
-- Case 11: aggregate with subquery
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
EXPLAIN
SELECT
a,
min(greptime_value)
FROM
(
SELECT
a,
b,
greptime_value
FROM
aggr_optimize_not
ORDER BY
a,
b
)
GROUP BY
a;
+---------------+------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: aggr_optimize_not.a, min(min(aggr_optimize_not.greptime_value)) AS min(aggr_optimize_not.greptime_value) |
| | Aggregate: groupBy=[[aggr_optimize_not.a]], aggr=[[min(min(aggr_optimize_not.greptime_value))]] |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Aggregate: groupBy=[[aggr_optimize_not.a]], aggr=[[min(aggr_optimize_not.greptime_value)]] |
| | Projection: aggr_optimize_not.a, aggr_optimize_not.b, aggr_optimize_not.greptime_value |
| | TableScan: aggr_optimize_not |
| | ]] |
| physical_plan | ProjectionExec: expr=[a@0 as a, min(min(aggr_optimize_not.greptime_value))@1 as min(aggr_optimize_not.greptime_value)] |
| | AggregateExec: mode=SinglePartitioned, gby=[a@0 as a], aggr=[min(min(aggr_optimize_not.greptime_value))] |
| | MergeScanExec: REDACTED
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE
SELECT
a,
min(greptime_value)
FROM
(
SELECT
a,
b,
greptime_value
FROM
aggr_optimize_not
ORDER BY
a,
b
)
GROUP BY
a;
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_ProjectionExec: expr=[a@0 as a, min(min(aggr_optimize_not.greptime_value))@1 as min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_AggregateExec: mode=SinglePartitioned, gby=[a@0 as a], aggr=[min(min(aggr_optimize_not.greptime_value))] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[min(aggr_optimize_not.greptime_value)] REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
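-- Note for Case 11: the subquery's ORDER BY a, b is eliminated because ordering
-- below an aggregate does not change its result, which lets the per-region
-- partial aggregate be pushed below MergeScan; the query effectively reduces to
-- (sketch, not executed here):
-- SELECT a, min(greptime_value) FROM aggr_optimize_not GROUP BY a;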
drop table aggr_optimize_not_count;
Affected Rows: 0
drop table aggr_optimize_not;
Affected Rows: 0
View File
@@ -0,0 +1,307 @@
CREATE TABLE IF NOT EXISTS aggr_optimize_not (
a STRING NULL,
b STRING NULL,
c STRING NULL,
d STRING NULL,
greptime_timestamp TIMESTAMP(3) NOT NULL,
greptime_value DOUBLE NULL,
TIME INDEX (greptime_timestamp),
PRIMARY KEY (a, b, c, d)
) PARTITION ON COLUMNS (a, b, c) (a < 'b', a >= 'b',);
-- Case 0: group by columns are the same as partition columns.
-- This query shouldn't push down the aggregation even though the group by
-- columns match the partition columns, because the sort is already pushed down.
-- If it did, it would produce a wrong result.
-- explain at 0s, 5s and 10s. No point at 0s.
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
tql explain (1752591864, 1752592164, '30s') max by (a, b, c) (max_over_time(aggr_optimize_not [2m]));
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (1752591864, 1752592164, '30s') max by (a, b, c) (max_over_time(aggr_optimize_not [2m]));
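-- Note on Case 0: the expected plan keeps the aggregation above the MergeScan;
-- since the sort was already pushed down, commuting the aggregation past it is
-- exactly the wrong-result shape this fix guards against.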
-- Case 1: group by columns are a prefix of the partition columns.
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
tql explain (1752591864, 1752592164, '30s') sum by (a, b) (max_over_time(aggr_optimize_not [2m]));
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (1752591864, 1752592164, '30s') sum by (a, b) (max_over_time(aggr_optimize_not [2m]));
-- Case 2: group by column is a one-column prefix (a) of the partition columns.
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
tql explain (1752591864, 1752592164, '30s') avg by (a) (max_over_time(aggr_optimize_not [2m]));
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (1752591864, 1752592164, '30s') avg by (a) (max_over_time(aggr_optimize_not [2m]));
-- Case 3: group by columns are a superset of the partition columns.
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
tql explain (1752591864, 1752592164, '30s') count by (a, b, c, d) (max_over_time(aggr_optimize_not [2m]));
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (1752591864, 1752592164, '30s') count by (a, b, c, d) (max_over_time(aggr_optimize_not [2m]));
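-- Note on Case 3: (a, b, c, d) is a superset of the partition columns, so each
-- group should still live in a single region and the full aggregation can be
-- pushed down, mirroring Case 8 below in plain SQL.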
-- Case 4: group by columns are not a prefix of the partition columns.
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
tql explain (1752591864, 1752592164, '30s') min by (b, c, d) (max_over_time(aggr_optimize_not [2m]));
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (1752591864, 1752592164, '30s') min by (b, c, d) (max_over_time(aggr_optimize_not [2m]));
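-- Note on Case 4: (b, c, d) does not cover partition column a, so rows of one
-- group may live in several regions; at most a partial aggregation can be
-- pushed below the merge scan here.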
-- Case 5: a simple sum
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
tql explain sum(aggr_optimize_not);
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze sum(aggr_optimize_not);
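-- Note on Case 5: a global sum decomposes as a sum of per-region partial sums,
-- e.g. with region values {1, 2} and {3}: (1 + 2) + 3 = 6 = sum of all rows, so
-- pushing partial sums down is safe even with no group by columns at all.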
-- TODO(discord9): more cases for aggr push down interacting with partitioning & tql
CREATE TABLE IF NOT EXISTS aggr_optimize_not_count (
a STRING NULL,
b STRING NULL,
c STRING NULL,
d STRING NULL,
greptime_timestamp TIMESTAMP(3) NOT NULL,
greptime_value DOUBLE NULL,
TIME INDEX (greptime_timestamp),
PRIMARY KEY (a, b, c, d)
) PARTITION ON COLUMNS (a, b, c) (a < 'b', a >= 'b',);
-- Case 6: test average rate (a sum/count-like expression over two tables).
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
tql explain (1752591864, 1752592164, '30s') sum by (a, b, c) (rate(aggr_optimize_not [2m])) / sum by (a, b, c) (rate(aggr_optimize_not_count [2m]));
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (1752591864, 1752592164, '30s') sum by (a, b, c) (rate(aggr_optimize_not [2m])) / sum by (a, b, c) (rate(aggr_optimize_not_count [2m]));
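-- Note on Case 6: both operands group by (a, b, c), i.e. all partition columns,
-- so each side's aggregation should push down on its own; the division is then
-- evaluated above the merge, on the already-aggregated series.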
-- Case 7: an aggregate without sort should be pushed down. This one pushes the
-- whole aggregation down because the group by columns (a, b, c) are exactly the
-- partition columns.
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
EXPLAIN
SELECT
min(greptime_value)
FROM
aggr_optimize_not
GROUP BY
a,
b,
c;
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE
SELECT
min(greptime_value)
FROM
aggr_optimize_not
GROUP BY
a,
b,
c;
-- Case 8: an aggregate without sort should be pushed down. This one pushes the
-- whole aggregation down because the group by columns include all partition
-- columns and then some (the extra column d).
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
EXPLAIN
SELECT
min(greptime_value)
FROM
aggr_optimize_not
GROUP BY
a,
b,
c,
d;
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE
SELECT
min(greptime_value)
FROM
aggr_optimize_not
GROUP BY
a,
b,
c,
d;
-- Case 9: an aggregate without sort should be pushed down. This one uses step
-- aggregation push down, since (a, b) is only a prefix of the partition columns
-- (see the note after this case).
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
EXPLAIN
SELECT
min(greptime_value)
FROM
aggr_optimize_not
GROUP BY
a,
b;
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE
SELECT
min(greptime_value)
FROM
aggr_optimize_not
GROUP BY
a,
b;
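-- Note on Case 9: step aggregation is valid because min is decomposable:
-- min(min(region_1), min(region_2)) equals min over all rows. For example, with
-- region values {5, 1} and {3}: min(min(5, 1), min(3)) = min(1, 3) = 1.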
-- Case 10: an aggregate without sort should be pushed down. This one exercises
-- step aggregation push down with a compound aggregate expression (min + max).
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
EXPLAIN
SELECT
min(greptime_value) + max(greptime_value)
FROM
aggr_optimize_not
GROUP BY
a,
b;
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE
SELECT
min(greptime_value) + max(greptime_value)
FROM
aggr_optimize_not
GROUP BY
a,
b;
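-- Note on Case 10: min and max each decompose into per-region partial states
-- independently; the `+` must only be applied after the final aggregation,
-- typically in a projection above the merge scan.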
-- Case 11: aggregate over a sorted subquery (see the note after this case).
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
EXPLAIN
SELECT
a,
min(greptime_value)
FROM
(
SELECT
a,
b,
greptime_value
FROM
aggr_optimize_not
ORDER BY
a,
b
)
GROUP BY
a;
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE
SELECT
a,
min(greptime_value)
FROM
(
SELECT
a,
b,
greptime_value
FROM
aggr_optimize_not
ORDER BY
a,
b
)
GROUP BY
a;
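-- Note on Case 11: the recorded plan shows the step split despite the subquery's
-- ORDER BY: a per-region Aggregate computes min(greptime_value) under the
-- MergeScan and an outer Aggregate computes min(min(greptime_value)) above it.
-- Illustrative sketch (hypothetical, expected output intentionally not recorded):
-- grouping by an alias of a partition column should behave the same way once
-- partition column aliases are tracked.
-- EXPLAIN
-- SELECT pa, min(greptime_value)
-- FROM (SELECT a AS pa, greptime_value FROM aggr_optimize_not) AS t
-- GROUP BY pa;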
drop table aggr_optimize_not_count;
drop table aggr_optimize_not;