From 36a5023f6764388ebf3c94785424b3efabb59886 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 3 Apr 2026 11:17:53 +0800 Subject: [PATCH] isolate rule into a dedicated mod Signed-off-by: Ruihang Xia --- src/query/src/dist_plan/merge_scan.rs | 17 + src/query/src/optimizer.rs | 1 + src/query/src/optimizer/join_reduce.rs | 1285 +++++++++++++++++ src/query/src/optimizer/pass_distribution.rs | 201 ++- src/query/src/promql/planner.rs | 206 +-- src/query/src/query_engine/state.rs | 5 + .../common/promql/set_operation.result | 32 +- .../common/tql-explain-analyze/explain.result | 4 + 8 files changed, 1534 insertions(+), 217 deletions(-) create mode 100644 src/query/src/optimizer/join_reduce.rs diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 2d32ce16b3..03eb761de6 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -43,6 +43,7 @@ use greptime_proto::v1::region::RegionRequestHeader; use meter_core::data::ReadItem; use meter_macros::read_meter; use session::context::QueryContextRef; +use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME; use store_api::storage::RegionId; use table::table_name::TableName; use tokio::time::Instant; @@ -442,6 +443,22 @@ impl MergeScanExec { } } + // Metric-engine scans can satisfy any hash distribution that includes `__tsid`. + // Equal requested keys also share the same `__tsid`, and equal `__tsid` values stay + // co-located across MergeScan partitions. + if self + .arrow_schema + .column_with_name(DATA_SCHEMA_TSID_COLUMN_NAME) + .is_some() + && hash_exprs.iter().any(|expr| { + expr.as_any() + .downcast_ref::() + .is_some_and(|col_expr| col_expr.name() == DATA_SCHEMA_TSID_COLUMN_NAME) + }) + { + overlaps = hash_exprs.clone(); + } + if overlaps.is_empty() { return None; } diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs index aaac1e3124..edc344211a 100644 --- a/src/query/src/optimizer.rs +++ b/src/query/src/optimizer.rs @@ -15,6 +15,7 @@ pub mod constant_term; pub mod count_nest_aggr; pub mod count_wildcard; +pub mod join_reduce; pub mod parallelize_scan; pub mod pass_distribution; pub mod remove_duplicate; diff --git a/src/query/src/optimizer/join_reduce.rs b/src/query/src/optimizer/join_reduce.rs new file mode 100644 index 0000000000..47daf3eb96 --- /dev/null +++ b/src/query/src/optimizer/join_reduce.rs @@ -0,0 +1,1285 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use datafusion::datasource::DefaultTableSource; +use datafusion::sql::TableReference; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_common::{Column, DFSchema, Result as DataFusionResult}; +use datafusion_expr::expr_rewriter::normalize_cols; +use datafusion_expr::logical_plan::{Aggregate, Join, Projection, TableScan}; +use datafusion_expr::{Expr, JoinType, LogicalPlan, LogicalPlanBuilder}; +use datafusion_optimizer::{OptimizerConfig, OptimizerRule}; +use table::table::adapter::DfTableProviderAdapter; + +use crate::dist_plan::MergeScanLogicalPlan; +use crate::dummy_catalog::DummyTableProvider; + +#[derive(Debug)] +pub struct JoinReduceRule; + +impl OptimizerRule for JoinReduceRule { + fn name(&self) -> &str { + "JoinReduceRule" + } + + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> DataFusionResult> { + plan.transform_up_with_subqueries(Self::rewrite_plan) + } +} + +impl JoinReduceRule { + fn rewrite_plan(plan: LogicalPlan) -> DataFusionResult> { + let LogicalPlan::Projection(projection) = &plan else { + return Ok(Transformed::no(plan)); + }; + + if let Some(rewritten) = Self::try_reduce_projection(projection)? { + Ok(Transformed::yes(rewritten)) + } else { + Ok(Transformed::no(plan)) + } + } + + fn try_reduce_projection(projection: &Projection) -> DataFusionResult> { + let LogicalPlan::Join(outer_join) = projection.input.as_ref() else { + return Ok(None); + }; + + if let Some(rewritten) = Self::try_reduce_join_side(projection, outer_join, true)? { + return Ok(Some(rewritten)); + } + + Self::try_reduce_join_side(projection, outer_join, false) + } + + fn try_reduce_join_side( + projection: &Projection, + outer_join: &Join, + projection_side_is_left: bool, + ) -> DataFusionResult> { + if outer_join.join_type != JoinType::Inner || outer_join.filter.is_some() { + return Ok(None); + } + + let projection_side_plan = if projection_side_is_left { + outer_join.left.as_ref() + } else { + outer_join.right.as_ref() + }; + let repeated_outer_plan = if projection_side_is_left { + outer_join.right.as_ref() + } else { + outer_join.left.as_ref() + }; + + let Some(inner_projection) = + Self::strip_subquery_aliases(projection_side_plan).as_projection() + else { + return Ok(None); + }; + let Some(inner_join) = + Self::strip_subquery_aliases(inner_projection.input.as_ref()).as_join() + else { + return Ok(None); + }; + if inner_join.join_type != JoinType::Inner || inner_join.filter.is_some() { + return Ok(None); + } + + let Some((inner_repeated_plan, repeated_on_left)) = + Self::find_matching_inner_side(inner_join, repeated_outer_plan) + else { + return Ok(None); + }; + let reduced_repeated_plan = Self::relabel_repeated_plan( + repeated_outer_plan, + Self::preferred_passthrough_relation(projection).as_ref(), + )?; + let Some(inner_repeated_to_outer) = + Self::build_column_map_by_name(inner_repeated_plan, &reduced_repeated_plan) + else { + return Ok(None); + }; + let Some(outer_repeated_to_reduced) = + Self::build_column_map_by_name(repeated_outer_plan, &reduced_repeated_plan) + else { + return Ok(None); + }; + + let outer_name_counts = Self::column_name_counts(outer_join.schema.columns()); + let projection_output_map = Self::build_projection_output_map( + projection_side_plan, + inner_projection, + &inner_repeated_to_outer, + &outer_name_counts, + )?; + let Some(outer_key_columns) = Self::validate_outer_join_keys( + inner_join, + outer_join, + projection_side_is_left, + &projection_output_map, + repeated_outer_plan, + ) else { + return Ok(None); + }; + + if !Self::is_unique_on(repeated_outer_plan, &outer_key_columns) { + return Ok(None); + } + + let Some(reduced_input) = Self::replace_inner_join_side( + inner_join, + repeated_on_left, + &reduced_repeated_plan, + &inner_repeated_to_outer, + ) else { + return Ok(None); + }; + let reduced_input = LogicalPlan::Join(reduced_input); + + let original_projection_exprs = + normalize_cols(projection.expr.clone(), projection.input.as_ref())?; + let rewritten_exprs = original_projection_exprs + .iter() + .cloned() + .map(|expr| { + let expr = Self::replace_columns_by_name(expr, &outer_repeated_to_reduced)?; + Self::replace_columns_by_name(expr, &projection_output_map) + }) + .collect::>>()?; + let rewritten_exprs = normalize_cols(rewritten_exprs, &reduced_input)?; + let rewritten_exprs = + Self::preserve_projection_output_columns(rewritten_exprs, projection, &reduced_input); + let rewritten_exprs = + Self::preserve_projection_output_names(rewritten_exprs, projection.schema.as_ref()); + + let validated = + Projection::try_new(rewritten_exprs.clone(), Arc::new(reduced_input.clone()))?; + if !Self::same_schema_types(validated.schema.as_ref(), projection.schema.as_ref()) { + return Ok(None); + } + + Ok(Some(LogicalPlan::Projection(validated))) + } + + fn strip_subquery_aliases(mut plan: &LogicalPlan) -> &LogicalPlan { + while let LogicalPlan::SubqueryAlias(alias) = plan { + plan = alias.input.as_ref(); + } + plan + } + + fn strip_passthrough_aliases_and_projections(mut plan: &LogicalPlan) -> &LogicalPlan { + loop { + match plan { + LogicalPlan::SubqueryAlias(alias) => { + plan = alias.input.as_ref(); + } + LogicalPlan::Projection(projection) + if Self::is_passthrough_projection(projection) => + { + plan = projection.input.as_ref(); + } + _ => return plan, + } + } + } + + fn same_repeated_source(left: &LogicalPlan, right: &LogicalPlan) -> bool { + let left = Self::strip_passthrough_aliases_and_projections(left); + let right = Self::strip_passthrough_aliases_and_projections(right); + + match (left, right) { + (LogicalPlan::Filter(left), LogicalPlan::Filter(right)) => { + Self::same_filter_predicate(&left.predicate, &right.predicate) + && Self::same_repeated_source(left.input.as_ref(), right.input.as_ref()) + } + (LogicalPlan::Sort(left), LogicalPlan::Sort(right)) => { + left.expr == right.expr + && left.fetch == right.fetch + && Self::same_repeated_source(left.input.as_ref(), right.input.as_ref()) + } + (LogicalPlan::Repartition(left), LogicalPlan::Repartition(right)) => { + left.partitioning_scheme == right.partitioning_scheme + && Self::same_repeated_source(left.input.as_ref(), right.input.as_ref()) + } + (LogicalPlan::Limit(left), LogicalPlan::Limit(right)) => { + left.skip == right.skip + && left.fetch == right.fetch + && Self::same_repeated_source(left.input.as_ref(), right.input.as_ref()) + } + (LogicalPlan::Extension(left), LogicalPlan::Extension(right)) => { + if let (Some(left_merge_scan), Some(right_merge_scan)) = ( + left.node.as_any().downcast_ref::(), + right.node.as_any().downcast_ref::(), + ) { + left_merge_scan.is_placeholder() == right_merge_scan.is_placeholder() + && left_merge_scan.partition_cols() == right_merge_scan.partition_cols() + && Self::same_repeated_source( + left_merge_scan.input(), + right_merge_scan.input(), + ) + } else { + left.node.name() == right.node.name() + && left.node.expressions() == right.node.expressions() + && match ( + left.node.inputs().as_slice(), + right.node.inputs().as_slice(), + ) { + ([left_input], [right_input]) => { + Self::same_repeated_source(left_input, right_input) + } + _ => left == right, + } + } + } + (LogicalPlan::TableScan(left), LogicalPlan::TableScan(right)) => { + left.table_name == right.table_name + && left.filters == right.filters + && left.fetch == right.fetch + } + _ => left == right, + } + } + + fn is_passthrough_projection(projection: &Projection) -> bool { + projection + .expr + .iter() + .all(|expr| matches!(expr.clone().unalias(), Expr::Column(_))) + } + + fn find_matching_inner_side<'a>( + inner_join: &'a Join, + repeated_outer_plan: &LogicalPlan, + ) -> Option<(&'a LogicalPlan, bool)> { + if Self::same_repeated_source(inner_join.left.as_ref(), repeated_outer_plan) { + Some((inner_join.left.as_ref(), true)) + } else if Self::same_repeated_source(inner_join.right.as_ref(), repeated_outer_plan) { + Some((inner_join.right.as_ref(), false)) + } else { + None + } + } + + fn build_column_map_by_name( + input_plan: &LogicalPlan, + output_plan: &LogicalPlan, + ) -> Option> { + let output_columns = output_plan + .schema() + .columns() + .into_iter() + .map(|column| (column.name.clone(), column)) + .collect::>(); + let input_columns = input_plan.schema().columns(); + let input_name_counts = Self::column_name_counts(input_columns.clone()); + + input_columns + .into_iter() + .try_fold(HashMap::new(), |mut mappings, input| { + let output = output_columns.get(&input.name)?.clone(); + let expr = Expr::Column(output); + mappings.insert(input.flat_name(), expr.clone()); + if input_name_counts + .get(&input.name) + .copied() + .unwrap_or_default() + == 1 + { + mappings.insert(input.name.clone(), expr); + } + Some(mappings) + }) + } + + fn preferred_passthrough_relation(projection: &Projection) -> Option { + let mut relations = projection + .expr + .iter() + .zip(projection.schema.columns()) + .filter_map(|(expr, output_column)| { + matches!(expr.clone().unalias(), Expr::Column(_)) + .then_some(output_column.relation) + .flatten() + }); + let first = relations.next()?; + relations.all(|relation| relation == first).then_some(first) + } + + fn relabel_repeated_plan( + plan: &LogicalPlan, + relation: Option<&TableReference>, + ) -> DataFusionResult { + let Some(relation) = relation else { + return Ok(plan.clone()); + }; + LogicalPlanBuilder::from(plan.clone()) + .alias(relation.clone())? + .build() + } + + fn replace_inner_join_side( + inner_join: &Join, + repeated_on_left: bool, + repeated_outer_plan: &LogicalPlan, + inner_repeated_to_outer: &HashMap, + ) -> Option { + let rewritten_on = inner_join + .on + .iter() + .map(|(left, right)| { + Some(( + Self::replace_columns_by_name(left.clone(), inner_repeated_to_outer).ok()?, + Self::replace_columns_by_name(right.clone(), inner_repeated_to_outer).ok()?, + )) + }) + .collect::>>()?; + + let replaced = Join::try_new( + if repeated_on_left { + Arc::new(repeated_outer_plan.clone()) + } else { + inner_join.left.clone() + }, + if repeated_on_left { + inner_join.right.clone() + } else { + Arc::new(repeated_outer_plan.clone()) + }, + rewritten_on, + inner_join.filter.clone(), + inner_join.join_type, + inner_join.join_constraint, + inner_join.null_equality, + inner_join.null_aware, + ) + .ok()?; + + Self::join_exprs_match_inputs(&replaced).then_some(replaced) + } + + fn join_exprs_match_inputs(join: &Join) -> bool { + join.on.iter().all(|(left, right)| { + left.column_refs() + .iter() + .all(|column| join.left.schema().has_column(column)) + && right + .column_refs() + .iter() + .all(|column| join.right.schema().has_column(column)) + }) + } + + fn build_projection_output_map( + output_plan: &LogicalPlan, + projection: &Projection, + inner_repeated_to_outer: &HashMap, + outer_name_counts: &HashMap, + ) -> DataFusionResult> { + output_plan + .schema() + .columns() + .into_iter() + .zip(projection.expr.iter()) + .try_fold(HashMap::new(), |mut mappings, (column, expr)| { + let expr = + Self::replace_columns_by_name(expr.clone().unalias(), inner_repeated_to_outer)?; + mappings.insert(column.flat_name(), expr.clone()); + if outer_name_counts + .get(&column.name) + .copied() + .unwrap_or_default() + == 1 + { + mappings.insert(column.name.clone(), expr); + } + Ok(mappings) + }) + } + + fn validate_outer_join_keys( + inner_join: &Join, + outer_join: &Join, + projection_side_is_left: bool, + projection_output_map: &HashMap, + repeated_outer_plan: &LogicalPlan, + ) -> Option> { + let repeated_by_name = repeated_outer_plan + .schema() + .columns() + .into_iter() + .map(|column| (column.name.clone(), column)) + .collect::>(); + let mut repeated_outer_key_columns = Vec::with_capacity(outer_join.on.len()); + + for (left, right) in &outer_join.on { + let projection_expr = if projection_side_is_left { left } else { right } + .clone() + .unalias(); + let repeated_expr = if projection_side_is_left { right } else { left } + .clone() + .unalias(); + + let Expr::Column(projection_column) = projection_expr else { + return None; + }; + let Expr::Column(repeated_column) = repeated_expr else { + return None; + }; + + let mapped = projection_output_map + .get(&projection_column.flat_name()) + .cloned() + .or_else(|| projection_output_map.get(&projection_column.name).cloned())? + .unalias(); + let Expr::Column(reduced_join_column) = mapped else { + return None; + }; + let expected_repeated_column = repeated_by_name.get(&reduced_join_column.name)?; + if *expected_repeated_column != repeated_column + && !Self::join_equates_columns(inner_join, &reduced_join_column, &repeated_column) + { + return None; + } + + repeated_outer_key_columns.push(repeated_column); + } + + Some(repeated_outer_key_columns) + } + + fn is_unique_on(plan: &LogicalPlan, requested_columns: &[Column]) -> bool { + let requested_columns = Self::dedup_columns(requested_columns); + match plan { + LogicalPlan::SubqueryAlias(alias) => { + Self::map_same_schema_columns(&requested_columns, plan, alias.input.as_ref()) + .is_some_and(|mapped| Self::is_unique_on(alias.input.as_ref(), &mapped)) + } + LogicalPlan::Projection(projection) => { + Self::map_projection_columns(projection, plan, &requested_columns) + .is_some_and(|mapped| Self::is_unique_on(projection.input.as_ref(), &mapped)) + } + LogicalPlan::Filter(filter) => { + let mut augmented_columns = requested_columns.clone(); + augmented_columns.extend(Self::constant_filtered_columns(&filter.predicate)); + let augmented_columns = Self::dedup_columns(&augmented_columns); + Self::is_unique_on(filter.input.as_ref(), &augmented_columns) + } + LogicalPlan::Sort(sort) => Self::is_unique_on(sort.input.as_ref(), &requested_columns), + LogicalPlan::Repartition(repartition) => { + Self::is_unique_on(repartition.input.as_ref(), &requested_columns) + } + LogicalPlan::Limit(limit) => { + Self::is_unique_on(limit.input.as_ref(), &requested_columns) + } + LogicalPlan::Aggregate(aggregate) => { + Self::aggregate_is_unique_on(aggregate, &requested_columns) + } + LogicalPlan::Extension(extension) => { + if let Some(merge_scan) = extension + .node + .as_any() + .downcast_ref::() + && let Some(mapped) = + Self::map_same_schema_columns(&requested_columns, plan, merge_scan.input()) + { + return Self::is_unique_on(merge_scan.input(), &mapped); + } + + if extension.node.name() == "SeriesDivide" { + let required = extension + .node + .expressions() + .into_iter() + .filter_map(|expr| expr.unalias().try_as_col().cloned()) + .collect::>(); + return Self::requested_contains_all(&requested_columns, &required); + } + + if extension.node.name() == "InstantManipulate" + && let Some(input) = extension.node.inputs().into_iter().next() + && let Some(mapped) = + Self::map_same_schema_columns(&requested_columns, plan, input) + { + return Self::is_unique_on(input, &mapped); + } + + false + } + LogicalPlan::TableScan(table_scan) => Self::table_scan_unique_key(table_scan) + .is_some_and(|required| { + Self::requested_contains_all(&requested_columns, &required) + }), + _ => false, + } + } + + fn aggregate_is_unique_on(aggregate: &Aggregate, requested_columns: &[Column]) -> bool { + aggregate + .group_expr + .iter() + .map(|expr| expr.clone().unalias()) + .map(|expr| match expr { + Expr::Column(column) => Some(column), + _ => None, + }) + .collect::>>() + .is_some_and(|required| Self::requested_contains_all(requested_columns, &required)) + } + + fn constant_filtered_columns(predicate: &Expr) -> Vec { + let mut columns = Vec::new(); + Self::collect_constant_filtered_columns(&predicate.clone().unalias(), &mut columns); + columns + } + + fn collect_constant_filtered_columns(expr: &Expr, columns: &mut Vec) { + match expr { + Expr::BinaryExpr(binary) if matches!(binary.op, datafusion_expr::Operator::And) => { + Self::collect_constant_filtered_columns(binary.left.as_ref(), columns); + Self::collect_constant_filtered_columns(binary.right.as_ref(), columns); + } + Expr::BinaryExpr(binary) if matches!(binary.op, datafusion_expr::Operator::Eq) => { + match (binary.left.as_ref(), binary.right.as_ref()) { + (Expr::Column(column), Expr::Literal(_, _)) + | (Expr::Literal(_, _), Expr::Column(column)) => { + columns.push(column.clone()); + } + _ => {} + } + } + _ => {} + } + } + + fn same_filter_predicate(left: &Expr, right: &Expr) -> bool { + let mut left_terms = Vec::new(); + let mut right_terms = Vec::new(); + Self::collect_conjunct_terms(&left.clone().unalias(), &mut left_terms); + Self::collect_conjunct_terms(&right.clone().unalias(), &mut right_terms); + left_terms.sort_unstable(); + right_terms.sort_unstable(); + left_terms == right_terms + } + + fn collect_conjunct_terms(expr: &Expr, terms: &mut Vec) { + match expr { + Expr::BinaryExpr(binary) if matches!(binary.op, datafusion_expr::Operator::And) => { + Self::collect_conjunct_terms(binary.left.as_ref(), terms); + Self::collect_conjunct_terms(binary.right.as_ref(), terms); + } + _ => terms.push(expr.to_string()), + } + } + + fn map_projection_columns( + projection: &Projection, + output_plan: &LogicalPlan, + requested_columns: &[Column], + ) -> Option> { + let output_map = output_plan + .schema() + .columns() + .into_iter() + .zip(projection.expr.iter()) + .map(|(output_column, expr)| (output_column.flat_name(), expr.clone().unalias())) + .collect::>(); + + requested_columns + .iter() + .map(|column| { + let expr = output_map.get(&column.flat_name())?.clone(); + match expr { + Expr::Column(inner_column) => Some(inner_column), + _ => None, + } + }) + .collect() + } + + fn map_same_schema_columns( + requested_columns: &[Column], + output_plan: &LogicalPlan, + input_plan: &LogicalPlan, + ) -> Option> { + let output_columns = output_plan.schema().columns(); + let input_columns = input_plan.schema().columns(); + if output_columns.len() != input_columns.len() { + return None; + } + + let replace_map = output_columns + .into_iter() + .zip(input_columns) + .collect::>(); + requested_columns + .iter() + .map(|column| replace_map.get(column).cloned()) + .collect() + } + + fn table_scan_unique_key(table_scan: &TableScan) -> Option> { + let source = table_scan + .source + .as_any() + .downcast_ref::()?; + + if let Some(provider) = source + .table_provider + .as_any() + .downcast_ref::() + { + let metadata = provider.region_metadata(); + let mut key_names = metadata + .primary_key_columns() + .map(|column| column.column_schema.name.clone()) + .collect::>(); + key_names.push(metadata.time_index_column().column_schema.name.clone()); + return Self::schema_columns_by_names( + table_scan.projected_schema.columns(), + &key_names, + ); + } + + if let Some(provider) = source + .table_provider + .as_any() + .downcast_ref::() + { + let table = provider.table(); + let mut key_names = table + .primary_key_columns() + .map(|column| column.name) + .collect::>(); + key_names.push(table.schema().timestamp_column()?.name.clone()); + return Self::schema_columns_by_names( + table_scan.projected_schema.columns(), + &key_names, + ); + } + + None + } + + fn schema_columns_by_names( + schema_columns: Vec, + key_names: &[String], + ) -> Option> { + let by_name = schema_columns + .into_iter() + .map(|column| (column.name.clone(), column)) + .collect::>(); + key_names + .iter() + .map(|name| by_name.get(name).cloned()) + .collect() + } + + fn requested_contains_all(requested_columns: &[Column], required_columns: &[Column]) -> bool { + let requested_flat = requested_columns + .iter() + .map(Column::flat_name) + .collect::>(); + let requested_names = requested_columns + .iter() + .map(|column| column.name.clone()) + .collect::>(); + required_columns.iter().all(|column| { + requested_flat.contains(&column.flat_name()) || requested_names.contains(&column.name) + }) + } + + fn replace_columns_by_name( + expr: Expr, + replace_map: &HashMap, + ) -> DataFusionResult { + expr.transform_up(|expr| { + Ok(if let Expr::Column(column) = &expr { + if let Some(replacement) = replace_map.get(&column.flat_name()) { + Transformed::yes(replacement.clone()) + } else { + Transformed::no(expr) + } + } else { + Transformed::no(expr) + }) + }) + .data() + } + + fn column_name_counts(columns: Vec) -> HashMap { + let mut counts = HashMap::with_capacity(columns.len()); + for column in columns { + *counts.entry(column.name).or_insert(0) += 1; + } + counts + } + + fn dedup_columns(columns: &[Column]) -> Vec { + let mut unique = HashMap::with_capacity(columns.len()); + for column in columns { + unique + .entry(column.flat_name()) + .or_insert_with(|| column.clone()); + } + unique.into_values().collect() + } + + fn same_schema_types(left: &DFSchema, right: &DFSchema) -> bool { + left.fields().len() == right.fields().len() + && left + .fields() + .iter() + .zip(right.fields().iter()) + .all(|(left, right)| { + left.data_type() == right.data_type() + && left.is_nullable() == right.is_nullable() + }) + } + + fn join_equates_columns(join: &Join, left: &Column, right: &Column) -> bool { + join.on.iter().any(|(join_left, join_right)| { + matches!( + (join_left.clone().unalias(), join_right.clone().unalias()), + (Expr::Column(join_left), Expr::Column(join_right)) + if (join_left == *left && join_right == *right) + || (join_left == *right && join_right == *left) + ) + }) + } + + fn preserve_projection_output_names(exprs: Vec, schema: &DFSchema) -> Vec { + exprs + .into_iter() + .zip(schema.fields().iter()) + .map(|(expr, field)| { + if matches!(expr.clone().unalias(), Expr::Column(_)) { + return expr; + } + + let output_name = field.name(); + if expr.schema_name().to_string() == *output_name { + expr + } else { + Expr::Alias(datafusion_expr::expr::Alias::new( + expr, + None::, + output_name.clone(), + )) + } + }) + .collect() + } + + fn preserve_projection_output_columns( + exprs: Vec, + projection: &Projection, + input: &LogicalPlan, + ) -> Vec { + let original_output_columns = projection.schema.columns(); + exprs + .into_iter() + .zip(projection.expr.iter()) + .zip(original_output_columns) + .map(|((expr, original_expr), output_column)| { + if matches!(original_expr.clone().unalias(), Expr::Column(_)) + && input.schema().has_column(&output_column) + { + Expr::Column(output_column) + } else { + expr + } + }) + .collect() + } +} + +trait LogicalPlanExt { + fn as_join(&self) -> Option<&Join>; + fn as_projection(&self) -> Option<&Projection>; +} + +impl LogicalPlanExt for LogicalPlan { + fn as_join(&self) -> Option<&Join> { + match self { + LogicalPlan::Join(join) => Some(join), + _ => None, + } + } + + fn as_projection(&self) -> Option<&Projection> { + match self { + LogicalPlan::Projection(projection) => Some(projection), + _ => None, + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datafusion::datasource::provider_as_source; + use datafusion_common::NullEquality; + use datafusion_expr::expr::{Alias, BinaryExpr}; + use datafusion_expr::{JoinConstraint, LogicalPlanBuilder, Operator, col, lit}; + use datafusion_optimizer::OptimizerContext; + + use super::*; + use crate::dist_plan::MergeScanLogicalPlan; + use crate::optimizer::test_util::{mock_table_provider, mock_table_provider_with_tsid}; + + fn optimize(plan: LogicalPlan) -> LogicalPlan { + JoinReduceRule + .rewrite(plan, &OptimizerContext::new()) + .unwrap() + .data + } + + fn build_scan(table_name: &str, with_tsid: bool) -> LogicalPlan { + let provider = if with_tsid { + Arc::new(mock_table_provider_with_tsid(1.into())) as _ + } else { + Arc::new(mock_table_provider(1.into())) as _ + }; + + LogicalPlanBuilder::scan(table_name, provider_as_source(provider), None) + .unwrap() + .build() + .unwrap() + } + + fn alias_plan(plan: LogicalPlan, alias: &str) -> LogicalPlan { + LogicalPlanBuilder::from(plan) + .alias(alias) + .unwrap() + .build() + .unwrap() + } + + fn merge_scan(plan: LogicalPlan) -> LogicalPlan { + MergeScanLogicalPlan::new(plan, false, Default::default()).into_logical_plan() + } + + fn join_on( + left: LogicalPlan, + right: LogicalPlan, + left_keys: &[&str], + right_keys: &[&str], + ) -> LogicalPlan { + let on = left_keys + .iter() + .zip(right_keys.iter()) + .map(|(left_key, right_key)| (col(*left_key), col(*right_key))) + .collect::>(); + LogicalPlan::Join( + Join::try_new( + Arc::new(left), + Arc::new(right), + on, + None, + JoinType::Inner, + JoinConstraint::On, + NullEquality::NullEqualsNull, + false, + ) + .unwrap(), + ) + } + + #[test] + fn reduces_redundant_join_with_unique_scan_key() { + let repeated = alias_plan(build_scan("a", false), "a"); + let other = alias_plan(build_scan("b", false), "b"); + let inner_join = join_on( + repeated.clone(), + other, + &["a.k0", "a.ts"], + &["b.k0", "b.ts"], + ); + let inner_projection = LogicalPlan::Projection( + Projection::try_new( + vec![ + col("a.k0"), + col("a.ts"), + Expr::Alias(Alias::new( + Expr::BinaryExpr(BinaryExpr { + left: Box::new(col("a.v0")), + op: Operator::Minus, + right: Box::new(col("b.v0")), + }), + None::, + "delta".to_string(), + )), + ], + Arc::new(inner_join), + ) + .unwrap(), + ); + let outer_join = join_on( + alias_plan(inner_projection, "delta"), + repeated, + &["delta.k0", "delta.ts"], + &["a.k0", "a.ts"], + ); + let outer_projection = LogicalPlan::Projection( + Projection::try_new( + vec![ + col("a.k0"), + col("a.ts"), + Expr::Alias(Alias::new( + Expr::BinaryExpr(BinaryExpr { + left: Box::new(col("delta")), + op: Operator::Divide, + right: Box::new(col("a.v0")), + }), + None::, + "ratio".to_string(), + )), + ], + Arc::new(outer_join), + ) + .unwrap(), + ); + + let optimized = optimize(outer_projection); + let formatted = optimized.display_indent_schema().to_string(); + assert_eq!(formatted.matches("Inner Join").count(), 1, "{formatted}"); + assert!(formatted.contains("ratio"), "{formatted}"); + } + + #[test] + fn keeps_outer_join_when_repeated_side_is_not_unique() { + let repeated = alias_plan(build_scan("a", false), "a"); + let other = alias_plan(build_scan("b", false), "b"); + let inner_join = join_on(repeated.clone(), other, &["a.k0"], &["b.k0"]); + let inner_projection = LogicalPlan::Projection( + Projection::try_new( + vec![ + col("a.k0"), + Expr::Alias(Alias::new( + Expr::BinaryExpr(BinaryExpr { + left: Box::new(col("a.v0")), + op: Operator::Minus, + right: Box::new(col("b.v0")), + }), + None::, + "delta".to_string(), + )), + ], + Arc::new(inner_join), + ) + .unwrap(), + ); + let outer_join = join_on( + alias_plan(inner_projection, "delta"), + repeated, + &["delta.k0"], + &["a.k0"], + ); + let outer_projection = LogicalPlan::Projection( + Projection::try_new(vec![col("delta"), col("a.v0")], Arc::new(outer_join)).unwrap(), + ); + + let optimized = optimize(outer_projection); + let formatted = optimized.display_indent_schema().to_string(); + assert_eq!(formatted.matches("Inner Join").count(), 2, "{formatted}"); + } + + #[test] + fn reduces_join_when_projection_uses_other_side_keys() { + let repeated = alias_plan(build_scan("a", false), "a"); + let other = alias_plan(build_scan("b", false), "b"); + let inner_join = join_on( + repeated.clone(), + other, + &["a.k0", "a.ts"], + &["b.k0", "b.ts"], + ); + let inner_projection = LogicalPlan::Projection( + Projection::try_new( + vec![ + col("b.k0"), + col("b.ts"), + Expr::Alias(Alias::new( + Expr::BinaryExpr(BinaryExpr { + left: Box::new(col("a.v0")), + op: Operator::Minus, + right: Box::new(col("b.v0")), + }), + None::, + "delta".to_string(), + )), + ], + Arc::new(inner_join), + ) + .unwrap(), + ); + let outer_join = join_on( + alias_plan(inner_projection, "delta"), + repeated, + &["delta.k0", "delta.ts"], + &["a.k0", "a.ts"], + ); + let outer_projection = LogicalPlan::Projection( + Projection::try_new( + vec![ + col("a.k0"), + col("a.ts"), + Expr::Alias(Alias::new( + Expr::BinaryExpr(BinaryExpr { + left: Box::new(col("delta")), + op: Operator::Divide, + right: Box::new(col("a.v0")), + }), + None::, + "ratio".to_string(), + )), + ], + Arc::new(outer_join), + ) + .unwrap(), + ); + + let optimized = optimize(outer_projection); + let formatted = optimized.display_indent_schema().to_string(); + assert_eq!(formatted.matches("Inner Join").count(), 1, "{formatted}"); + } + + #[test] + fn reduces_join_when_repeated_filters_reorder_conjuncts() { + let repeated = alias_plan(build_scan("a", false), "a"); + let repeated_left = LogicalPlanBuilder::from(repeated.clone()) + .filter(col("a.k0").eq(lit("x")).and(col("a.ts").eq(lit(1_i64)))) + .unwrap() + .build() + .unwrap(); + let repeated_right = LogicalPlanBuilder::from(repeated) + .filter(col("a.ts").eq(lit(1_i64)).and(col("a.k0").eq(lit("x")))) + .unwrap() + .build() + .unwrap(); + let other = alias_plan(build_scan("b", false), "b"); + let inner_join = join_on(repeated_left, other, &["a.k0", "a.ts"], &["b.k0", "b.ts"]); + let inner_projection = LogicalPlan::Projection( + Projection::try_new( + vec![ + col("b.k0"), + col("b.ts"), + Expr::Alias(Alias::new( + Expr::BinaryExpr(BinaryExpr { + left: Box::new(col("a.v0")), + op: Operator::Minus, + right: Box::new(col("b.v0")), + }), + None::, + "delta".to_string(), + )), + ], + Arc::new(inner_join), + ) + .unwrap(), + ); + let outer_join = join_on( + alias_plan(inner_projection, "delta"), + repeated_right, + &["delta.k0", "delta.ts"], + &["a.k0", "a.ts"], + ); + let outer_projection = LogicalPlan::Projection( + Projection::try_new(vec![col("delta"), col("a.v0")], Arc::new(outer_join)).unwrap(), + ); + + let optimized = optimize(outer_projection); + let formatted = optimized.display_indent_schema().to_string(); + assert_eq!(formatted.matches("Inner Join").count(), 1, "{formatted}"); + } + + #[test] + fn reduces_join_through_merge_scan_with_reordered_filters() { + let repeated = alias_plan(build_scan("a", false), "a"); + let repeated_left = merge_scan( + LogicalPlanBuilder::from(repeated.clone()) + .filter(col("a.k0").eq(lit("x")).and(col("a.ts").eq(lit(1_i64)))) + .unwrap() + .build() + .unwrap(), + ); + let repeated_right = merge_scan( + LogicalPlanBuilder::from(repeated) + .filter(col("a.ts").eq(lit(1_i64)).and(col("a.k0").eq(lit("x")))) + .unwrap() + .build() + .unwrap(), + ); + let other = alias_plan(build_scan("b", false), "b"); + let inner_join = join_on(repeated_left, other, &["a.k0", "a.ts"], &["b.k0", "b.ts"]); + let inner_projection = LogicalPlan::Projection( + Projection::try_new( + vec![ + col("a.k0"), + col("a.ts"), + Expr::Alias(Alias::new( + Expr::BinaryExpr(BinaryExpr { + left: Box::new(col("a.v0")), + op: Operator::Minus, + right: Box::new(col("b.v0")), + }), + None::, + "delta".to_string(), + )), + ], + Arc::new(inner_join), + ) + .unwrap(), + ); + let outer_join = join_on( + alias_plan(inner_projection, "delta"), + repeated_right, + &["delta.k0", "delta.ts"], + &["a.k0", "a.ts"], + ); + let outer_projection = LogicalPlan::Projection( + Projection::try_new(vec![col("delta"), col("a.v0")], Arc::new(outer_join)).unwrap(), + ); + + let optimized = optimize(outer_projection); + let formatted = optimized.display_indent_schema().to_string(); + assert_eq!(formatted.matches("Inner Join").count(), 1, "{formatted}"); + } + + #[test] + fn reduces_join_through_merge_scan_wrappers() { + let repeated = alias_plan(build_scan("a", false), "a"); + let repeated_left = merge_scan( + LogicalPlanBuilder::from(repeated.clone()) + .filter(col("a.k0").eq(lit("x")).and(col("a.ts").eq(lit(1_i64)))) + .unwrap() + .build() + .unwrap(), + ); + let repeated_right = merge_scan( + LogicalPlanBuilder::from(repeated) + .filter(col("a.ts").eq(lit(1_i64)).and(col("a.k0").eq(lit("x")))) + .unwrap() + .build() + .unwrap(), + ); + let repeated_left = LogicalPlanBuilder::from(repeated_left) + .project(vec![col("a.v0"), col("a.k0"), col("a.ts")]) + .unwrap() + .build() + .unwrap(); + let other = merge_scan(alias_plan(build_scan("b", false), "b")); + let other = LogicalPlanBuilder::from(other) + .project(vec![col("b.v0"), col("b.k0"), col("b.ts")]) + .unwrap() + .build() + .unwrap(); + + let inner_join = join_on(repeated_left, other, &["a.k0", "a.ts"], &["b.k0", "b.ts"]); + let inner_projection = LogicalPlan::Projection( + Projection::try_new( + vec![ + col("b.k0"), + col("b.ts"), + Expr::Alias(Alias::new( + Expr::BinaryExpr(BinaryExpr { + left: Box::new(col("a.v0")), + op: Operator::Minus, + right: Box::new(col("b.v0")), + }), + None::, + "delta".to_string(), + )), + ], + Arc::new(inner_join), + ) + .unwrap(), + ); + let outer_join = join_on( + alias_plan(inner_projection, "delta"), + repeated_right, + &["delta.k0", "delta.ts"], + &["a.k0", "a.ts"], + ); + let outer_projection = LogicalPlan::Projection( + Projection::try_new(vec![col("delta"), col("a.v0")], Arc::new(outer_join)).unwrap(), + ); + + let optimized = optimize(outer_projection); + let formatted = optimized.display_indent_schema().to_string(); + assert_eq!(formatted.matches("Inner Join").count(), 1, "{formatted}"); + } + + #[test] + fn recognizes_series_divide_uniqueness() { + let repeated = alias_plan(build_scan("metric", true), "metric"); + let divide = LogicalPlan::Extension(datafusion_expr::Extension { + node: Arc::new(promql::extension_plan::SeriesDivide::new( + vec!["__tsid".to_string()], + "ts".to_string(), + repeated.clone(), + )), + }); + let other = alias_plan(build_scan("other", true), "other"); + let inner_join = join_on( + divide.clone(), + other, + &["metric.__tsid", "metric.ts"], + &["other.__tsid", "other.ts"], + ); + let inner_projection = LogicalPlan::Projection( + Projection::try_new( + vec![ + col("metric.__tsid"), + col("metric.ts"), + Expr::Alias(Alias::new( + Expr::BinaryExpr(BinaryExpr { + left: Box::new(col("metric.v0")), + op: Operator::Minus, + right: Box::new(col("other.v0")), + }), + None::, + "delta".to_string(), + )), + ], + Arc::new(inner_join), + ) + .unwrap(), + ); + let outer_join = join_on( + alias_plan(inner_projection, "delta"), + divide, + &["delta.__tsid", "delta.ts"], + &["metric.__tsid", "metric.ts"], + ); + let outer_projection = LogicalPlan::Projection( + Projection::try_new(vec![col("delta"), col("metric.v0")], Arc::new(outer_join)) + .unwrap(), + ); + + let optimized = optimize(outer_projection); + let formatted = optimized.display_indent_schema().to_string(); + assert_eq!(formatted.matches("Inner Join").count(), 1, "{formatted}"); + } +} diff --git a/src/query/src/optimizer/pass_distribution.rs b/src/query/src/optimizer/pass_distribution.rs index 7ce2ffd752..23d93b2a8e 100644 --- a/src/query/src/optimizer/pass_distribution.rs +++ b/src/query/src/optimizer/pass_distribution.rs @@ -17,8 +17,10 @@ use std::sync::Arc; use datafusion::config::ConfigOptions; use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::projection::ProjectionExec; use datafusion_common::Result as DfResult; use datafusion_physical_expr::Distribution; +use datafusion_physical_expr::utils::map_columns_before_projection; use crate::dist_plan::MergeScanExec; @@ -83,7 +85,9 @@ impl PassDistribution { let mut new_children = Vec::with_capacity(children.len()); for (idx, child) in children.into_iter().enumerate() { let child_req = match required.get(idx) { - Some(Distribution::UnspecifiedDistribution) => None, + Some(Distribution::UnspecifiedDistribution) => { + Self::propagate_unspecified_child_requirement(plan.as_ref(), idx, ¤t_req) + } None => current_req.clone(), Some(req) => Some(req.clone()), }; @@ -103,4 +107,199 @@ impl PassDistribution { plan.with_new_children(new_children) } } + + fn propagate_unspecified_child_requirement( + plan: &dyn ExecutionPlan, + idx: usize, + current_req: &Option, + ) -> Option { + if idx != 0 { + return None; + } + + let Some(Distribution::HashPartitioned(required_exprs)) = current_req else { + return None; + }; + + let projection = plan.as_any().downcast_ref::()?; + let proj_exprs = projection + .expr() + .iter() + .map(|expr| (Arc::clone(&expr.expr), expr.alias.clone())) + .collect::>(); + let mapped = map_columns_before_projection(required_exprs, &proj_exprs); + + (mapped.len() == required_exprs.len()).then_some(Distribution::HashPartitioned(mapped)) + } +} + +#[cfg(test)] +mod tests { + use std::collections::{BTreeMap, BTreeSet}; + + use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit}; + use async_trait::async_trait; + use common_query::request::QueryRequest; + use common_recordbatch::SendableRecordBatchStream; + use datafusion::common::NullEquality; + use datafusion::execution::SessionStateBuilder; + use datafusion::physical_optimizer::PhysicalOptimizerRule; + use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode}; + use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr}; + use datafusion::physical_plan::{ExecutionPlanProperties, Partitioning}; + use datafusion_expr::{JoinType, LogicalPlanBuilder}; + use datafusion_physical_expr::PhysicalExpr; + use datafusion_physical_expr::expressions::Column as PhysicalColumn; + use session::ReadPreference; + use session::context::QueryContext; + use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME; + use store_api::storage::RegionId; + use table::table_name::TableName; + + use super::*; + use crate::error::Result as QueryResult; + use crate::region_query::RegionQueryHandler; + + struct NoopRegionQueryHandler; + + #[async_trait] + impl RegionQueryHandler for NoopRegionQueryHandler { + async fn do_get( + &self, + _read_preference: ReadPreference, + _request: QueryRequest, + ) -> QueryResult { + unreachable!("pass distribution tests should not execute remote queries") + } + } + + #[test] + fn passes_hash_requirement_through_projection_to_merge_scan() { + let schema = test_schema(); + let left_merge_scan = Arc::new(test_merge_scan_exec(schema.clone())); + let right_merge_scan = Arc::new(test_merge_scan_exec(schema.clone())); + let left_projection = Arc::new( + ProjectionExec::try_new( + vec![ + ProjectionExpr::new(partition_column("greptime_value", 3), "greptime_value"), + ProjectionExpr::new( + partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1), + DATA_SCHEMA_TSID_COLUMN_NAME, + ), + ProjectionExpr::new( + partition_column("greptime_timestamp", 2), + "greptime_timestamp", + ), + ], + left_merge_scan, + ) + .unwrap(), + ) as Arc; + let join = Arc::new( + HashJoinExec::try_new( + left_projection, + right_merge_scan, + vec![ + ( + partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1), + partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1), + ), + ( + partition_column("greptime_timestamp", 2), + partition_column("greptime_timestamp", 2), + ), + ], + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + NullEquality::NullEqualsNull, + false, + ) + .unwrap(), + ) as Arc; + + let optimized = PassDistribution + .optimize(join, &ConfigOptions::default()) + .unwrap(); + let hash_join = optimized.as_any().downcast_ref::().unwrap(); + let left_projection = hash_join + .left() + .as_any() + .downcast_ref::() + .unwrap(); + let left_partitioning = left_projection.input().output_partitioning(); + let right_partitioning = hash_join.right().output_partitioning(); + + let Partitioning::Hash(left_exprs, left_count) = left_partitioning else { + panic!("expected left merge scan hash partitioning"); + }; + let Partitioning::Hash(right_exprs, right_count) = right_partitioning else { + panic!("expected right merge scan hash partitioning"); + }; + + assert_eq!(*left_count, 32); + assert_eq!(*right_count, 32); + assert_eq!( + column_names(left_exprs), + vec![DATA_SCHEMA_TSID_COLUMN_NAME, "greptime_timestamp"] + ); + assert_eq!( + column_names(right_exprs), + vec![DATA_SCHEMA_TSID_COLUMN_NAME, "greptime_timestamp"] + ); + } + + fn test_merge_scan_exec(schema: SchemaRef) -> MergeScanExec { + let session_state = SessionStateBuilder::new().with_default_features().build(); + let partition_cols = BTreeMap::from([( + DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), + BTreeSet::from([datafusion_common::Column::from_name( + DATA_SCHEMA_TSID_COLUMN_NAME, + )]), + )]); + let plan = LogicalPlanBuilder::empty(false).build().unwrap(); + + MergeScanExec::new( + &session_state, + TableName::new("greptime", "public", "test"), + vec![RegionId::new(1, 0), RegionId::new(1, 1)], + plan, + schema.as_ref(), + Arc::new(NoopRegionQueryHandler), + QueryContext::arc(), + 32, + partition_cols, + ) + .unwrap() + } + + fn test_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("host", DataType::Utf8, true), + Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false), + Field::new( + "greptime_timestamp", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new("greptime_value", DataType::Float64, true), + ])) + } + + fn partition_column(name: &str, index: usize) -> Arc { + Arc::new(PhysicalColumn::new(name, index)) + } + + fn column_names(exprs: &[Arc]) -> Vec<&str> { + exprs + .iter() + .map(|expr| { + expr.as_any() + .downcast_ref::() + .unwrap() + .name() + }) + .collect() + } } diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index e7a85d9670..0d2250cdbe 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -203,14 +203,6 @@ pub struct PromPlanner { ctx: PromPlannerContext, } -struct RepeatedBinaryPattern<'a> { - repeated: &'a PromExpr, - other: &'a PromExpr, - inner: &'a PromBinaryExpr, - inner_repeated_on_left: bool, - outer_inner_on_left: bool, -} - impl PromPlanner { pub async fn stmt_to_plan( table_provider: DfTableSourceProvider, @@ -670,15 +662,6 @@ impl PromPlanner { } // both are columns. join them on time index (None, None) => { - if !is_comparison_op - && !Self::is_token_a_set_op(*op) - && let Some(plan) = self - .try_plan_collapsed_repeated_binary_expr(query_engine_state, binary_expr) - .await? - { - return Ok(plan); - } - let left_input = self.prom_expr_to_plan(lhs, query_engine_state).await?; let left_field_columns = self.ctx.field_columns.clone(); let left_time_index_column = self.ctx.time_index_column.clone(); @@ -799,123 +782,6 @@ impl PromPlanner { } } - async fn try_plan_collapsed_repeated_binary_expr( - &mut self, - query_engine_state: &QueryEngineState, - binary_expr: &PromBinaryExpr, - ) -> Result> { - let PromBinaryExpr { - lhs, - rhs, - op, - modifier, - } = binary_expr; - let Some(pattern) = Self::find_repeated_binary_pattern(binary_expr) else { - return Ok(None); - }; - - if modifier.is_some() - || pattern.inner.modifier.is_some() - || Self::is_token_a_set_op(*op) - || Self::is_token_a_set_op(pattern.inner.op) - || Self::is_token_a_comparison_op(*op) - || Self::is_token_a_comparison_op(pattern.inner.op) - || lhs.value_type() != ValueType::Vector - || rhs.value_type() != ValueType::Vector - || pattern.repeated.value_type() != ValueType::Vector - || pattern.other.value_type() != ValueType::Vector - { - return Ok(None); - } - - let repeated_input = self - .prom_expr_to_plan(pattern.repeated, query_engine_state) - .await?; - let repeated_field_columns = self.ctx.field_columns.clone(); - let repeated_time_index_column = self.ctx.time_index_column.clone(); - let mut repeated_table_ref = self - .table_ref() - .unwrap_or_else(|_| TableReference::bare("")); - let repeated_context = self.ctx.clone(); - - let other_input = self - .prom_expr_to_plan(pattern.other, query_engine_state) - .await?; - let other_field_columns = self.ctx.field_columns.clone(); - let other_time_index_column = self.ctx.time_index_column.clone(); - let mut other_table_ref = self - .table_ref() - .unwrap_or_else(|_| TableReference::bare("")); - let other_context = self.ctx.clone(); - - if repeated_table_ref == other_table_ref { - repeated_table_ref = TableReference::bare("lhs"); - other_table_ref = TableReference::bare("rhs"); - if self.ctx.tag_columns.is_empty() { - self.ctx = repeated_context.clone(); - self.ctx.table_name = Some("lhs".to_string()); - } else { - self.ctx.table_name = Some("rhs".to_string()); - } - } - - let field_columns = repeated_field_columns - .iter() - .zip(other_field_columns.iter()) - .collect::>(); - // The collapsed fast path must preserve the same zipped-field semantics as the - // original two-step plan: only the shared prefix of value columns participates. - self.ctx.field_columns = field_columns - .iter() - .map(|(repeated_col_name, _)| (*repeated_col_name).clone()) - .collect(); - let mut field_columns = field_columns.into_iter(); - - let join_plan = self.join_on_non_field_columns( - repeated_input, - other_input, - repeated_table_ref.clone(), - other_table_ref.clone(), - repeated_time_index_column, - other_time_index_column, - repeated_context.tag_columns.is_empty() || other_context.tag_columns.is_empty(), - true, - &None, - )?; - let join_plan_schema = join_plan.schema().clone(); - - let field_expr_builder = |_: &String| { - let (repeated_col_name, other_col_name) = field_columns.next().unwrap(); - let repeated_col = join_plan_schema - .qualified_field_with_name(Some(&repeated_table_ref), repeated_col_name) - .context(DataFusionPlanningSnafu)? - .into(); - let other_col = join_plan_schema - .qualified_field_with_name(Some(&other_table_ref), other_col_name) - .context(DataFusionPlanningSnafu)? - .into(); - let repeated_expr = DfExpr::Column(repeated_col); - let other_expr = DfExpr::Column(other_col); - - let inner_expr_builder = Self::prom_token_to_binary_expr_builder(pattern.inner.op)?; - let inner_expr = if pattern.inner_repeated_on_left { - inner_expr_builder(repeated_expr.clone(), other_expr)? - } else { - inner_expr_builder(other_expr, repeated_expr.clone())? - }; - - let outer_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?; - if pattern.outer_inner_on_left { - outer_expr_builder(inner_expr, repeated_expr) - } else { - outer_expr_builder(repeated_expr, inner_expr) - } - }; - - self.projection_for_each_field_column(join_plan, field_expr_builder) - .map(Some) - } - fn project_binary_join_side( &mut self, input: LogicalPlan, @@ -3567,69 +3433,6 @@ impl PromPlanner { .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME) } - fn prom_expr_eq(left: &PromExpr, right: &PromExpr) -> bool { - Self::strip_paren_expr(left).to_string() == Self::strip_paren_expr(right).to_string() - } - - fn strip_paren_expr(mut expr: &PromExpr) -> &PromExpr { - while let PromExpr::Paren(paren) = expr { - expr = paren.expr.as_ref(); - } - expr - } - - fn find_repeated_binary_pattern<'a>( - binary_expr: &'a PromBinaryExpr, - ) -> Option> { - let PromBinaryExpr { lhs, rhs, .. } = binary_expr; - let lhs = Self::strip_paren_expr(lhs.as_ref()); - let rhs = Self::strip_paren_expr(rhs.as_ref()); - - if let PromExpr::Binary(inner) = lhs { - if Self::prom_expr_eq(inner.lhs.as_ref(), rhs) { - return Some(RepeatedBinaryPattern { - repeated: rhs, - other: inner.rhs.as_ref(), - inner, - inner_repeated_on_left: true, - outer_inner_on_left: true, - }); - } - if Self::prom_expr_eq(inner.rhs.as_ref(), rhs) { - return Some(RepeatedBinaryPattern { - repeated: rhs, - other: inner.lhs.as_ref(), - inner, - inner_repeated_on_left: false, - outer_inner_on_left: true, - }); - } - } - - if let PromExpr::Binary(inner) = rhs { - if Self::prom_expr_eq(inner.lhs.as_ref(), lhs) { - return Some(RepeatedBinaryPattern { - repeated: lhs, - other: inner.rhs.as_ref(), - inner, - inner_repeated_on_left: true, - outer_inner_on_left: false, - }); - } - if Self::prom_expr_eq(inner.rhs.as_ref(), lhs) { - return Some(RepeatedBinaryPattern { - repeated: lhs, - other: inner.lhs.as_ref(), - inner, - inner_repeated_on_left: false, - outer_inner_on_left: false, - }); - } - } - - None - } - /// Build a inner join on time index column and tag columns to concat two logical plans. /// When `only_join_time_index == true` we only join on the time index, because these two plan may not have the same tag columns #[allow(clippy::too_many_arguments)] @@ -5102,7 +4905,7 @@ mod test { } #[tokio::test] - async fn repeated_tsid_binary_operand_reuses_single_join() { + async fn repeated_tsid_binary_operand_keeps_tsid_join_keys() { let prom_expr = parser::parse("((some_metric - some_alt_metric) / some_metric) * 100").unwrap(); let eval_stmt = EvalStmt { @@ -5133,12 +4936,12 @@ mod test { .unwrap(); let plan_str = plan.display_indent_schema().to_string(); - assert_eq!(plan_str.matches("__tsid =").count(), 1, "{plan_str}"); + assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}"); assert!(!plan_str.contains("tag_0 ="), "{plan_str}"); } #[tokio::test] - async fn repeated_tsid_binary_operand_uses_shorter_field_side() { + async fn repeated_tsid_binary_operand_keeps_shorter_field_side() { let prom_expr = parser::parse("((two_field_metric - one_field_metric) / one_field_metric) * 100") .unwrap(); @@ -5190,6 +4993,9 @@ mod test { }) .count(); assert_eq!(value_columns, 1, "{field_names:?}"); + let plan_str = plan.display_indent_schema().to_string(); + assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}"); + assert!(!plan_str.contains("tag_0 ="), "{plan_str}"); } #[tokio::test] diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index f696c8b53e..d216d99872 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -62,6 +62,7 @@ use crate::optimizer::ExtensionAnalyzerRule; use crate::optimizer::constant_term::MatchesConstantTermOptimizer; use crate::optimizer::count_nest_aggr::CountNestAggrRule; use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule; +use crate::optimizer::join_reduce::JoinReduceRule; use crate::optimizer::parallelize_scan::ParallelizeScan; use crate::optimizer::pass_distribution::PassDistribution; use crate::optimizer::remove_duplicate::RemoveDuplicate; @@ -173,6 +174,10 @@ impl QueryEngineState { analyzer.rules.push(Arc::new(FixStateUdafOrderingAnalyzer)); let mut optimizer = Optimizer::new(); + let join_reduce_insert_at = optimizer.rules.len().saturating_sub(1); + optimizer + .rules + .insert(join_reduce_insert_at, Arc::new(JoinReduceRule)); optimizer.rules.push(Arc::new(ScanHintRule)); // add physical optimizer diff --git a/tests/cases/standalone/common/promql/set_operation.result b/tests/cases/standalone/common/promql/set_operation.result index 83af6fc875..5bb1d04d8b 100644 --- a/tests/cases/standalone/common/promql/set_operation.result +++ b/tests/cases/standalone/common/promql/set_operation.result @@ -616,14 +616,14 @@ Affected Rows: 4 -- SQLNESS SORT_RESULT 3 1 tql eval (3, 4, '1s') cache_hit / (cache_miss + cache_hit); -+-------+---------------------+---------------------------------------------------------------------------------+ -| job | ts | cache_hit.greptime_value / cache_miss.greptime_value + cache_hit.greptime_value | -+-------+---------------------+---------------------------------------------------------------------------------+ -| read | 1970-01-01T00:00:03 | 0.5 | -| read | 1970-01-01T00:00:04 | 0.75 | -| write | 1970-01-01T00:00:03 | 0.5 | -| write | 1970-01-01T00:00:04 | 0.6666666666666666 | -+-------+---------------------+---------------------------------------------------------------------------------+ ++-------+---------------------+-------------------------------------------------------------------------------+ +| job | ts | lhs.greptime_value / rhs.cache_miss.greptime_value + cache_hit.greptime_value | ++-------+---------------------+-------------------------------------------------------------------------------+ +| read | 1970-01-01T00:00:03 | 0.5 | +| read | 1970-01-01T00:00:04 | 0.75 | +| write | 1970-01-01T00:00:03 | 0.5 | +| write | 1970-01-01T00:00:04 | 0.6666666666666666 | ++-------+---------------------+-------------------------------------------------------------------------------+ drop table cache_hit; @@ -672,14 +672,14 @@ Affected Rows: 4 -- SQLNESS SORT_RESULT 3 1 tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label); -+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------------------------+ -| job | null_label | ts | cache_hit_with_null_label.greptime_value / cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value | -+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------------------------+ -| read | | 1970-01-01T00:00:03 | 0.5 | -| read | | 1970-01-01T00:00:04 | 0.75 | -| write | | 1970-01-01T00:00:03 | 0.5 | -| write | | 1970-01-01T00:00:04 | 0.6666666666666666 | -+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------------------------+ ++-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+ +| job | null_label | ts | lhs.greptime_value / rhs.cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value | ++-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+ +| read | | 1970-01-01T00:00:03 | 0.5 | +| read | | 1970-01-01T00:00:04 | 0.75 | +| write | | 1970-01-01T00:00:03 | 0.5 | +| write | | 1970-01-01T00:00:04 | 0.6666666666666666 | ++-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+ -- SQLNESS SORT_RESULT 3 1 tql eval (3, 4, '1s') cache_hit_with_null_label / ignoring(null_label) (cache_miss_with_null_label + ignoring(null_label) cache_hit_with_null_label); diff --git a/tests/cases/standalone/common/tql-explain-analyze/explain.result b/tests/cases/standalone/common/tql-explain-analyze/explain.result index 1e4cf18b40..75f38d22e4 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/explain.result +++ b/tests/cases/standalone/common/tql-explain-analyze/explain.result @@ -123,6 +123,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test; | logical_plan after common_sub_expression_eliminate_| SAME TEXT AS ABOVE_| | logical_plan after extract_leaf_expressions_| SAME TEXT AS ABOVE_| | logical_plan after push_down_leaf_projections_| SAME TEXT AS ABOVE_| +| logical_plan after JoinReduceRule_| SAME TEXT AS ABOVE_| | logical_plan after optimize_projections_| MergeScan [is_placeholder=false, remote_input=[_| |_| PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_| |_|_PromSeriesDivide: tags=["k"]_| @@ -154,6 +155,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test; | logical_plan after common_sub_expression_eliminate_| SAME TEXT AS ABOVE_| | logical_plan after extract_leaf_expressions_| SAME TEXT AS ABOVE_| | logical_plan after push_down_leaf_projections_| SAME TEXT AS ABOVE_| +| logical_plan after JoinReduceRule_| SAME TEXT AS ABOVE_| | logical_plan after optimize_projections_| SAME TEXT AS ABOVE_| | logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_| | logical_plan_| MergeScan [is_placeholder=false, remote_input=[_| @@ -267,6 +269,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test AS series; | logical_plan after common_sub_expression_eliminate_| SAME TEXT AS ABOVE_| | logical_plan after extract_leaf_expressions_| SAME TEXT AS ABOVE_| | logical_plan after push_down_leaf_projections_| SAME TEXT AS ABOVE_| +| logical_plan after JoinReduceRule_| SAME TEXT AS ABOVE_| | logical_plan after optimize_projections_| MergeScan [is_placeholder=false, remote_input=[_| |_| Projection: test.i AS series, test.k, test.j_| |_|_PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_| @@ -299,6 +302,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test AS series; | logical_plan after common_sub_expression_eliminate_| SAME TEXT AS ABOVE_| | logical_plan after extract_leaf_expressions_| SAME TEXT AS ABOVE_| | logical_plan after push_down_leaf_projections_| SAME TEXT AS ABOVE_| +| logical_plan after JoinReduceRule_| SAME TEXT AS ABOVE_| | logical_plan after optimize_projections_| SAME TEXT AS ABOVE_| | logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_| | logical_plan_| MergeScan [is_placeholder=false, remote_input=[_|