From 6659a932f4371287b87201d28c29df517029783e Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 29 Mar 2026 08:44:05 +0800 Subject: [PATCH] perf: preserve tsid distribution through merge scans --- src/query/src/dist_plan/analyzer.rs | 15 + src/query/src/dist_plan/merge_scan.rs | 464 ++++++++++++++++-- src/query/src/dist_plan/planner.rs | 78 ++- src/query/src/optimizer/pass_distribution.rs | 202 +++++++- .../distributed/flow-tql/tsid_on_phy.result | 52 +- .../tql-explain-analyze/tsid_column.result | 68 +-- 6 files changed, 772 insertions(+), 107 deletions(-) diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs index 6eb8b5eac6..084a29cb5e 100644 --- a/src/query/src/dist_plan/analyzer.rs +++ b/src/query/src/dist_plan/analyzer.rs @@ -27,6 +27,7 @@ use datafusion_expr::utils::expr_to_columns; use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder, Subquery, col as col_fn}; use datafusion_optimizer::analyzer::AnalyzerRule; use promql::extension_plan::SeriesDivide; +use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::metadata::TableType; use table::table::adapter::DfTableProviderAdapter; @@ -509,6 +510,20 @@ impl PlanRewriter { .map(|index| schema.column_name_by_index(index).to_string()) .collect::>(); + // Metric engine scans can project the internal `__tsid` column even though it is + // not part of the logical table schema. Equal `__tsid` values always belong to the + // same series, so carry it as an additional partition key when it is available. + if plan + .schema() + .index_of_column_by_name(None, DATA_SCHEMA_TSID_COLUMN_NAME) + .is_some() + && !partition_cols + .iter() + .any(|col| col == DATA_SCHEMA_TSID_COLUMN_NAME) + { + partition_cols.push(DATA_SCHEMA_TSID_COLUMN_NAME.to_string()); + } + let partition_rules = table.partition_rules(); let exist_phy_part_cols_not_in_logical_table = partition_rules .map(|r| !r.extra_phy_cols_not_in_logical_table.is_empty()) diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 2d32ce16b3..a2df42b4ed 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -21,6 +21,7 @@ use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptio use async_stream::stream; use common_catalog::parse_catalog_and_schema_from_db_string; use common_plugins::GREPTIME_EXEC_READ_COST; +use common_query::prelude::greptime_timestamp; use common_query::request::QueryRequest; use common_recordbatch::adapter::RecordBatchMetrics; use common_telemetry::tracing_context::TracingContext; @@ -37,12 +38,15 @@ use datafusion::physical_plan::{ use datafusion_common::{Column as ColumnExpr, DataFusionError, Result}; use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion_physical_expr::expressions::Column; -use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr}; +use datafusion_physical_expr::{ + Distribution, EquivalenceProperties, LexOrdering, PhysicalSortExpr, +}; use futures_util::StreamExt; use greptime_proto::v1::region::RegionRequestHeader; use meter_core::data::ReadItem; use meter_macros::read_meter; use session::context::QueryContextRef; +use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME; use store_api::storage::RegionId; use table::table_name::TableName; use tokio::time::Instant; @@ -160,12 +164,168 @@ impl std::fmt::Debug for MergeScanExec { } impl MergeScanExec { + fn default_partition_exprs( + session_state: &SessionState, + plan: &LogicalPlan, + partition_cols: &AliasMapping, + ) -> Vec> { + partition_cols + .iter() + .filter_map(|col| Self::partition_expr_for_alias(session_state, plan, col.1.first())) + .collect() + } + + fn partition_expr_for_alias( + session_state: &SessionState, + plan: &LogicalPlan, + column: Option<&ColumnExpr>, + ) -> Option> { + let column = column?; + session_state + .create_physical_expr( + Expr::Column(ColumnExpr::new_unqualified(column.name().to_string())), + plan.schema(), + ) + .ok() + } + + fn prefer_tsid_partition_exprs( + session_state: &SessionState, + plan: &LogicalPlan, + partition_cols: &AliasMapping, + ) -> Option>> { + if !Self::is_promql_tsid_ordered_plan(plan) { + return None; + } + + let tsid_aliases = partition_cols.get(DATA_SCHEMA_TSID_COLUMN_NAME)?; + let tsid_expr = Self::partition_expr_for_alias(session_state, plan, tsid_aliases.first())?; + Some(vec![tsid_expr]) + } + + fn is_promql_tsid_ordered_plan(plan: &LogicalPlan) -> bool { + match plan { + LogicalPlan::Sort(sort) => { + if sort.expr.len() != 2 { + return false; + } + + let [tsid_sort, time_sort] = sort.expr.as_slice() else { + return false; + }; + + Self::is_ascending_nulls_first_sort(tsid_sort, DATA_SCHEMA_TSID_COLUMN_NAME) + && Self::is_ascending_nulls_first_sort(time_sort, greptime_timestamp()) + } + LogicalPlan::Projection(projection) => { + Self::is_promql_tsid_ordered_plan(projection.input.as_ref()) + } + LogicalPlan::Filter(filter) => Self::is_promql_tsid_ordered_plan(filter.input.as_ref()), + LogicalPlan::SubqueryAlias(alias) => { + Self::is_promql_tsid_ordered_plan(alias.input.as_ref()) + } + LogicalPlan::Extension(extension) + if matches!( + extension.node.name(), + "PromInstantManipulate" + | "PromSeriesDivide" + | "PromNormalize" + | "PromScalarCalculate" + | "PromRangeManipulate" + ) => + { + extension + .node + .inputs() + .first() + .is_some_and(|input| Self::is_promql_tsid_ordered_plan(input)) + } + _ => false, + } + } + + fn is_ascending_nulls_first_sort( + sort_expr: &datafusion_expr::expr::Sort, + column: &str, + ) -> bool { + sort_expr.asc + && sort_expr.nulls_first + && matches!( + sort_expr.expr.try_as_col(), + Some(col) if col.name == column + ) + } + + pub(crate) fn logical_sort_ordering( + session_state: &SessionState, + plan: &LogicalPlan, + ) -> Result> { + if Self::is_promql_tsid_ordered_plan(plan) { + let tsid_expr = session_state.create_physical_expr( + Expr::Column(ColumnExpr::new_unqualified( + DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), + )), + plan.schema(), + )?; + let time_expr = session_state.create_physical_expr( + Expr::Column(ColumnExpr::new_unqualified( + greptime_timestamp().to_string(), + )), + plan.schema(), + )?; + return Ok(LexOrdering::new(vec![ + PhysicalSortExpr::new( + tsid_expr, + SortOptions { + descending: false, + nulls_first: true, + }, + ), + PhysicalSortExpr::new( + time_expr, + SortOptions { + descending: false, + nulls_first: true, + }, + ), + ])); + } + + let LogicalPlan::Sort(sort) = plan else { + return Ok(None); + }; + + let lex_ordering = LexOrdering::new( + sort.expr + .iter() + .map(|sort_expr| { + let physical_expr = session_state + .create_physical_expr(sort_expr.expr.clone(), plan.schema())?; + Ok(PhysicalSortExpr::new( + physical_expr, + SortOptions { + descending: !sort_expr.asc, + nulls_first: sort_expr.nulls_first, + }, + )) + }) + .collect::>>()?, + ) + .ok_or_else(|| { + DataFusionError::Internal(format!( + "Sort plan must contain at least one expression: {plan}" + )) + })?; + Ok(Some(lex_ordering)) + } + #[allow(clippy::too_many_arguments)] pub fn new( session_state: &SessionState, table: TableName, regions: Vec, plan: LogicalPlan, + remote_orderings: Vec, arrow_schema: &ArrowSchema, region_query_handler: RegionQueryHandlerRef, query_ctx: QueryContextRef, @@ -184,46 +344,23 @@ impl MergeScanExec { // break the ordering on merging (of MergeScan). // // Otherwise, we need to use the default ordering. - let eq_properties = if let LogicalPlan::Sort(sort) = &plan - && target_partition >= regions.len() - { - let lex_ordering = sort - .expr - .iter() - .map(|sort_expr| { - let physical_expr = session_state - .create_physical_expr(sort_expr.expr.clone(), plan.schema())?; - Ok(PhysicalSortExpr::new( - physical_expr, - SortOptions { - descending: !sort_expr.asc, - nulls_first: sort_expr.nulls_first, - }, - )) - }) - .collect::>>()?; - EquivalenceProperties::new_with_orderings(arrow_schema.clone(), vec![lex_ordering]) + let eq_properties = if target_partition >= regions.len() { + if !remote_orderings.is_empty() { + EquivalenceProperties::new_with_orderings(arrow_schema.clone(), remote_orderings) + } else if let Some(ordering) = Self::logical_sort_ordering(session_state, &plan)? { + EquivalenceProperties::new_with_orderings(arrow_schema.clone(), vec![ordering]) + } else { + EquivalenceProperties::new(arrow_schema.clone()) + } } else { EquivalenceProperties::new(arrow_schema.clone()) }; - let partition_exprs = partition_cols - .iter() - .filter_map(|col| { - if let Some(first_alias) = col.1.first() { - session_state - .create_physical_expr( - Expr::Column(ColumnExpr::new_unqualified( - first_alias.name().to_string(), - )), - plan.schema(), - ) - .ok() - } else { - None - } - }) - .collect(); + let partition_exprs = + Self::prefer_tsid_partition_exprs(session_state, &plan, &partition_cols) + .unwrap_or_else(|| { + Self::default_partition_exprs(session_state, &plan, &partition_cols) + }); let partitioning = Partitioning::Hash(partition_exprs, target_partition); let properties = Arc::new(PlanProperties::new( @@ -421,31 +558,49 @@ impl MergeScanExec { return None; }; - if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning - && curr_dist == &hash_exprs - { - // No need to change the distribution - return None; - } - let all_partition_col_aliases: HashSet<_> = self .partition_cols .values() .flat_map(|aliases| aliases.iter().map(|c| c.name())) .collect(); - let mut overlaps = vec![]; - for expr in &hash_exprs { - if let Some(col_expr) = expr.as_any().downcast_ref::() - && all_partition_col_aliases.contains(col_expr.name()) - { - overlaps.push(expr.clone()); - } + let mut overlaps = hash_exprs + .iter() + .filter(|expr| { + expr.as_any() + .downcast_ref::() + .is_some_and(|col_expr| all_partition_col_aliases.contains(col_expr.name())) + }) + .cloned() + .collect::>(); + + // Metric-engine scans can satisfy any hash distribution that includes `__tsid`. + // Equal requested keys must also share the same `__tsid`, and equal `__tsid` values are + // guaranteed to stay co-located across MergeScan partitions. Advertise the full requested + // distribution so EnforceDistribution can skip redundant reshuffles. + if self + .arrow_schema + .column_with_name(DATA_SCHEMA_TSID_COLUMN_NAME) + .is_some() + && hash_exprs.iter().any(|expr| { + expr.as_any() + .downcast_ref::() + .is_some_and(|col_expr| col_expr.name() == DATA_SCHEMA_TSID_COLUMN_NAME) + }) + { + overlaps = hash_exprs.clone(); } if overlaps.is_empty() { return None; } + if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning + && curr_dist == &overlaps + { + // No need to change the distribution. + return None; + } + Some(Self { table: self.table.clone(), regions: self.regions.clone(), @@ -494,6 +649,213 @@ impl MergeScanExec { } } +#[cfg(test)] +mod tests { + use std::collections::{BTreeMap, BTreeSet}; + + use arrow_schema::{DataType, Field, Schema, TimeUnit}; + use async_trait::async_trait; + use common_query::request::QueryRequest; + use common_recordbatch::SendableRecordBatchStream; + use datafusion::execution::SessionStateBuilder; + use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; + use datafusion_common::ToDFSchema; + use datafusion_expr::{EmptyRelation, LogicalPlan, LogicalPlanBuilder, col}; + use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; + use session::ReadPreference; + use session::context::QueryContext; + + use super::*; + use crate::error::Result as QueryResult; + use crate::region_query::RegionQueryHandler; + + struct NoopRegionQueryHandler; + + #[async_trait] + impl RegionQueryHandler for NoopRegionQueryHandler { + async fn do_get( + &self, + _read_preference: ReadPreference, + _request: QueryRequest, + ) -> QueryResult { + unreachable!("merge scan distribution tests should not execute remote queries") + } + } + + #[test] + fn try_with_new_distribution_satisfies_tsid_hash_requirements() { + let merge_scan = test_merge_scan_exec( + BTreeMap::from([ + ( + "host".to_string(), + BTreeSet::from([ColumnExpr::from_name("host")]), + ), + ( + DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), + BTreeSet::from([ColumnExpr::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)]), + ), + ]), + vec![ + partition_column("host", 0), + partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1), + ], + ); + + let optimized = merge_scan + .try_with_new_distribution(Distribution::HashPartitioned(vec![ + partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1), + partition_column("greptime_timestamp", 2), + ])) + .unwrap(); + + let Partitioning::Hash(ref exprs, partition_count) = optimized.properties.partitioning + else { + panic!("expected hash partitioning"); + }; + assert_eq!(partition_count, 32); + assert_eq!( + column_names(&exprs), + vec![DATA_SCHEMA_TSID_COLUMN_NAME, "greptime_timestamp"] + ); + } + + #[test] + fn try_with_new_distribution_keeps_regular_partition_overlap() { + let merge_scan = test_merge_scan_exec( + BTreeMap::from([( + "host".to_string(), + BTreeSet::from([ColumnExpr::from_name("host")]), + )]), + vec![partition_column("greptime_timestamp", 2)], + ); + + let optimized = merge_scan + .try_with_new_distribution(Distribution::HashPartitioned(vec![ + partition_column("host", 0), + partition_column("greptime_timestamp", 2), + ])) + .unwrap(); + + let Partitioning::Hash(ref exprs, partition_count) = optimized.properties.partitioning + else { + panic!("expected hash partitioning"); + }; + assert_eq!(partition_count, 32); + assert_eq!(column_names(&exprs), vec!["host"]); + } + + #[test] + fn new_prefers_tsid_partitioning_for_promql_tsid_sort() { + let session_state = SessionStateBuilder::new().with_default_features().build(); + let schema = Arc::new(Schema::new(vec![ + Field::new("host", DataType::Utf8, true), + Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false), + Field::new( + greptime_timestamp(), + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new("greptime_value", DataType::Float64, true), + ])); + let partition_cols = BTreeMap::from([ + ( + "host".to_string(), + BTreeSet::from([ColumnExpr::from_name("host")]), + ), + ( + DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), + BTreeSet::from([ColumnExpr::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)]), + ), + ]); + let plan = promql_tsid_sorted_plan(schema.clone()); + + let merge_scan = MergeScanExec::new( + &session_state, + TableName::new("greptime", "public", "test"), + vec![RegionId::new(1, 0), RegionId::new(1, 1)], + plan, + vec![], + schema.as_ref(), + Arc::new(NoopRegionQueryHandler), + QueryContext::arc(), + 32, + partition_cols, + ) + .unwrap(); + + let Partitioning::Hash(ref exprs, partition_count) = merge_scan.properties.partitioning + else { + panic!("expected hash partitioning"); + }; + assert_eq!(partition_count, 32); + assert_eq!(column_names(exprs), vec![DATA_SCHEMA_TSID_COLUMN_NAME]); + } + + fn test_merge_scan_exec( + partition_cols: AliasMapping, + current_partition_exprs: Vec>, + ) -> MergeScanExec { + let schema = Arc::new(Schema::new(vec![ + Field::new("host", DataType::Utf8, true), + Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false), + Field::new( + "greptime_timestamp", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new("greptime_value", DataType::Float64, true), + ])); + let plan = LogicalPlanBuilder::empty(false).build().unwrap(); + + MergeScanExec { + table: TableName::new("greptime", "public", "test"), + regions: vec![RegionId::new(1, 0), RegionId::new(1, 1)], + plan, + arrow_schema: schema.clone(), + region_query_handler: Arc::new(NoopRegionQueryHandler), + metric: ExecutionPlanMetricsSet::new(), + properties: Arc::new(PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::Hash(current_partition_exprs, 32), + EmissionType::Incremental, + Boundedness::Bounded, + )), + sub_stage_metrics: Arc::default(), + partition_metrics: Arc::default(), + query_ctx: QueryContext::arc(), + target_partition: 32, + partition_cols, + } + } + + fn partition_column(name: &str, index: usize) -> Arc { + Arc::new(Column::new(name, index)) + } + + fn column_names(exprs: &[Arc]) -> Vec<&str> { + exprs + .iter() + .map(|expr| expr.as_any().downcast_ref::().unwrap().name()) + .collect() + } + + fn promql_tsid_sorted_plan(schema: Arc) -> LogicalPlan { + let input = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: schema.to_dfschema_ref().unwrap(), + }); + + LogicalPlanBuilder::from(input) + .sort(vec![ + col(DATA_SCHEMA_TSID_COLUMN_NAME).sort(true, true), + col(greptime_timestamp()).sort(true, true), + ]) + .unwrap() + .build() + .unwrap() + } +} + /// Metrics for a region of a partition. #[derive(Debug, Clone)] struct RegionMetrics { diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs index 5864ea7bd0..d01ccc9800 100644 --- a/src/query/src/dist_plan/planner.rs +++ b/src/query/src/dist_plan/planner.rs @@ -20,17 +20,22 @@ use ahash::HashMap; use async_trait::async_trait; use catalog::CatalogManagerRef; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_query::prelude::greptime_timestamp; +use datafusion::arrow::compute::SortOptions; use datafusion::common::Result; use datafusion::datasource::DefaultTableSource; use datafusion::execution::context::SessionState; -use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::{DataFusionError, TableReference}; use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode}; +use datafusion_physical_expr::expressions::Column as PhysicalColumn; +use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; use partition::manager::{PartitionRuleManagerRef, create_partitions_from_region_routes}; use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; +use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME; use store_api::storage::RegionId; pub use table::metadata::TableType; use table::table::adapter::DfTableProviderAdapter; @@ -162,6 +167,51 @@ impl ExtensionPlanner for DistExtensionPlanner { return fallback(optimized_plan).await; }; + let remote_orderings = if let Ok(optimized_remote_plan) = + self.optimize_input_logical_plan(session_state, input_plan) + { + planner + .create_physical_plan(&optimized_remote_plan, session_state) + .await + .ok() + .map(|plan| { + let mut remote_orderings = Vec::new(); + if let Some(preferred_ordering) = + tsid_time_ordering(plan.as_ref()).filter(|ordering| { + plan.equivalence_properties() + .ordering_satisfy(ordering.clone()) + .unwrap_or(false) + }) + { + remote_orderings.push(preferred_ordering); + } + + if let Some(preferred_ordering) = + MergeScanExec::logical_sort_ordering(session_state, input_plan) + .ok() + .flatten() + .filter(|ordering| { + plan.equivalence_properties() + .ordering_satisfy(ordering.clone()) + .unwrap_or(false) + }) + { + remote_orderings.push(preferred_ordering); + } + + for ordering in plan.equivalence_properties().oeq_class().iter().cloned() { + if !remote_orderings.contains(&ordering) { + remote_orderings.push(ordering); + } + } + + remote_orderings + }) + .unwrap_or_default() + } else { + Vec::new() + }; + // TODO(ruihang): generate different execution plans for different variant merge operation let schema = optimized_plan.schema().as_arrow(); let query_ctx = session_state @@ -173,6 +223,7 @@ impl ExtensionPlanner for DistExtensionPlanner { table_name, regions, input_plan.clone(), + remote_orderings, schema, self.region_query_handler.clone(), query_ctx, @@ -183,6 +234,31 @@ impl ExtensionPlanner for DistExtensionPlanner { } } +fn tsid_time_ordering(plan: &dyn ExecutionPlan) -> Option { + let schema = plan.schema(); + let tsid_index = schema.index_of(DATA_SCHEMA_TSID_COLUMN_NAME).ok()?; + let time_index = schema.index_of(greptime_timestamp()).ok()?; + LexOrdering::new(vec![ + PhysicalSortExpr::new( + Arc::new(PhysicalColumn::new( + DATA_SCHEMA_TSID_COLUMN_NAME, + tsid_index, + )), + SortOptions { + descending: false, + nulls_first: true, + }, + ), + PhysicalSortExpr::new( + Arc::new(PhysicalColumn::new(greptime_timestamp(), time_index)), + SortOptions { + descending: false, + nulls_first: true, + }, + ), + ]) +} + impl DistExtensionPlanner { /// Extract fully resolved table name from logical plan fn extract_full_table_name(plan: &LogicalPlan) -> Result> { diff --git a/src/query/src/optimizer/pass_distribution.rs b/src/query/src/optimizer/pass_distribution.rs index 7ce2ffd752..769e260519 100644 --- a/src/query/src/optimizer/pass_distribution.rs +++ b/src/query/src/optimizer/pass_distribution.rs @@ -17,8 +17,10 @@ use std::sync::Arc; use datafusion::config::ConfigOptions; use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::projection::ProjectionExec; use datafusion_common::Result as DfResult; use datafusion_physical_expr::Distribution; +use datafusion_physical_expr::utils::map_columns_before_projection; use crate::dist_plan::MergeScanExec; @@ -83,7 +85,9 @@ impl PassDistribution { let mut new_children = Vec::with_capacity(children.len()); for (idx, child) in children.into_iter().enumerate() { let child_req = match required.get(idx) { - Some(Distribution::UnspecifiedDistribution) => None, + Some(Distribution::UnspecifiedDistribution) => { + Self::propagate_unspecified_child_requirement(plan.as_ref(), idx, ¤t_req) + } None => current_req.clone(), Some(req) => Some(req.clone()), }; @@ -103,4 +107,200 @@ impl PassDistribution { plan.with_new_children(new_children) } } + + fn propagate_unspecified_child_requirement( + plan: &dyn ExecutionPlan, + idx: usize, + current_req: &Option, + ) -> Option { + if idx != 0 { + return None; + } + + let Some(Distribution::HashPartitioned(required_exprs)) = current_req else { + return None; + }; + + let projection = plan.as_any().downcast_ref::()?; + let proj_exprs = projection + .expr() + .iter() + .map(|expr| (Arc::clone(&expr.expr), expr.alias.clone())) + .collect::>(); + let mapped = map_columns_before_projection(required_exprs, &proj_exprs); + + (mapped.len() == required_exprs.len()).then_some(Distribution::HashPartitioned(mapped)) + } +} + +#[cfg(test)] +mod tests { + use std::collections::{BTreeMap, BTreeSet}; + + use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit}; + use async_trait::async_trait; + use common_query::request::QueryRequest; + use common_recordbatch::SendableRecordBatchStream; + use datafusion::common::NullEquality; + use datafusion::execution::SessionStateBuilder; + use datafusion::physical_optimizer::PhysicalOptimizerRule; + use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode}; + use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr}; + use datafusion::physical_plan::{ExecutionPlanProperties, Partitioning}; + use datafusion_expr::{JoinType, LogicalPlanBuilder}; + use datafusion_physical_expr::PhysicalExpr; + use datafusion_physical_expr::expressions::Column as PhysicalColumn; + use session::ReadPreference; + use session::context::QueryContext; + use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME; + use store_api::storage::RegionId; + use table::table_name::TableName; + + use super::*; + use crate::error::Result as QueryResult; + use crate::region_query::RegionQueryHandler; + + struct NoopRegionQueryHandler; + + #[async_trait] + impl RegionQueryHandler for NoopRegionQueryHandler { + async fn do_get( + &self, + _read_preference: ReadPreference, + _request: QueryRequest, + ) -> QueryResult { + unreachable!("pass distribution tests should not execute remote queries") + } + } + + #[test] + fn passes_hash_requirement_through_projection_to_merge_scan() { + let schema = test_schema(); + let left_merge_scan = Arc::new(test_merge_scan_exec(schema.clone())); + let right_merge_scan = Arc::new(test_merge_scan_exec(schema.clone())); + let left_projection = Arc::new( + ProjectionExec::try_new( + vec![ + ProjectionExpr::new(partition_column("greptime_value", 3), "greptime_value"), + ProjectionExpr::new( + partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1), + DATA_SCHEMA_TSID_COLUMN_NAME, + ), + ProjectionExpr::new( + partition_column("greptime_timestamp", 2), + "greptime_timestamp", + ), + ], + left_merge_scan, + ) + .unwrap(), + ) as Arc; + let join = Arc::new( + HashJoinExec::try_new( + left_projection, + right_merge_scan, + vec![ + ( + partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1), + partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1), + ), + ( + partition_column("greptime_timestamp", 2), + partition_column("greptime_timestamp", 2), + ), + ], + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + NullEquality::NullEqualsNull, + false, + ) + .unwrap(), + ) as Arc; + + let optimized = PassDistribution + .optimize(join, &ConfigOptions::default()) + .unwrap(); + let hash_join = optimized.as_any().downcast_ref::().unwrap(); + let left_projection = hash_join + .left() + .as_any() + .downcast_ref::() + .unwrap(); + let left_partitioning = left_projection.input().output_partitioning(); + let right_partitioning = hash_join.right().output_partitioning(); + + let Partitioning::Hash(left_exprs, left_count) = left_partitioning else { + panic!("expected left merge scan hash partitioning"); + }; + let Partitioning::Hash(right_exprs, right_count) = right_partitioning else { + panic!("expected right merge scan hash partitioning"); + }; + + assert_eq!(*left_count, 32); + assert_eq!(*right_count, 32); + assert_eq!( + column_names(left_exprs), + vec![DATA_SCHEMA_TSID_COLUMN_NAME, "greptime_timestamp"] + ); + assert_eq!( + column_names(right_exprs), + vec![DATA_SCHEMA_TSID_COLUMN_NAME, "greptime_timestamp"] + ); + } + + fn test_merge_scan_exec(schema: SchemaRef) -> MergeScanExec { + let session_state = SessionStateBuilder::new().with_default_features().build(); + let partition_cols = BTreeMap::from([( + DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), + BTreeSet::from([datafusion_common::Column::from_name( + DATA_SCHEMA_TSID_COLUMN_NAME, + )]), + )]); + let plan = LogicalPlanBuilder::empty(false).build().unwrap(); + + MergeScanExec::new( + &session_state, + TableName::new("greptime", "public", "test"), + vec![RegionId::new(1, 0), RegionId::new(1, 1)], + plan, + vec![], + schema.as_ref(), + Arc::new(NoopRegionQueryHandler), + QueryContext::arc(), + 32, + partition_cols, + ) + .unwrap() + } + + fn test_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("host", DataType::Utf8, true), + Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false), + Field::new( + "greptime_timestamp", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new("greptime_value", DataType::Float64, true), + ])) + } + + fn partition_column(name: &str, index: usize) -> Arc { + Arc::new(PhysicalColumn::new(name, index)) + } + + fn column_names(exprs: &[Arc]) -> Vec<&str> { + exprs + .iter() + .map(|expr| { + expr.as_any() + .downcast_ref::() + .unwrap() + .name() + }) + .collect() + } } diff --git a/tests/cases/distributed/flow-tql/tsid_on_phy.result b/tests/cases/distributed/flow-tql/tsid_on_phy.result index 6ae3e77b0c..923a884093 100644 --- a/tests/cases/distributed/flow-tql/tsid_on_phy.result +++ b/tests/cases/distributed/flow-tql/tsid_on_phy.result @@ -100,31 +100,35 @@ TQL EXPLAIN ( ) ); -+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | HistogramFold: le=le, field=sum(prom_avg_over_time(ts_range,v)), quantile=0.5 | -| | MergeScan [is_placeholder=false, remote_input=[ | -| | Sort: test_tsid.le ASC NULLS LAST, test_tsid.tag4 ASC NULLS LAST, test_tsid.tag5 ASC NULLS LAST, test_tsid.ts ASC NULLS LAST | -| | Aggregate: groupBy=[[test_tsid.le, test_tsid.tag4, test_tsid.tag5, test_tsid.ts]], aggr=[[sum(prom_avg_over_time(ts_range,v))]] | -| | Filter: prom_avg_over_time(ts_range,v) IS NOT NULL | -| | Projection: test_tsid.ts, prom_avg_over_time(ts_range, v) AS prom_avg_over_time(ts_range,v), test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8 | -| | PromRangeManipulate: req range=[1769139000000..1769139900000], interval=[60000], eval range=[1800000], time index=[ts], values=["v"] | -| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] | -| | PromSeriesDivide: tags=["__tsid"] | -| | Sort: test_tsid.__tsid ASC NULLS FIRST, test_tsid.ts ASC NULLS FIRST | -| | Filter: test_tsid.ts >= TimestampMillisecond(1769137200001, None) AND test_tsid.ts <= TimestampMillisecond(1769139900000, None) | -| | Projection: test_tsid.v, test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8, test_tsid.__tsid, test_tsid.ts | -| | SubqueryAlias: test_tsid | -| | Filter: phy.__table_id=UInt32(REDACTED) | -| | TableScan: phy projection=[ts, v, tag1, tag2, le, tag4, tag5, tag6, tag7, tag8, __table_id, __tsid] | -| | ]] | -| physical_plan | HistogramFoldExec: le=@0, field=@4, quantile=0.5 | -| | SortExec: expr=[tag4@1 ASC NULLS LAST, tag5@2 ASC NULLS LAST, ts@3 ASC NULLS LAST, CAST(le@0 AS Float64) ASC NULLS LAST], preserve_partitioning=[true] | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | HistogramFold: le=le, field=sum(prom_avg_over_time(ts_range,v)), quantile=0.5 | +| | Sort: test_tsid.le ASC NULLS LAST, test_tsid.tag4 ASC NULLS LAST, test_tsid.tag5 ASC NULLS LAST, test_tsid.ts ASC NULLS LAST | +| | Aggregate: groupBy=[[test_tsid.le, test_tsid.tag4, test_tsid.tag5, test_tsid.ts]], aggr=[[__sum_merge(__sum_state(prom_avg_over_time(ts_range,v))) AS sum(prom_avg_over_time(ts_range,v))]] | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | Aggregate: groupBy=[[test_tsid.le, test_tsid.tag4, test_tsid.tag5, test_tsid.ts]], aggr=[[__sum_state(prom_avg_over_time(ts_range,v))]] | +| | Filter: prom_avg_over_time(ts_range,v) IS NOT NULL | +| | Projection: test_tsid.ts, prom_avg_over_time(ts_range, v) AS prom_avg_over_time(ts_range,v), test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8 | +| | PromRangeManipulate: req range=[1769139000000..1769139900000], interval=[60000], eval range=[1800000], time index=[ts], values=["v"] | +| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] | +| | PromSeriesDivide: tags=["__tsid"] | +| | Sort: test_tsid.__tsid ASC NULLS FIRST, test_tsid.ts ASC NULLS FIRST | +| | Filter: test_tsid.ts >= TimestampMillisecond(1769137200001, None) AND test_tsid.ts <= TimestampMillisecond(1769139900000, None) | +| | Projection: test_tsid.v, test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8, test_tsid.__tsid, test_tsid.ts | +| | SubqueryAlias: test_tsid | +| | Filter: phy.__table_id=UInt32(REDACTED) | +| | TableScan: phy projection=[ts, v, tag1, tag2, le, tag4, tag5, tag6, tag7, tag8, __table_id, __tsid] | +| | ]] | +| physical_plan | HistogramFoldExec: le=@0, field=@4, quantile=0.5 | +| | SortExec: expr=[tag4@1 ASC NULLS LAST, tag5@2 ASC NULLS LAST, ts@3 ASC NULLS LAST, CAST(le@0 AS Float64) ASC NULLS LAST], preserve_partitioning=[true] | | | RepartitionExec: REDACTED -| | MergeScanExec: REDACTED -| | | -+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| | AggregateExec: mode=FinalPartitioned, gby=[le@0 as le, tag4@1 as tag4, tag5@2 as tag5, ts@3 as ts], aggr=[sum(prom_avg_over_time(ts_range,v))] | +| | RepartitionExec: REDACTED +| | AggregateExec: mode=Partial, gby=[le@0 as le, tag4@1 as tag4, tag5@2 as tag5, ts@3 as ts], aggr=[sum(prom_avg_over_time(ts_range,v))] | +| | MergeScanExec: REDACTED +| | | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ CREATE FLOW IF NOT EXISTS test_tsid SINK TO 'test_tsid_output' diff --git a/tests/cases/standalone/tql-explain-analyze/tsid_column.result b/tests/cases/standalone/tql-explain-analyze/tsid_column.result index 4a7a875060..882358074a 100644 --- a/tests/cases/standalone/tql-explain-analyze/tsid_column.result +++ b/tests/cases/standalone/tql-explain-analyze/tsid_column.result @@ -42,14 +42,16 @@ TQL ANALYZE (0, 10, '5s') sum(tsid_metric); +-+-+-+ | stage | node | plan_| +-+-+-+ -| 0_| 0_|_CooperativeExec REDACTED -|_|_|_MergeScanExec: REDACTED -|_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +| 0_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED |_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(tsid_metric.val)] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[ts@1 as ts], aggr=[sum(tsid_metric.val)] REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(tsid_metric.val)] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[__sum_state(tsid_metric.val)] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[ts@1 as ts], aggr=[__sum_state(tsid_metric.val)] REDACTED |_|_|_ProjectionExec: expr=[val@0 as val, ts@2 as ts] REDACTED |_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED |_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED @@ -71,14 +73,16 @@ TQL ANALYZE (0, 10, '5s') sum by (job, instance) (tsid_metric); +-+-+-+ | stage | node | plan_| +-+-+-+ -| 0_| 0_|_CooperativeExec REDACTED -|_|_|_MergeScanExec: REDACTED -|_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST] REDACTED +| 0_| 0_|_SortPreservingMergeExec: [job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST] REDACTED |_|_|_SortExec: expr=[job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_AggregateExec: mode=FinalPartitioned, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[job@2 as job, instance@1 as instance, ts@4 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[__sum_state(tsid_metric.val), __first_value_state(tsid_metric.__tsid)] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[job@2 as job, instance@1 as instance, ts@4 as ts], aggr=[__sum_state(tsid_metric.val), __first_value_state(tsid_metric.__tsid)] REDACTED |_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED |_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED |_|_|_ProjectionExec: expr=[val@1 as val, instance@3 as instance, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED @@ -103,12 +107,6 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(count(tsid |_|_|_REDACTED |_|_|_ScalarCalculateExec: tags=[] REDACTED |_|_|_CoalescePartitionsExec REDACTED -|_|_|_MergeScanExec: REDACTED -|_|_|_CooperativeExec REDACTED -|_|_|_MergeScanExec: REDACTED -|_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED @@ -116,17 +114,25 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(count(tsid |_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED |_|_|_ProjectionExec: expr=[ts@3 as ts, job@1 as job] REDACTED |_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED |_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED |_|_|_ProjectionExec: expr=[val@1 as val, job@3 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED +| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[__sum_state(prom_irate(ts_range,val))] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[__sum_state(prom_irate(ts_range,val))] REDACTED |_|_|_FilterExec: prom_irate(ts_range,val)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[ts@2 as ts, prom_irate(ts_range@3, val@0) as prom_irate(ts_range,val)] REDACTED |_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[3600000], time index=[ts] REDACTED @@ -154,12 +160,6 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(sum(tsid_m |_|_|_REDACTED |_|_|_ScalarCalculateExec: tags=[] REDACTED |_|_|_CoalescePartitionsExec REDACTED -|_|_|_MergeScanExec: REDACTED -|_|_|_CooperativeExec REDACTED -|_|_|_MergeScanExec: REDACTED -|_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED @@ -167,6 +167,16 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(sum(tsid_m |_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED |_|_|_ProjectionExec: expr=[ts@1 as ts, job@0 as job] REDACTED |_|_|_FilterExec: val@0 IS NOT NULL, projection=[job@1, ts@2] REDACTED |_|_|_ProjectionExec: expr=[val@0 as val, job@1 as job, ts@3 as ts] REDACTED @@ -175,11 +185,9 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(sum(tsid_m |_|_|_ProjectionExec: expr=[val@1 as val, job@3 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED +| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[__sum_state(prom_irate(ts_range,val))] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[__sum_state(prom_irate(ts_range,val))] REDACTED |_|_|_FilterExec: prom_irate(ts_range,val)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[ts@2 as ts, prom_irate(ts_range@3, val@0) as prom_irate(ts_range,val)] REDACTED |_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[3600000], time index=[ts] REDACTED