diff --git a/src/promql/src/extension_plan/series_divide.rs b/src/promql/src/extension_plan/series_divide.rs index c60e2280d6..19385b7a97 100644 --- a/src/promql/src/extension_plan/series_divide.rs +++ b/src/promql/src/extension_plan/series_divide.rs @@ -397,20 +397,6 @@ impl ExecutionPlan for SeriesDivideExec { if self.tag_columns.is_empty() { return vec![Distribution::SinglePartition]; } - - // If the upstream already produces a single partition, `SeriesDivide` can safely process - // the full ordered stream directly. Repartitioning a single partition into the global - // target partition count only adds shuffle/merge overhead without improving correctness. - if self - .input - .properties() - .output_partitioning() - .partition_count() - <= 1 - { - return vec![Distribution::UnspecifiedDistribution]; - } - let schema = self.input.schema(); vec![Distribution::HashPartitioned( self.tag_columns @@ -754,45 +740,6 @@ mod test { )) } - fn prepare_multi_partition_test_data() -> DataSourceExec { - let schema = Arc::new(Schema::new(vec![ - Field::new("host", DataType::Utf8, true), - Field::new("path", DataType::Utf8, true), - Field::new( - "time_index", - DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None), - false, - ), - ])); - - let data_1 = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(StringArray::from(vec!["000", "000"])) as _, - Arc::new(StringArray::from(vec!["foo", "foo"])) as _, - Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from( - vec![1000, 2000], - )) as _, - ], - ) - .unwrap(); - let data_2 = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(StringArray::from(vec!["001", "001"])) as _, - Arc::new(StringArray::from(vec!["bar", "bar"])) as _, - Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from( - vec![3000, 4000], - )) as _, - ], - ) - .unwrap(); - - DataSourceExec::new(Arc::new( - MemorySourceConfig::try_new(&[vec![data_1], vec![data_2]], schema, None).unwrap(), - )) - } - #[test] fn pruning_should_keep_tags_and_time_index_columns_for_exec() { let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap(); @@ -813,38 +760,6 @@ mod test { assert_eq!(required.as_slice(), &[0, 1, 2]); } - #[test] - fn single_partition_input_skips_repartition_requirement() { - let divide_exec = SeriesDivideExec { - tag_columns: vec!["host".to_string()], - time_index_column: "time_index".to_string(), - input: Arc::new(prepare_test_data()), - metric: ExecutionPlanMetricsSet::new(), - }; - - let requirement = divide_exec.required_input_distribution(); - assert!(matches!( - requirement.as_slice(), - [Distribution::UnspecifiedDistribution] - )); - } - - #[test] - fn multi_partition_input_still_requires_hash_partitioning() { - let divide_exec = SeriesDivideExec { - tag_columns: vec!["host".to_string()], - time_index_column: "time_index".to_string(), - input: Arc::new(prepare_multi_partition_test_data()), - metric: ExecutionPlanMetricsSet::new(), - }; - - let requirement = divide_exec.required_input_distribution(); - assert!(matches!( - requirement.as_slice(), - [Distribution::HashPartitioned(_)] - )); - } - #[tokio::test] async fn overall_data() { let memory_exec = Arc::new(prepare_test_data()); diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs index 084a29cb5e..6eb8b5eac6 100644 --- a/src/query/src/dist_plan/analyzer.rs +++ b/src/query/src/dist_plan/analyzer.rs @@ -27,7 +27,6 @@ use datafusion_expr::utils::expr_to_columns; use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder, Subquery, col as col_fn}; use datafusion_optimizer::analyzer::AnalyzerRule; use promql::extension_plan::SeriesDivide; -use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::metadata::TableType; use table::table::adapter::DfTableProviderAdapter; @@ -510,20 +509,6 @@ impl PlanRewriter { .map(|index| schema.column_name_by_index(index).to_string()) .collect::>(); - // Metric engine scans can project the internal `__tsid` column even though it is - // not part of the logical table schema. Equal `__tsid` values always belong to the - // same series, so carry it as an additional partition key when it is available. - if plan - .schema() - .index_of_column_by_name(None, DATA_SCHEMA_TSID_COLUMN_NAME) - .is_some() - && !partition_cols - .iter() - .any(|col| col == DATA_SCHEMA_TSID_COLUMN_NAME) - { - partition_cols.push(DATA_SCHEMA_TSID_COLUMN_NAME.to_string()); - } - let partition_rules = table.partition_rules(); let exist_phy_part_cols_not_in_logical_table = partition_rules .map(|r| !r.extra_phy_cols_not_in_logical_table.is_empty()) diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index ae97a13635..2d32ce16b3 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -37,18 +37,12 @@ use datafusion::physical_plan::{ use datafusion_common::{Column as ColumnExpr, DataFusionError, Result}; use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion_physical_expr::expressions::Column; -use datafusion_physical_expr::{ - Distribution, EquivalenceProperties, LexOrdering, PhysicalSortExpr, -}; +use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr}; use futures_util::StreamExt; use greptime_proto::v1::region::RegionRequestHeader; use meter_core::data::ReadItem; use meter_macros::read_meter; -use promql::extension_plan::{ - InstantManipulate, RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize, -}; use session::context::QueryContextRef; -use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME; use store_api::storage::RegionId; use table::table_name::TableName; use tokio::time::Instant; @@ -166,185 +160,12 @@ impl std::fmt::Debug for MergeScanExec { } impl MergeScanExec { - fn default_partition_exprs( - session_state: &SessionState, - plan: &LogicalPlan, - partition_cols: &AliasMapping, - ) -> Vec> { - partition_cols - .iter() - .filter_map(|col| Self::partition_expr_for_alias(session_state, plan, col.1.first())) - .collect() - } - - fn partition_expr_for_alias( - session_state: &SessionState, - plan: &LogicalPlan, - column: Option<&ColumnExpr>, - ) -> Option> { - let column = column?; - session_state - .create_physical_expr( - Expr::Column(ColumnExpr::new_unqualified(column.name().to_string())), - plan.schema(), - ) - .ok() - } - - fn prefer_tsid_partition_exprs( - session_state: &SessionState, - plan: &LogicalPlan, - partition_cols: &AliasMapping, - ) -> Option>> { - Self::promql_tsid_ordered_time_index(plan)?; - - let tsid_aliases = partition_cols.get(DATA_SCHEMA_TSID_COLUMN_NAME)?; - let tsid_expr = Self::partition_expr_for_alias(session_state, plan, tsid_aliases.first())?; - Some(vec![tsid_expr]) - } - - fn promql_tsid_ordered_time_index(plan: &LogicalPlan) -> Option { - let time_index_column = match plan { - LogicalPlan::Sort(sort) => { - if sort.expr.len() != 2 { - return None; - } - - let [tsid_sort, time_sort] = sort.expr.as_slice() else { - return None; - }; - - let tsid_column = Self::ascending_nulls_first_sort_column(tsid_sort)?; - let time_column = Self::ascending_nulls_first_sort_column(time_sort)?; - (tsid_column == DATA_SCHEMA_TSID_COLUMN_NAME).then_some(time_column) - } - LogicalPlan::Projection(projection) => { - Self::promql_tsid_ordered_time_index(projection.input.as_ref()) - } - LogicalPlan::Filter(filter) => { - Self::promql_tsid_ordered_time_index(filter.input.as_ref()) - } - LogicalPlan::SubqueryAlias(alias) => { - Self::promql_tsid_ordered_time_index(alias.input.as_ref()) - } - LogicalPlan::Extension(extension) if Self::is_promql_passthrough_node(extension) => { - extension - .node - .inputs() - .first() - .and_then(|input| Self::promql_tsid_ordered_time_index(input)) - } - _ => None, - }?; - - let schema = plan.schema(); - let has_tsid = schema - .index_of_column_by_name(None, DATA_SCHEMA_TSID_COLUMN_NAME) - .is_some(); - let has_time_index = schema - .index_of_column_by_name(None, &time_index_column) - .is_some(); - - (has_tsid && has_time_index).then_some(time_index_column) - } - - fn is_promql_passthrough_node(extension: &Extension) -> bool { - let node = extension.node.as_any(); - node.is::() - || node.is::() - || node.is::() - || node.is::() - || node.is::() - } - - fn ascending_nulls_first_sort_column( - sort_expr: &datafusion_expr::expr::Sort, - ) -> Option { - (sort_expr.asc && sort_expr.nulls_first) - .then(|| sort_expr.expr.try_as_col().map(|col| col.name.clone())) - .flatten() - } - - fn schema_exposes_column(plan: &LogicalPlan, column_name: &str) -> bool { - plan.schema() - .index_of_column_by_name(None, column_name) - .is_some() - } - - pub(crate) fn logical_sort_ordering( - session_state: &SessionState, - plan: &LogicalPlan, - ) -> Result> { - if let Some(time_index_column) = Self::promql_tsid_ordered_time_index(plan) { - if !Self::schema_exposes_column(plan, DATA_SCHEMA_TSID_COLUMN_NAME) - || !Self::schema_exposes_column(plan, &time_index_column) - { - return Ok(None); - } - - let tsid_expr = session_state.create_physical_expr( - Expr::Column(ColumnExpr::new_unqualified( - DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), - )), - plan.schema(), - )?; - let time_expr = session_state.create_physical_expr( - Expr::Column(ColumnExpr::new_unqualified(time_index_column)), - plan.schema(), - )?; - return Ok(LexOrdering::new(vec![ - PhysicalSortExpr::new( - tsid_expr, - SortOptions { - descending: false, - nulls_first: true, - }, - ), - PhysicalSortExpr::new( - time_expr, - SortOptions { - descending: false, - nulls_first: true, - }, - ), - ])); - } - - let LogicalPlan::Sort(sort) = plan else { - return Ok(None); - }; - - let lex_ordering = LexOrdering::new( - sort.expr - .iter() - .map(|sort_expr| { - let physical_expr = session_state - .create_physical_expr(sort_expr.expr.clone(), plan.schema())?; - Ok(PhysicalSortExpr::new( - physical_expr, - SortOptions { - descending: !sort_expr.asc, - nulls_first: sort_expr.nulls_first, - }, - )) - }) - .collect::>>()?, - ) - .ok_or_else(|| { - DataFusionError::Internal(format!( - "Sort plan must contain at least one expression: {plan}" - )) - })?; - Ok(Some(lex_ordering)) - } - #[allow(clippy::too_many_arguments)] pub fn new( session_state: &SessionState, table: TableName, regions: Vec, plan: LogicalPlan, - remote_orderings: Vec, arrow_schema: &ArrowSchema, region_query_handler: RegionQueryHandlerRef, query_ctx: QueryContextRef, @@ -363,23 +184,46 @@ impl MergeScanExec { // break the ordering on merging (of MergeScan). // // Otherwise, we need to use the default ordering. - let eq_properties = if target_partition >= regions.len() { - if !remote_orderings.is_empty() { - EquivalenceProperties::new_with_orderings(arrow_schema.clone(), remote_orderings) - } else if let Some(ordering) = Self::logical_sort_ordering(session_state, &plan)? { - EquivalenceProperties::new_with_orderings(arrow_schema.clone(), vec![ordering]) - } else { - EquivalenceProperties::new(arrow_schema.clone()) - } + let eq_properties = if let LogicalPlan::Sort(sort) = &plan + && target_partition >= regions.len() + { + let lex_ordering = sort + .expr + .iter() + .map(|sort_expr| { + let physical_expr = session_state + .create_physical_expr(sort_expr.expr.clone(), plan.schema())?; + Ok(PhysicalSortExpr::new( + physical_expr, + SortOptions { + descending: !sort_expr.asc, + nulls_first: sort_expr.nulls_first, + }, + )) + }) + .collect::>>()?; + EquivalenceProperties::new_with_orderings(arrow_schema.clone(), vec![lex_ordering]) } else { EquivalenceProperties::new(arrow_schema.clone()) }; - let partition_exprs = - Self::prefer_tsid_partition_exprs(session_state, &plan, &partition_cols) - .unwrap_or_else(|| { - Self::default_partition_exprs(session_state, &plan, &partition_cols) - }); + let partition_exprs = partition_cols + .iter() + .filter_map(|col| { + if let Some(first_alias) = col.1.first() { + session_state + .create_physical_expr( + Expr::Column(ColumnExpr::new_unqualified( + first_alias.name().to_string(), + )), + plan.schema(), + ) + .ok() + } else { + None + } + }) + .collect(); let partitioning = Partitioning::Hash(partition_exprs, target_partition); let properties = Arc::new(PlanProperties::new( @@ -577,49 +421,31 @@ impl MergeScanExec { return None; }; + if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning + && curr_dist == &hash_exprs + { + // No need to change the distribution + return None; + } + let all_partition_col_aliases: HashSet<_> = self .partition_cols .values() .flat_map(|aliases| aliases.iter().map(|c| c.name())) .collect(); - let mut overlaps = hash_exprs - .iter() - .filter(|expr| { - expr.as_any() - .downcast_ref::() - .is_some_and(|col_expr| all_partition_col_aliases.contains(col_expr.name())) - }) - .cloned() - .collect::>(); - - // Metric-engine scans can satisfy any hash distribution that includes `__tsid`. - // Equal requested keys must also share the same `__tsid`, and equal `__tsid` values are - // guaranteed to stay co-located across MergeScan partitions. Advertise the full requested - // distribution so EnforceDistribution can skip redundant reshuffles. - if self - .arrow_schema - .column_with_name(DATA_SCHEMA_TSID_COLUMN_NAME) - .is_some() - && hash_exprs.iter().any(|expr| { - expr.as_any() - .downcast_ref::() - .is_some_and(|col_expr| col_expr.name() == DATA_SCHEMA_TSID_COLUMN_NAME) - }) - { - overlaps = hash_exprs.clone(); + let mut overlaps = vec![]; + for expr in &hash_exprs { + if let Some(col_expr) = expr.as_any().downcast_ref::() + && all_partition_col_aliases.contains(col_expr.name()) + { + overlaps.push(expr.clone()); + } } if overlaps.is_empty() { return None; } - if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning - && curr_dist == &overlaps - { - // No need to change the distribution. - return None; - } - Some(Self { table: self.table.clone(), regions: self.regions.clone(), @@ -668,278 +494,6 @@ impl MergeScanExec { } } -#[cfg(test)] -mod tests { - use std::collections::{BTreeMap, BTreeSet}; - - use arrow_schema::{DataType, Field, Schema, TimeUnit}; - use async_trait::async_trait; - use common_query::request::QueryRequest; - use common_recordbatch::SendableRecordBatchStream; - use datafusion::execution::SessionStateBuilder; - use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; - use datafusion_common::ToDFSchema; - use datafusion_expr::{EmptyRelation, Extension, LogicalPlan, LogicalPlanBuilder, col}; - use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; - use promql::extension_plan::{InstantManipulate, SeriesDivide}; - use session::ReadPreference; - use session::context::QueryContext; - - use super::*; - use crate::error::Result as QueryResult; - use crate::region_query::RegionQueryHandler; - - struct NoopRegionQueryHandler; - - #[async_trait] - impl RegionQueryHandler for NoopRegionQueryHandler { - async fn do_get( - &self, - _read_preference: ReadPreference, - _request: QueryRequest, - ) -> QueryResult { - unreachable!("merge scan distribution tests should not execute remote queries") - } - } - - #[test] - fn try_with_new_distribution_satisfies_tsid_hash_requirements() { - let merge_scan = test_merge_scan_exec( - BTreeMap::from([ - ( - "host".to_string(), - BTreeSet::from([ColumnExpr::from_name("host")]), - ), - ( - DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), - BTreeSet::from([ColumnExpr::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)]), - ), - ]), - vec![ - partition_column("host", 0), - partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1), - ], - ); - - let optimized = merge_scan - .try_with_new_distribution(Distribution::HashPartitioned(vec![ - partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1), - partition_column("greptime_timestamp", 2), - ])) - .unwrap(); - - let Partitioning::Hash(ref exprs, partition_count) = optimized.properties.partitioning - else { - panic!("expected hash partitioning"); - }; - assert_eq!(partition_count, 32); - assert_eq!( - column_names(exprs), - vec![DATA_SCHEMA_TSID_COLUMN_NAME, "greptime_timestamp"] - ); - } - - #[test] - fn try_with_new_distribution_keeps_regular_partition_overlap() { - let merge_scan = test_merge_scan_exec( - BTreeMap::from([( - "host".to_string(), - BTreeSet::from([ColumnExpr::from_name("host")]), - )]), - vec![partition_column("greptime_timestamp", 2)], - ); - - let optimized = merge_scan - .try_with_new_distribution(Distribution::HashPartitioned(vec![ - partition_column("host", 0), - partition_column("greptime_timestamp", 2), - ])) - .unwrap(); - - let Partitioning::Hash(ref exprs, partition_count) = optimized.properties.partitioning - else { - panic!("expected hash partitioning"); - }; - assert_eq!(partition_count, 32); - assert_eq!(column_names(exprs), vec!["host"]); - } - - #[test] - fn new_prefers_tsid_partitioning_for_promql_tsid_sort() { - let session_state = SessionStateBuilder::new().with_default_features().build(); - let schema = Arc::new(Schema::new(vec![ - Field::new("host", DataType::Utf8, true), - Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false), - Field::new( - "ts", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - ), - Field::new("greptime_value", DataType::Float64, true), - ])); - let partition_cols = BTreeMap::from([ - ( - "host".to_string(), - BTreeSet::from([ColumnExpr::from_name("host")]), - ), - ( - DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), - BTreeSet::from([ColumnExpr::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)]), - ), - ]); - let plan = promql_tsid_sorted_plan(schema.clone(), "ts"); - - let ordering = MergeScanExec::logical_sort_ordering(&session_state, &plan) - .unwrap() - .unwrap(); - - let merge_scan = MergeScanExec::new( - &session_state, - TableName::new("greptime", "public", "test"), - vec![RegionId::new(1, 0), RegionId::new(1, 1)], - plan, - vec![], - schema.as_ref(), - Arc::new(NoopRegionQueryHandler), - QueryContext::arc(), - 32, - partition_cols, - ) - .unwrap(); - - let Partitioning::Hash(ref exprs, partition_count) = merge_scan.properties.partitioning - else { - panic!("expected hash partitioning"); - }; - assert_eq!(partition_count, 32); - assert_eq!(column_names(exprs), vec![DATA_SCHEMA_TSID_COLUMN_NAME]); - assert_eq!( - ordering_column_names(&ordering), - vec![DATA_SCHEMA_TSID_COLUMN_NAME, "ts"] - ); - } - - #[test] - fn logical_sort_ordering_ignores_projected_away_tsid_columns() { - let session_state = SessionStateBuilder::new().with_default_features().build(); - let schema = Arc::new(Schema::new(vec![ - Field::new("host", DataType::Utf8, true), - Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false), - Field::new( - "ts", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - ), - Field::new("greptime_value", DataType::Float64, true), - ])); - let projected = LogicalPlanBuilder::from(promql_tsid_sorted_plan(schema, "ts")) - .project(vec![col("host"), col("ts"), col("greptime_value")]) - .unwrap() - .build() - .unwrap(); - let plan = LogicalPlan::Extension(Extension { - node: Arc::new(InstantManipulate::new( - 0, - 10, - 1, - 1, - "ts".to_string(), - Some("greptime_value".to_string()), - projected, - )), - }); - - let ordering = MergeScanExec::logical_sort_ordering(&session_state, &plan).unwrap(); - - assert!(ordering.is_none()); - } - - fn test_merge_scan_exec( - partition_cols: AliasMapping, - current_partition_exprs: Vec>, - ) -> MergeScanExec { - let schema = Arc::new(Schema::new(vec![ - Field::new("host", DataType::Utf8, true), - Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false), - Field::new( - "greptime_timestamp", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - ), - Field::new("greptime_value", DataType::Float64, true), - ])); - let plan = LogicalPlanBuilder::empty(false).build().unwrap(); - - MergeScanExec { - table: TableName::new("greptime", "public", "test"), - regions: vec![RegionId::new(1, 0), RegionId::new(1, 1)], - plan, - arrow_schema: schema.clone(), - region_query_handler: Arc::new(NoopRegionQueryHandler), - metric: ExecutionPlanMetricsSet::new(), - properties: Arc::new(PlanProperties::new( - EquivalenceProperties::new(schema), - Partitioning::Hash(current_partition_exprs, 32), - EmissionType::Incremental, - Boundedness::Bounded, - )), - sub_stage_metrics: Arc::default(), - partition_metrics: Arc::default(), - query_ctx: QueryContext::arc(), - target_partition: 32, - partition_cols, - } - } - - fn partition_column(name: &str, index: usize) -> Arc { - Arc::new(Column::new(name, index)) - } - - fn column_names(exprs: &[Arc]) -> Vec<&str> { - exprs - .iter() - .map(|expr| expr.as_any().downcast_ref::().unwrap().name()) - .collect() - } - - fn ordering_column_names(ordering: &LexOrdering) -> Vec<&str> { - ordering - .iter() - .map(|sort_expr| { - sort_expr - .expr - .as_any() - .downcast_ref::() - .unwrap() - .name() - }) - .collect() - } - - fn promql_tsid_sorted_plan(schema: Arc, time_index: &str) -> LogicalPlan { - let input = LogicalPlan::EmptyRelation(EmptyRelation { - produce_one_row: false, - schema: schema.to_dfschema_ref().unwrap(), - }); - let sorted = LogicalPlanBuilder::from(input) - .sort(vec![ - col(DATA_SCHEMA_TSID_COLUMN_NAME).sort(true, true), - col(time_index).sort(true, true), - ]) - .unwrap() - .build() - .unwrap(); - - LogicalPlan::Extension(Extension { - node: Arc::new(SeriesDivide::new( - vec!["host".to_string()], - time_index.to_string(), - sorted, - )), - }) - } -} - /// Metrics for a region of a partition. #[derive(Debug, Clone)] struct RegionMetrics { diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs index d01ccc9800..5864ea7bd0 100644 --- a/src/query/src/dist_plan/planner.rs +++ b/src/query/src/dist_plan/planner.rs @@ -20,22 +20,17 @@ use ahash::HashMap; use async_trait::async_trait; use catalog::CatalogManagerRef; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; -use common_query::prelude::greptime_timestamp; -use datafusion::arrow::compute::SortOptions; use datafusion::common::Result; use datafusion::datasource::DefaultTableSource; use datafusion::execution::context::SessionState; -use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; +use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::{DataFusionError, TableReference}; use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode}; -use datafusion_physical_expr::expressions::Column as PhysicalColumn; -use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; use partition::manager::{PartitionRuleManagerRef, create_partitions_from_region_routes}; use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; -use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME; use store_api::storage::RegionId; pub use table::metadata::TableType; use table::table::adapter::DfTableProviderAdapter; @@ -167,51 +162,6 @@ impl ExtensionPlanner for DistExtensionPlanner { return fallback(optimized_plan).await; }; - let remote_orderings = if let Ok(optimized_remote_plan) = - self.optimize_input_logical_plan(session_state, input_plan) - { - planner - .create_physical_plan(&optimized_remote_plan, session_state) - .await - .ok() - .map(|plan| { - let mut remote_orderings = Vec::new(); - if let Some(preferred_ordering) = - tsid_time_ordering(plan.as_ref()).filter(|ordering| { - plan.equivalence_properties() - .ordering_satisfy(ordering.clone()) - .unwrap_or(false) - }) - { - remote_orderings.push(preferred_ordering); - } - - if let Some(preferred_ordering) = - MergeScanExec::logical_sort_ordering(session_state, input_plan) - .ok() - .flatten() - .filter(|ordering| { - plan.equivalence_properties() - .ordering_satisfy(ordering.clone()) - .unwrap_or(false) - }) - { - remote_orderings.push(preferred_ordering); - } - - for ordering in plan.equivalence_properties().oeq_class().iter().cloned() { - if !remote_orderings.contains(&ordering) { - remote_orderings.push(ordering); - } - } - - remote_orderings - }) - .unwrap_or_default() - } else { - Vec::new() - }; - // TODO(ruihang): generate different execution plans for different variant merge operation let schema = optimized_plan.schema().as_arrow(); let query_ctx = session_state @@ -223,7 +173,6 @@ impl ExtensionPlanner for DistExtensionPlanner { table_name, regions, input_plan.clone(), - remote_orderings, schema, self.region_query_handler.clone(), query_ctx, @@ -234,31 +183,6 @@ impl ExtensionPlanner for DistExtensionPlanner { } } -fn tsid_time_ordering(plan: &dyn ExecutionPlan) -> Option { - let schema = plan.schema(); - let tsid_index = schema.index_of(DATA_SCHEMA_TSID_COLUMN_NAME).ok()?; - let time_index = schema.index_of(greptime_timestamp()).ok()?; - LexOrdering::new(vec![ - PhysicalSortExpr::new( - Arc::new(PhysicalColumn::new( - DATA_SCHEMA_TSID_COLUMN_NAME, - tsid_index, - )), - SortOptions { - descending: false, - nulls_first: true, - }, - ), - PhysicalSortExpr::new( - Arc::new(PhysicalColumn::new(greptime_timestamp(), time_index)), - SortOptions { - descending: false, - nulls_first: true, - }, - ), - ]) -} - impl DistExtensionPlanner { /// Extract fully resolved table name from logical plan fn extract_full_table_name(plan: &LogicalPlan) -> Result> { diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs index bd797dde35..aaac1e3124 100644 --- a/src/query/src/optimizer.rs +++ b/src/query/src/optimizer.rs @@ -23,7 +23,6 @@ pub mod string_normalization; #[cfg(test)] pub(crate) mod test_util; pub mod transcribe_atat; -pub mod tsid_join_repartition; pub mod type_conversion; pub mod windowed_sort; diff --git a/src/query/src/optimizer/pass_distribution.rs b/src/query/src/optimizer/pass_distribution.rs index 769e260519..7ce2ffd752 100644 --- a/src/query/src/optimizer/pass_distribution.rs +++ b/src/query/src/optimizer/pass_distribution.rs @@ -17,10 +17,8 @@ use std::sync::Arc; use datafusion::config::ConfigOptions; use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::ExecutionPlan; -use datafusion::physical_plan::projection::ProjectionExec; use datafusion_common::Result as DfResult; use datafusion_physical_expr::Distribution; -use datafusion_physical_expr::utils::map_columns_before_projection; use crate::dist_plan::MergeScanExec; @@ -85,9 +83,7 @@ impl PassDistribution { let mut new_children = Vec::with_capacity(children.len()); for (idx, child) in children.into_iter().enumerate() { let child_req = match required.get(idx) { - Some(Distribution::UnspecifiedDistribution) => { - Self::propagate_unspecified_child_requirement(plan.as_ref(), idx, ¤t_req) - } + Some(Distribution::UnspecifiedDistribution) => None, None => current_req.clone(), Some(req) => Some(req.clone()), }; @@ -107,200 +103,4 @@ impl PassDistribution { plan.with_new_children(new_children) } } - - fn propagate_unspecified_child_requirement( - plan: &dyn ExecutionPlan, - idx: usize, - current_req: &Option, - ) -> Option { - if idx != 0 { - return None; - } - - let Some(Distribution::HashPartitioned(required_exprs)) = current_req else { - return None; - }; - - let projection = plan.as_any().downcast_ref::()?; - let proj_exprs = projection - .expr() - .iter() - .map(|expr| (Arc::clone(&expr.expr), expr.alias.clone())) - .collect::>(); - let mapped = map_columns_before_projection(required_exprs, &proj_exprs); - - (mapped.len() == required_exprs.len()).then_some(Distribution::HashPartitioned(mapped)) - } -} - -#[cfg(test)] -mod tests { - use std::collections::{BTreeMap, BTreeSet}; - - use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit}; - use async_trait::async_trait; - use common_query::request::QueryRequest; - use common_recordbatch::SendableRecordBatchStream; - use datafusion::common::NullEquality; - use datafusion::execution::SessionStateBuilder; - use datafusion::physical_optimizer::PhysicalOptimizerRule; - use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode}; - use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr}; - use datafusion::physical_plan::{ExecutionPlanProperties, Partitioning}; - use datafusion_expr::{JoinType, LogicalPlanBuilder}; - use datafusion_physical_expr::PhysicalExpr; - use datafusion_physical_expr::expressions::Column as PhysicalColumn; - use session::ReadPreference; - use session::context::QueryContext; - use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME; - use store_api::storage::RegionId; - use table::table_name::TableName; - - use super::*; - use crate::error::Result as QueryResult; - use crate::region_query::RegionQueryHandler; - - struct NoopRegionQueryHandler; - - #[async_trait] - impl RegionQueryHandler for NoopRegionQueryHandler { - async fn do_get( - &self, - _read_preference: ReadPreference, - _request: QueryRequest, - ) -> QueryResult { - unreachable!("pass distribution tests should not execute remote queries") - } - } - - #[test] - fn passes_hash_requirement_through_projection_to_merge_scan() { - let schema = test_schema(); - let left_merge_scan = Arc::new(test_merge_scan_exec(schema.clone())); - let right_merge_scan = Arc::new(test_merge_scan_exec(schema.clone())); - let left_projection = Arc::new( - ProjectionExec::try_new( - vec![ - ProjectionExpr::new(partition_column("greptime_value", 3), "greptime_value"), - ProjectionExpr::new( - partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1), - DATA_SCHEMA_TSID_COLUMN_NAME, - ), - ProjectionExpr::new( - partition_column("greptime_timestamp", 2), - "greptime_timestamp", - ), - ], - left_merge_scan, - ) - .unwrap(), - ) as Arc; - let join = Arc::new( - HashJoinExec::try_new( - left_projection, - right_merge_scan, - vec![ - ( - partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1), - partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1), - ), - ( - partition_column("greptime_timestamp", 2), - partition_column("greptime_timestamp", 2), - ), - ], - None, - &JoinType::Inner, - None, - PartitionMode::Partitioned, - NullEquality::NullEqualsNull, - false, - ) - .unwrap(), - ) as Arc; - - let optimized = PassDistribution - .optimize(join, &ConfigOptions::default()) - .unwrap(); - let hash_join = optimized.as_any().downcast_ref::().unwrap(); - let left_projection = hash_join - .left() - .as_any() - .downcast_ref::() - .unwrap(); - let left_partitioning = left_projection.input().output_partitioning(); - let right_partitioning = hash_join.right().output_partitioning(); - - let Partitioning::Hash(left_exprs, left_count) = left_partitioning else { - panic!("expected left merge scan hash partitioning"); - }; - let Partitioning::Hash(right_exprs, right_count) = right_partitioning else { - panic!("expected right merge scan hash partitioning"); - }; - - assert_eq!(*left_count, 32); - assert_eq!(*right_count, 32); - assert_eq!( - column_names(left_exprs), - vec![DATA_SCHEMA_TSID_COLUMN_NAME, "greptime_timestamp"] - ); - assert_eq!( - column_names(right_exprs), - vec![DATA_SCHEMA_TSID_COLUMN_NAME, "greptime_timestamp"] - ); - } - - fn test_merge_scan_exec(schema: SchemaRef) -> MergeScanExec { - let session_state = SessionStateBuilder::new().with_default_features().build(); - let partition_cols = BTreeMap::from([( - DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), - BTreeSet::from([datafusion_common::Column::from_name( - DATA_SCHEMA_TSID_COLUMN_NAME, - )]), - )]); - let plan = LogicalPlanBuilder::empty(false).build().unwrap(); - - MergeScanExec::new( - &session_state, - TableName::new("greptime", "public", "test"), - vec![RegionId::new(1, 0), RegionId::new(1, 1)], - plan, - vec![], - schema.as_ref(), - Arc::new(NoopRegionQueryHandler), - QueryContext::arc(), - 32, - partition_cols, - ) - .unwrap() - } - - fn test_schema() -> SchemaRef { - Arc::new(Schema::new(vec![ - Field::new("host", DataType::Utf8, true), - Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false), - Field::new( - "greptime_timestamp", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - ), - Field::new("greptime_value", DataType::Float64, true), - ])) - } - - fn partition_column(name: &str, index: usize) -> Arc { - Arc::new(PhysicalColumn::new(name, index)) - } - - fn column_names(exprs: &[Arc]) -> Vec<&str> { - exprs - .iter() - .map(|expr| { - expr.as_any() - .downcast_ref::() - .unwrap() - .name() - }) - .collect() - } } diff --git a/src/query/src/optimizer/tsid_join_repartition.rs b/src/query/src/optimizer/tsid_join_repartition.rs deleted file mode 100644 index b12cded1aa..0000000000 --- a/src/query/src/optimizer/tsid_join_repartition.rs +++ /dev/null @@ -1,383 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::BTreeSet; -use std::sync::Arc; - -use datafusion::config::ConfigOptions; -use datafusion::physical_optimizer::PhysicalOptimizerRule; -use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode}; -use datafusion::physical_plan::repartition::RepartitionExec; -use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, Partitioning}; -use datafusion_common::Result as DfResult; -use datafusion_common::tree_node::{Transformed, TreeNode}; -use datafusion_physical_expr::PhysicalExpr; -use datafusion_physical_expr::expressions::Column as PhysicalColumn; -use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME; - -/// Removes repartitions that only strengthen an existing `__tsid` distribution into -/// `(__tsid, time)` for partitioned hash joins. -#[derive(Debug)] -pub struct TsidJoinRepartition; - -impl PhysicalOptimizerRule for TsidJoinRepartition { - fn optimize( - &self, - plan: Arc, - _config: &ConfigOptions, - ) -> DfResult> { - plan.transform_down(|plan| { - let Some(hash_join) = plan.as_any().downcast_ref::() else { - return Ok(Transformed::no(plan)); - }; - - if *hash_join.partition_mode() != PartitionMode::Partitioned { - return Ok(Transformed::no(plan)); - } - - let left_join_keys = join_side_column_names(hash_join.on(), true); - let right_join_keys = join_side_column_names(hash_join.on(), false); - let Some(left_join_keys) = left_join_keys else { - return Ok(Transformed::no(plan)); - }; - let Some(right_join_keys) = right_join_keys else { - return Ok(Transformed::no(plan)); - }; - - let Some(left_repartition) = - hash_join.left().as_any().downcast_ref::() - else { - return Ok(Transformed::no(plan)); - }; - let Some(right_repartition) = - hash_join.right().as_any().downcast_ref::() - else { - return Ok(Transformed::no(plan)); - }; - - let Some(left_input) = removable_tsid_repartition(left_repartition, &left_join_keys) - else { - return Ok(Transformed::no(plan)); - }; - let Some(right_input) = removable_tsid_repartition(right_repartition, &right_join_keys) - else { - return Ok(Transformed::no(plan)); - }; - - if left_input.output_partitioning().partition_count() - != right_input.output_partitioning().partition_count() - { - return Ok(Transformed::no(plan)); - } - - let new_join = hash_join - .builder() - .with_new_children(vec![left_input, right_input])? - .recompute_properties() - .reset_state() - .build_exec()?; - - Ok(Transformed::yes(new_join)) - }) - .map(|result| result.data) - } - - fn name(&self) -> &str { - "TsidJoinRepartition" - } - - fn schema_check(&self) -> bool { - false - } -} - -fn removable_tsid_repartition( - repartition: &RepartitionExec, - join_keys: &BTreeSet, -) -> Option> { - if repartition.preserve_order() { - return None; - } - - let Partitioning::Hash(requested_exprs, requested_partition_count) = repartition.partitioning() - else { - return None; - }; - let Partitioning::Hash(existing_exprs, existing_partition_count) = - repartition.input().output_partitioning() - else { - return None; - }; - - if *requested_partition_count != *existing_partition_count { - return None; - } - - let requested_names = column_names(requested_exprs)?; - if requested_names != *join_keys || !requested_names.contains(DATA_SCHEMA_TSID_COLUMN_NAME) { - return None; - } - - let existing_names = column_names(&existing_exprs)?; - if existing_names != BTreeSet::from([DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]) { - return None; - } - - if requested_names.len() <= existing_names.len() { - return None; - } - - Some(repartition.input().clone()) -} - -fn join_side_column_names( - on: &[(Arc, Arc)], - left: bool, -) -> Option> { - on.iter() - .map(|(left_expr, right_expr)| { - if left { - physical_column_name(left_expr) - } else { - physical_column_name(right_expr) - } - }) - .collect() -} - -fn column_names(exprs: &[Arc]) -> Option> { - exprs.iter().map(physical_column_name).collect() -} - -fn physical_column_name(expr: &Arc) -> Option { - expr.as_any() - .downcast_ref::() - .map(|column| column.name().to_string()) -} - -#[cfg(test)] -mod tests { - use std::any::Any; - use std::sync::Arc; - - use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit}; - use datafusion::common::NullEquality; - use datafusion::execution::TaskContext; - use datafusion::physical_optimizer::PhysicalOptimizerRule; - use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; - use datafusion::physical_plan::joins::HashJoinExec; - use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, Partitioning, PlanProperties, SendableRecordBatchStream, - displayable, - }; - use datafusion_common::Result as DfResult; - use datafusion_expr::JoinType; - use datafusion_physical_expr::EquivalenceProperties; - use datafusion_physical_expr::expressions::Column as PhysicalColumn; - - use super::*; - - #[derive(Debug)] - struct PartitionedTestExec { - properties: Arc, - } - - impl PartitionedTestExec { - fn new(schema: SchemaRef, partitioning: Partitioning) -> Self { - Self { - properties: Arc::new(PlanProperties::new( - EquivalenceProperties::new(schema.clone()), - partitioning, - EmissionType::Incremental, - Boundedness::Bounded, - )), - } - } - } - - impl DisplayAs for PartitionedTestExec { - fn fmt_as( - &self, - _t: DisplayFormatType, - f: &mut std::fmt::Formatter<'_>, - ) -> std::fmt::Result { - write!(f, "PartitionedTestExec") - } - } - - impl ExecutionPlan for PartitionedTestExec { - fn name(&self) -> &str { - "PartitionedTestExec" - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn properties(&self) -> &Arc { - &self.properties - } - - fn children(&self) -> Vec<&Arc> { - vec![] - } - - fn with_new_children( - self: Arc, - _children: Vec>, - ) -> DfResult> { - Ok(self) - } - - fn execute( - &self, - _partition: usize, - _context: Arc, - ) -> DfResult { - unreachable!("optimizer tests should not execute PartitionedTestExec") - } - } - - #[test] - fn removes_repartition_for_tsid_strengthening_join() { - let schema = test_schema(); - let left = repartitioned_child( - schema.clone(), - vec![partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 0)], - ); - let right = repartitioned_child( - schema.clone(), - vec![partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 0)], - ); - let join_on = vec![ - ( - partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 0), - partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 0), - ), - ( - partition_column("greptime_timestamp", 1), - partition_column("greptime_timestamp", 1), - ), - ]; - let join = Arc::new( - HashJoinExec::try_new( - left, - right, - join_on, - None, - &JoinType::Inner, - None, - PartitionMode::Partitioned, - NullEquality::NullEqualsNull, - false, - ) - .unwrap(), - ) as Arc; - - let optimized = TsidJoinRepartition - .optimize(join, &ConfigOptions::default()) - .unwrap(); - - let hash_join = optimized.as_any().downcast_ref::().unwrap(); - assert!(!hash_join.left().as_any().is::()); - assert!(!hash_join.right().as_any().is::()); - - let plan_str = displayable(optimized.as_ref()).indent(false).to_string(); - assert!(!plan_str.contains("RepartitionExec"), "{plan_str}"); - } - - #[test] - fn keeps_repartition_without_existing_tsid_distribution() { - let schema = test_schema(); - let left = repartitioned_child( - schema.clone(), - vec![partition_column("greptime_timestamp", 1)], - ); - let right = repartitioned_child( - schema.clone(), - vec![partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 0)], - ); - let join_on = vec![ - ( - partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 0), - partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 0), - ), - ( - partition_column("greptime_timestamp", 1), - partition_column("greptime_timestamp", 1), - ), - ]; - let join = Arc::new( - HashJoinExec::try_new( - left, - right, - join_on, - None, - &JoinType::Inner, - None, - PartitionMode::Partitioned, - NullEquality::NullEqualsNull, - false, - ) - .unwrap(), - ) as Arc; - - let optimized = TsidJoinRepartition - .optimize(join, &ConfigOptions::default()) - .unwrap(); - - let hash_join = optimized.as_any().downcast_ref::().unwrap(); - assert!(hash_join.left().as_any().is::()); - assert!(hash_join.right().as_any().is::()); - } - - fn test_schema() -> SchemaRef { - Arc::new(Schema::new(vec![ - Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false), - Field::new( - "greptime_timestamp", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - ), - Field::new("greptime_value", DataType::Float64, true), - ])) - } - - fn repartitioned_child( - schema: SchemaRef, - existing_partition_exprs: Vec>, - ) -> Arc { - let input = Arc::new(PartitionedTestExec::new( - schema, - Partitioning::Hash(existing_partition_exprs, 32), - )); - Arc::new( - RepartitionExec::try_new( - input, - Partitioning::Hash( - vec![ - partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 0), - partition_column("greptime_timestamp", 1), - ], - 32, - ), - ) - .unwrap(), - ) - } - - fn partition_column(name: &str, index: usize) -> Arc { - Arc::new(PhysicalColumn::new(name, index)) - } -} diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 3bc7f32f9f..f696c8b53e 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -68,7 +68,6 @@ use crate::optimizer::remove_duplicate::RemoveDuplicate; use crate::optimizer::scan_hint::ScanHintRule; use crate::optimizer::string_normalization::StringNormalizationRule; use crate::optimizer::transcribe_atat::TranscribeAtatRule; -use crate::optimizer::tsid_join_repartition::TsidJoinRepartition; use crate::optimizer::type_conversion::TypeConversionRule; use crate::optimizer::windowed_sort::WindowedSortPhysicalRule; use crate::options::QueryOptions as QueryOptionsNew; @@ -195,8 +194,6 @@ impl QueryEngineState { physical_optimizer .rules .push(Arc::new(WindowedSortPhysicalRule)); - // Relax redundant repartitions for tsid-based PromQL joins after distribution is enforced. - physical_optimizer.rules.push(Arc::new(TsidJoinRepartition)); // explicitly not do filter pushdown for windowed sort&part sort // (notice that `PartSortExec` create another new dyn filter that need to be pushdown if want to use dyn filter optimization) // benchmark shows it can cause performance regression due to useless filtering and extra shuffle. diff --git a/tests/cases/distributed/flow-tql/tsid_on_phy.result b/tests/cases/distributed/flow-tql/tsid_on_phy.result index 923a884093..6ae3e77b0c 100644 --- a/tests/cases/distributed/flow-tql/tsid_on_phy.result +++ b/tests/cases/distributed/flow-tql/tsid_on_phy.result @@ -100,35 +100,31 @@ TQL EXPLAIN ( ) ); -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | HistogramFold: le=le, field=sum(prom_avg_over_time(ts_range,v)), quantile=0.5 | -| | Sort: test_tsid.le ASC NULLS LAST, test_tsid.tag4 ASC NULLS LAST, test_tsid.tag5 ASC NULLS LAST, test_tsid.ts ASC NULLS LAST | -| | Aggregate: groupBy=[[test_tsid.le, test_tsid.tag4, test_tsid.tag5, test_tsid.ts]], aggr=[[__sum_merge(__sum_state(prom_avg_over_time(ts_range,v))) AS sum(prom_avg_over_time(ts_range,v))]] | -| | MergeScan [is_placeholder=false, remote_input=[ | -| | Aggregate: groupBy=[[test_tsid.le, test_tsid.tag4, test_tsid.tag5, test_tsid.ts]], aggr=[[__sum_state(prom_avg_over_time(ts_range,v))]] | -| | Filter: prom_avg_over_time(ts_range,v) IS NOT NULL | -| | Projection: test_tsid.ts, prom_avg_over_time(ts_range, v) AS prom_avg_over_time(ts_range,v), test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8 | -| | PromRangeManipulate: req range=[1769139000000..1769139900000], interval=[60000], eval range=[1800000], time index=[ts], values=["v"] | -| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] | -| | PromSeriesDivide: tags=["__tsid"] | -| | Sort: test_tsid.__tsid ASC NULLS FIRST, test_tsid.ts ASC NULLS FIRST | -| | Filter: test_tsid.ts >= TimestampMillisecond(1769137200001, None) AND test_tsid.ts <= TimestampMillisecond(1769139900000, None) | -| | Projection: test_tsid.v, test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8, test_tsid.__tsid, test_tsid.ts | -| | SubqueryAlias: test_tsid | -| | Filter: phy.__table_id=UInt32(REDACTED) | -| | TableScan: phy projection=[ts, v, tag1, tag2, le, tag4, tag5, tag6, tag7, tag8, __table_id, __tsid] | -| | ]] | -| physical_plan | HistogramFoldExec: le=@0, field=@4, quantile=0.5 | -| | SortExec: expr=[tag4@1 ASC NULLS LAST, tag5@2 ASC NULLS LAST, ts@3 ASC NULLS LAST, CAST(le@0 AS Float64) ASC NULLS LAST], preserve_partitioning=[true] | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | HistogramFold: le=le, field=sum(prom_avg_over_time(ts_range,v)), quantile=0.5 | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | Sort: test_tsid.le ASC NULLS LAST, test_tsid.tag4 ASC NULLS LAST, test_tsid.tag5 ASC NULLS LAST, test_tsid.ts ASC NULLS LAST | +| | Aggregate: groupBy=[[test_tsid.le, test_tsid.tag4, test_tsid.tag5, test_tsid.ts]], aggr=[[sum(prom_avg_over_time(ts_range,v))]] | +| | Filter: prom_avg_over_time(ts_range,v) IS NOT NULL | +| | Projection: test_tsid.ts, prom_avg_over_time(ts_range, v) AS prom_avg_over_time(ts_range,v), test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8 | +| | PromRangeManipulate: req range=[1769139000000..1769139900000], interval=[60000], eval range=[1800000], time index=[ts], values=["v"] | +| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] | +| | PromSeriesDivide: tags=["__tsid"] | +| | Sort: test_tsid.__tsid ASC NULLS FIRST, test_tsid.ts ASC NULLS FIRST | +| | Filter: test_tsid.ts >= TimestampMillisecond(1769137200001, None) AND test_tsid.ts <= TimestampMillisecond(1769139900000, None) | +| | Projection: test_tsid.v, test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8, test_tsid.__tsid, test_tsid.ts | +| | SubqueryAlias: test_tsid | +| | Filter: phy.__table_id=UInt32(REDACTED) | +| | TableScan: phy projection=[ts, v, tag1, tag2, le, tag4, tag5, tag6, tag7, tag8, __table_id, __tsid] | +| | ]] | +| physical_plan | HistogramFoldExec: le=@0, field=@4, quantile=0.5 | +| | SortExec: expr=[tag4@1 ASC NULLS LAST, tag5@2 ASC NULLS LAST, ts@3 ASC NULLS LAST, CAST(le@0 AS Float64) ASC NULLS LAST], preserve_partitioning=[true] | | | RepartitionExec: REDACTED -| | AggregateExec: mode=FinalPartitioned, gby=[le@0 as le, tag4@1 as tag4, tag5@2 as tag5, ts@3 as ts], aggr=[sum(prom_avg_over_time(ts_range,v))] | -| | RepartitionExec: REDACTED -| | AggregateExec: mode=Partial, gby=[le@0 as le, tag4@1 as tag4, tag5@2 as tag5, ts@3 as ts], aggr=[sum(prom_avg_over_time(ts_range,v))] | -| | MergeScanExec: REDACTED -| | | -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| | MergeScanExec: REDACTED +| | | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ CREATE FLOW IF NOT EXISTS test_tsid SINK TO 'test_tsid_output' diff --git a/tests/cases/standalone/common/tql-explain-analyze/explain.result b/tests/cases/standalone/common/tql-explain-analyze/explain.result index 4580d1161f..1e4cf18b40 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/explain.result +++ b/tests/cases/standalone/common/tql-explain-analyze/explain.result @@ -196,7 +196,6 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test; |_|_| | physical_plan after FilterPushdown(Post)_| SAME TEXT AS ABOVE_| | physical_plan after WindowedSortRule_| SAME TEXT AS ABOVE_| -| physical_plan after TsidJoinRepartition_| SAME TEXT AS ABOVE_| | physical_plan after MatchesConstantTerm_| SAME TEXT AS ABOVE_| | physical_plan after RemoveDuplicateRule_| SAME TEXT AS ABOVE_| | physical_plan after SanityCheckPlan_| SAME TEXT AS ABOVE_| @@ -343,7 +342,6 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test AS series; |_|_| | physical_plan after FilterPushdown(Post)_| SAME TEXT AS ABOVE_| | physical_plan after WindowedSortRule_| SAME TEXT AS ABOVE_| -| physical_plan after TsidJoinRepartition_| SAME TEXT AS ABOVE_| | physical_plan after MatchesConstantTerm_| SAME TEXT AS ABOVE_| | physical_plan after RemoveDuplicateRule_| SAME TEXT AS ABOVE_| | physical_plan after SanityCheckPlan_| SAME TEXT AS ABOVE_| diff --git a/tests/cases/standalone/tql-explain-analyze/tsid_column.result b/tests/cases/standalone/tql-explain-analyze/tsid_column.result index 882358074a..4a7a875060 100644 --- a/tests/cases/standalone/tql-explain-analyze/tsid_column.result +++ b/tests/cases/standalone/tql-explain-analyze/tsid_column.result @@ -42,16 +42,14 @@ TQL ANALYZE (0, 10, '5s') sum(tsid_metric); +-+-+-+ | stage | node | plan_| +-+-+-+ -| 0_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +| 0_| 0_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED |_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(tsid_metric.val)] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(tsid_metric.val)] REDACTED -|_|_|_MergeScanExec: REDACTED -|_|_|_| -| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[__sum_state(tsid_metric.val)] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[ts@1 as ts], aggr=[__sum_state(tsid_metric.val)] REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[ts@1 as ts], aggr=[sum(tsid_metric.val)] REDACTED |_|_|_ProjectionExec: expr=[val@0 as val, ts@2 as ts] REDACTED |_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED |_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED @@ -73,16 +71,14 @@ TQL ANALYZE (0, 10, '5s') sum by (job, instance) (tsid_metric); +-+-+-+ | stage | node | plan_| +-+-+-+ -| 0_| 0_|_SortPreservingMergeExec: [job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST] REDACTED +| 0_| 0_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST] REDACTED |_|_|_SortExec: expr=[job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_AggregateExec: mode=FinalPartitioned, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED -|_|_|_MergeScanExec: REDACTED -|_|_|_| -| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[__sum_state(tsid_metric.val), __first_value_state(tsid_metric.__tsid)] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[job@2 as job, instance@1 as instance, ts@4 as ts], aggr=[__sum_state(tsid_metric.val), __first_value_state(tsid_metric.__tsid)] REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[job@2 as job, instance@1 as instance, ts@4 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED |_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED |_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED |_|_|_ProjectionExec: expr=[val@1 as val, instance@3 as instance, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED @@ -107,6 +103,12 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(count(tsid |_|_|_REDACTED |_|_|_ScalarCalculateExec: tags=[] REDACTED |_|_|_CoalescePartitionsExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED @@ -114,25 +116,17 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(count(tsid |_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED -|_|_|_MergeScanExec: REDACTED -|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED -|_|_|_MergeScanExec: REDACTED -|_|_|_| -| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED |_|_|_ProjectionExec: expr=[ts@3 as ts, job@1 as job] REDACTED |_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED |_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED |_|_|_ProjectionExec: expr=[val@1 as val, job@3 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[__sum_state(prom_irate(ts_range,val))] REDACTED +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[__sum_state(prom_irate(ts_range,val))] REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED |_|_|_FilterExec: prom_irate(ts_range,val)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[ts@2 as ts, prom_irate(ts_range@3, val@0) as prom_irate(ts_range,val)] REDACTED |_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[3600000], time index=[ts] REDACTED @@ -160,6 +154,12 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(sum(tsid_m |_|_|_REDACTED |_|_|_ScalarCalculateExec: tags=[] REDACTED |_|_|_CoalescePartitionsExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED @@ -167,16 +167,6 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(sum(tsid_m |_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED -|_|_|_MergeScanExec: REDACTED -|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED -|_|_|_MergeScanExec: REDACTED -|_|_|_| -| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED |_|_|_ProjectionExec: expr=[ts@1 as ts, job@0 as job] REDACTED |_|_|_FilterExec: val@0 IS NOT NULL, projection=[job@1, ts@2] REDACTED |_|_|_ProjectionExec: expr=[val@0 as val, job@1 as job, ts@3 as ts] REDACTED @@ -185,9 +175,11 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(sum(tsid_m |_|_|_ProjectionExec: expr=[val@1 as val, job@3 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[__sum_state(prom_irate(ts_range,val))] REDACTED +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[__sum_state(prom_irate(ts_range,val))] REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED |_|_|_FilterExec: prom_irate(ts_range,val)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[ts@2 as ts, prom_irate(ts_range@3, val@0) as prom_irate(ts_range,val)] REDACTED |_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[3600000], time index=[ts] REDACTED