diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 7a42b1c0d3..3c326cf390 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -43,7 +43,6 @@ use greptime_proto::v1::region::RegionRequestHeader; use meter_core::data::ReadItem; use meter_macros::read_meter; use session::context::QueryContextRef; -use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME; use store_api::storage::RegionId; use table::table_name::TableName; use tokio::time::Instant; @@ -429,35 +428,20 @@ impl MergeScanExec { return None; } - // Metric-engine scans can satisfy any hash distribution that includes `__tsid`. - // Equal requested keys also share the same `__tsid`, and equal `__tsid` values stay - // co-located across MergeScan partitions. - let overlaps = if self - .arrow_schema - .column_with_name(DATA_SCHEMA_TSID_COLUMN_NAME) - .is_some() - && hash_exprs.iter().any(|expr| { + let all_partition_col_aliases: HashSet<_> = self + .partition_cols + .values() + .flat_map(|aliases| aliases.iter().map(|c| c.name())) + .collect(); + let overlaps: Vec<_> = hash_exprs + .iter() + .filter(|expr| { expr.as_any() .downcast_ref::() - .is_some_and(|col_expr| col_expr.name() == DATA_SCHEMA_TSID_COLUMN_NAME) - }) { - hash_exprs - } else { - let all_partition_col_aliases: HashSet<_> = self - .partition_cols - .values() - .flat_map(|aliases| aliases.iter().map(|c| c.name())) - .collect(); - hash_exprs - .iter() - .filter(|expr| { - expr.as_any() - .downcast_ref::() - .is_some_and(|col_expr| all_partition_col_aliases.contains(col_expr.name())) - }) - .cloned() - .collect() - }; + .is_some_and(|col_expr| all_partition_col_aliases.contains(col_expr.name())) + }) + .cloned() + .collect(); if overlaps.is_empty() { return None; diff --git a/src/query/src/optimizer/pass_distribution.rs b/src/query/src/optimizer/pass_distribution.rs index b9cf4a4c37..1004252636 100644 --- a/src/query/src/optimizer/pass_distribution.rs +++ b/src/query/src/optimizer/pass_distribution.rs @@ -248,12 +248,18 @@ mod tests { fn test_merge_scan_exec(schema: SchemaRef) -> MergeScanExec { let session_state = SessionStateBuilder::new().with_default_features().build(); - let partition_cols = BTreeMap::from([( - DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), - BTreeSet::from([datafusion_common::Column::from_name( - DATA_SCHEMA_TSID_COLUMN_NAME, - )]), - )]); + let partition_cols = BTreeMap::from([ + ( + DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), + BTreeSet::from([datafusion_common::Column::from_name( + DATA_SCHEMA_TSID_COLUMN_NAME, + )]), + ), + ( + "greptime_timestamp".to_string(), + BTreeSet::from([datafusion_common::Column::from_name("greptime_timestamp")]), + ), + ]); let plan = LogicalPlanBuilder::empty(false).build().unwrap(); MergeScanExec::new( diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 097a706bb8..ff2b11c83c 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -727,7 +727,6 @@ impl PromPlanner { // if left plan or right plan tag is empty, means case like `scalar(...) + host` or `host + scalar(...)` // under this case we only join on time index left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(), - is_comparison_op, modifier, )?; let join_plan_schema = join_plan.schema().clone(); @@ -769,7 +768,12 @@ impl PromPlanner { } _ => (&left_table_ref, &left_context), }; - self.project_binary_join_side(filtered, project_table_ref, project_context) + self.project_binary_join_side( + filtered, + project_table_ref, + project_context, + false, + ) } else { self.projection_for_each_field_column(join_plan, bin_expr_builder) } @@ -782,6 +786,7 @@ impl PromPlanner { input: LogicalPlan, table_ref: &TableReference, context: &PromPlannerContext, + keep_tsid: bool, ) -> Result { let schema = input.schema(); @@ -818,7 +823,7 @@ impl PromPlanner { // Preserve `__tsid` if present, so it can still be used internally downstream. It's // stripped from the final output anyway. if let Some(tsid_col) = - Self::optional_tsid_projection(schema, Some(table_ref), context.use_tsid) + Self::optional_tsid_projection(schema, Some(table_ref), keep_tsid && context.use_tsid) { project_exprs.push(tsid_col); } @@ -832,6 +837,7 @@ impl PromPlanner { // Update context to reflect the projected schema. Don't keep a table qualifier since // the result is a derived expression. self.ctx = context.clone(); + self.ctx.use_tsid = keep_tsid && context.use_tsid; self.ctx.table_name = None; self.ctx.schema_name = None; @@ -3439,11 +3445,9 @@ impl PromPlanner { left: &LogicalPlan, right: &LogicalPlan, only_join_time_index: bool, - is_comparison_op: bool, modifier: &Option, ) -> (BTreeSet, BTreeSet) { - let use_tsid_join = !is_comparison_op - && !only_join_time_index + let use_tsid_join = !only_join_time_index && modifier.as_ref().is_none_or(|modifier| { modifier.matching.is_none() && matches!(modifier.card, VectorMatchCardinality::OneToOne) @@ -3504,16 +3508,10 @@ impl PromPlanner { left_time_index_column: Option, right_time_index_column: Option, only_join_time_index: bool, - is_comparison_op: bool, modifier: &Option, ) -> Result { - let (mut left_tag_columns, mut right_tag_columns) = self.binary_join_key_columns( - &left, - &right, - only_join_time_index, - is_comparison_op, - modifier, - ); + let (mut left_tag_columns, mut right_tag_columns) = + self.binary_join_key_columns(&left, &right, only_join_time_index, modifier); // push time index column if it exists if let (Some(left_time_index_column), Some(right_time_index_column)) = @@ -5048,9 +5046,50 @@ mod test { } #[tokio::test] - async fn comparison_binary_join_does_not_use_tsid() { + async fn comparison_binary_join_uses_tsid_but_filtered_result_drops_it() { let eval_stmt = build_eval_stmt("some_metric > some_alt_metric"); + let table_provider = build_test_table_provider_with_tsid( + &[ + (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()), + ( + DEFAULT_SCHEMA_NAME.to_string(), + "some_alt_metric".to_string(), + ), + ], + 2, + 1, + ) + .await; + let mut planner = PromPlanner { + table_provider, + ctx: PromPlannerContext::from_eval_stmt(&eval_stmt), + }; + let plan = planner + .prom_expr_to_plan(&eval_stmt.expr, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert!( + plan_str.contains("some_metric.__tsid = some_alt_metric.__tsid"), + "{plan_str}" + ); + assert!( + !plan + .schema() + .fields() + .iter() + .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME), + "{plan_str}" + ); + assert!(!planner.ctx.use_tsid, "{plan_str}"); + } + + #[tokio::test] + async fn comparison_bool_binary_join_uses_tsid_when_available() { + let eval_stmt = build_eval_stmt("some_metric > bool some_alt_metric"); + let table_provider = build_test_table_provider_with_tsid( &[ (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()), @@ -5069,11 +5108,12 @@ mod test { .unwrap(); let plan_str = plan.display_indent_schema().to_string(); - assert!(!plan_str.contains("__tsid ="), "{plan_str}"); assert!( - plan_str.contains("some_metric.tag_1 = some_alt_metric.tag_1"), + plan_str.contains("some_metric.__tsid = some_alt_metric.__tsid"), "{plan_str}" ); + assert!(!plan_str.contains("tag_0 ="), "{plan_str}"); + assert!(!plan_str.contains("tag_1 ="), "{plan_str}"); } #[tokio::test] diff --git a/tests/cases/standalone/common/promql/tsid_binary_join_regression.result b/tests/cases/standalone/common/promql/tsid_binary_join_regression.result index b2dde7033d..07b5f2e6bc 100644 --- a/tests/cases/standalone/common/promql/tsid_binary_join_regression.result +++ b/tests/cases/standalone/common/promql/tsid_binary_join_regression.result @@ -1,6 +1,6 @@ -- Regression test for TSID-backed PromQL binary joins on metric-engine tables. --- Default arithmetic joins should use `__tsid`, while label modifiers and comparison --- filters must continue to use label-based matching. +-- Default arithmetic and comparison joins should use `__tsid` when matching is the +-- default one-to-one case. Label modifiers still have to stay label-based. CREATE TABLE tsid_binary_join_physical ( ts TIMESTAMP(3) TIME INDEX, greptime_value DOUBLE, @@ -68,10 +68,10 @@ TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right; +-+-+-+ | 0_| 0_|_ProjectionExec: expr=[host@2 as host, job@3 as job, ts@5 as ts, __tsid@4 as __tsid, greptime_value@0 / greptime_value@1 as tsid_binary_join_left.greptime_value / tsid_binary_join_right.greptime_value] REDACTED |_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(__tsid@1, __tsid@3), (ts@2, ts@4)], projection=[greptime_value@0, greptime_value@3, host@4, job@5, __tsid@6, ts@7], NullsEqual: true REDACTED +|_|_|_RepartitionExec: partitioning=Hash([__tsid@1, ts@2], 32), input_partitions=32 REDACTED |_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, __tsid@3 as __tsid, ts@4 as ts] REDACTED -|_|_|_CooperativeExec REDACTED |_|_|_MergeScanExec: REDACTED -|_|_|_CooperativeExec REDACTED +|_|_|_RepartitionExec: partitioning=Hash([__tsid@3, ts@4], 32), input_partitions=32 REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED @@ -124,7 +124,8 @@ TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / ignoring(host) tsid_binary_join |_|_| Total rows: 4_| +-+-+-+ --- Comparison filters must keep label-based matching semantics. +-- Comparison filters can join on `__tsid`, but the filtered result must still behave like +-- a regular derived vector downstream. -- SQLNESS REPLACE (metrics.*) REDACTED -- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED -- SQLNESS REPLACE (-+) - @@ -136,12 +137,47 @@ TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > tsid_binary_join_right; +-+-+-+ | stage | node | plan_| +-+-+-+ -| 0_| 0_|_ProjectionExec: expr=[ts@4 as ts, greptime_value@0 as greptime_value, host@1 as host, job@2 as job, __tsid@3 as __tsid] REDACTED -|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(host@1, host@1), (job@2, job@2), (ts@4, ts@3)], filter=greptime_value@1 < greptime_value@0, projection=[greptime_value@0, host@1, job@2, __tsid@3, ts@4], NullsEqual: true REDACTED -|_|_|_RepartitionExec: partitioning=Hash([host@1, job@2, ts@4], 32), input_partitions=32 REDACTED +| 0_| 0_|_ProjectionExec: expr=[ts@3 as ts, greptime_value@0 as greptime_value, host@1 as host, job@2 as job] REDACTED +|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(__tsid@3, __tsid@1), (ts@4, ts@2)], filter=greptime_value@1 < greptime_value@0, projection=[greptime_value@0, host@1, job@2, ts@4], NullsEqual: true REDACTED +|_|_|_RepartitionExec: partitioning=Hash([__tsid@3, ts@4], 32), input_partitions=32 REDACTED |_|_|_MergeScanExec: REDACTED -|_|_|_RepartitionExec: partitioning=Hash([host@1, job@2, ts@3], 32), input_partitions=32 REDACTED -|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, host@1 as host, job@2 as job, ts@4 as ts] REDACTED +|_|_|_RepartitionExec: partitioning=Hash([__tsid@1, ts@2], 32), input_partitions=32 REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, __tsid@3 as __tsid, ts@4 as ts] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED +|_|_|_| +| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED +|_|_|_| +|_|_| Total rows: 4_| ++-+-+-+ + +-- `bool` comparison should follow the same TSID-backed matching path. +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > bool tsid_binary_join_right; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_ProjectionExec: expr=[host@2 as host, job@3 as job, ts@5 as ts, __tsid@4 as __tsid, CAST(greptime_value@1 < greptime_value@0 AS Float64) as tsid_binary_join_left.greptime_value > tsid_binary_join_right.greptime_value] REDACTED +|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(__tsid@1, __tsid@3), (ts@2, ts@4)], projection=[greptime_value@0, greptime_value@3, host@4, job@5, __tsid@6, ts@7], NullsEqual: true REDACTED +|_|_|_RepartitionExec: partitioning=Hash([__tsid@1, ts@2], 32), input_partitions=32 REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, __tsid@3 as __tsid, ts@4 as ts] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_RepartitionExec: partitioning=Hash([__tsid@3, ts@4], 32), input_partitions=32 REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED diff --git a/tests/cases/standalone/common/promql/tsid_binary_join_regression.sql b/tests/cases/standalone/common/promql/tsid_binary_join_regression.sql index bd7afe5065..3d16ad3f38 100644 --- a/tests/cases/standalone/common/promql/tsid_binary_join_regression.sql +++ b/tests/cases/standalone/common/promql/tsid_binary_join_regression.sql @@ -1,6 +1,6 @@ -- Regression test for TSID-backed PromQL binary joins on metric-engine tables. --- Default arithmetic joins should use `__tsid`, while label modifiers and comparison --- filters must continue to use label-based matching. +-- Default arithmetic and comparison joins should use `__tsid` when matching is the +-- default one-to-one case. Label modifiers still have to stay label-based. CREATE TABLE tsid_binary_join_physical ( ts TIMESTAMP(3) TIME INDEX, @@ -63,7 +63,8 @@ TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right; -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / ignoring(host) tsid_binary_join_right; --- Comparison filters must keep label-based matching semantics. +-- Comparison filters can join on `__tsid`, but the filtered result must still behave like +-- a regular derived vector downstream. -- SQLNESS REPLACE (metrics.*) REDACTED -- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED -- SQLNESS REPLACE (-+) - @@ -72,6 +73,15 @@ TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / ignoring(host) tsid_binary_join -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > tsid_binary_join_right; +-- `bool` comparison should follow the same TSID-backed matching path. +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > bool tsid_binary_join_right; + -- SQLNESS SORT_RESULT 3 1 TQL EVAL (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right;