diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 03eb761de6..7a42b1c0d3 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -429,24 +429,10 @@ impl MergeScanExec { return None; } - let all_partition_col_aliases: HashSet<_> = self - .partition_cols - .values() - .flat_map(|aliases| aliases.iter().map(|c| c.name())) - .collect(); - let mut overlaps = vec![]; - for expr in &hash_exprs { - if let Some(col_expr) = expr.as_any().downcast_ref::() - && all_partition_col_aliases.contains(col_expr.name()) - { - overlaps.push(expr.clone()); - } - } - // Metric-engine scans can satisfy any hash distribution that includes `__tsid`. // Equal requested keys also share the same `__tsid`, and equal `__tsid` values stay // co-located across MergeScan partitions. - if self + let overlaps = if self .arrow_schema .column_with_name(DATA_SCHEMA_TSID_COLUMN_NAME) .is_some() @@ -454,10 +440,24 @@ impl MergeScanExec { expr.as_any() .downcast_ref::() .is_some_and(|col_expr| col_expr.name() == DATA_SCHEMA_TSID_COLUMN_NAME) - }) - { - overlaps = hash_exprs.clone(); - } + }) { + hash_exprs + } else { + let all_partition_col_aliases: HashSet<_> = self + .partition_cols + .values() + .flat_map(|aliases| aliases.iter().map(|c| c.name())) + .collect(); + hash_exprs + .iter() + .filter(|expr| { + expr.as_any() + .downcast_ref::() + .is_some_and(|col_expr| all_partition_col_aliases.contains(col_expr.name())) + }) + .cloned() + .collect() + }; if overlaps.is_empty() { return None; diff --git a/src/query/src/optimizer/pass_distribution.rs b/src/query/src/optimizer/pass_distribution.rs index 23d93b2a8e..b9cf4a4c37 100644 --- a/src/query/src/optimizer/pass_distribution.rs +++ b/src/query/src/optimizer/pass_distribution.rs @@ -85,9 +85,10 @@ impl PassDistribution { let mut new_children = Vec::with_capacity(children.len()); for (idx, child) in children.into_iter().enumerate() { let child_req = match required.get(idx) { - Some(Distribution::UnspecifiedDistribution) => { - Self::propagate_unspecified_child_requirement(plan.as_ref(), idx, ¤t_req) + Some(Distribution::UnspecifiedDistribution) if idx == 0 => { + Self::map_hash_requirement_through_projection(plan.as_ref(), ¤t_req) } + Some(Distribution::UnspecifiedDistribution) => None, None => current_req.clone(), Some(req) => Some(req.clone()), }; @@ -108,15 +109,10 @@ impl PassDistribution { } } - fn propagate_unspecified_child_requirement( + fn map_hash_requirement_through_projection( plan: &dyn ExecutionPlan, - idx: usize, current_req: &Option, ) -> Option { - if idx != 0 { - return None; - } - let Some(Distribution::HashPartitioned(required_exprs)) = current_req else { return None; }; diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 0d2250cdbe..097a706bb8 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -709,17 +709,12 @@ impl PromPlanner { self.ctx.table_name = Some("rhs".to_string()); } } - let field_columns = left_field_columns - .iter() - .zip(right_field_columns.iter()) - .collect::>(); + let (output_field_columns, field_columns) = + Self::align_binary_field_columns(&left_field_columns, &right_field_columns); // PromQL binary arithmetic only combines the shared prefix of value columns. // Keep the output field count aligned with that zipped prefix so planning // remains stable even when the two sides have uneven multi-field schemas. - self.ctx.field_columns = field_columns - .iter() - .map(|(left_col_name, _)| (*left_col_name).clone()) - .collect(); + self.ctx.field_columns = output_field_columns; let mut field_columns = field_columns.into_iter(); let join_plan = self.join_on_non_field_columns( @@ -732,7 +727,7 @@ impl PromPlanner { // if left plan or right plan tag is empty, means case like `scalar(...) + host` or `host + scalar(...)` // under this case we only join on time index left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(), - !is_comparison_op, + is_comparison_op, modifier, )?; let join_plan_schema = join_plan.schema().clone(); @@ -822,11 +817,10 @@ impl PromPlanner { // Preserve `__tsid` if present, so it can still be used internally downstream. It's // stripped from the final output anyway. - if context.use_tsid - && let Ok(tsid_col) = - schema.qualified_field_with_name(Some(table_ref), DATA_SCHEMA_TSID_COLUMN_NAME) + if let Some(tsid_col) = + Self::optional_tsid_projection(schema, Some(table_ref), context.use_tsid) { - project_exprs.push(DfExpr::Column(tsid_col.into())); + project_exprs.push(tsid_col); } let plan = LogicalPlanBuilder::from(input) @@ -3405,56 +3399,58 @@ impl PromPlanner { ) } - fn can_use_tsid_for_binary_join( + fn align_binary_field_columns<'a>( + left_field_columns: &'a [String], + right_field_columns: &'a [String], + ) -> (Vec, Vec<(&'a String, &'a String)>) { + let field_pairs = left_field_columns + .iter() + .zip(right_field_columns.iter()) + .collect::>(); + let output_field_columns = field_pairs + .iter() + .map(|(left_col_name, _)| (*left_col_name).clone()) + .collect(); + (output_field_columns, field_pairs) + } + + fn plan_has_tsid_column(plan: &LogicalPlan) -> bool { + plan.schema() + .fields() + .iter() + .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME) + } + + fn optional_tsid_projection( + schema: &DFSchemaRef, + table_ref: Option<&TableReference>, + keep_tsid: bool, + ) -> Option { + keep_tsid.then_some(()).and_then(|_| { + schema + .qualified_field_with_name(table_ref, DATA_SCHEMA_TSID_COLUMN_NAME) + .ok() + .map(|field| DfExpr::Column(field.into())) + }) + } + + fn binary_join_key_columns( + &self, left: &LogicalPlan, right: &LogicalPlan, only_join_time_index: bool, - allow_tsid_join: bool, + is_comparison_op: bool, modifier: &Option, - ) -> bool { - if only_join_time_index || !allow_tsid_join { - return false; - } + ) -> (BTreeSet, BTreeSet) { + let use_tsid_join = !is_comparison_op + && !only_join_time_index + && modifier.as_ref().is_none_or(|modifier| { + modifier.matching.is_none() + && matches!(modifier.card, VectorMatchCardinality::OneToOne) + }) + && Self::plan_has_tsid_column(left) + && Self::plan_has_tsid_column(right); - let modifier_allows_tsid = modifier.as_ref().is_none_or(|modifier| { - modifier.matching.is_none() && matches!(modifier.card, VectorMatchCardinality::OneToOne) - }); - - modifier_allows_tsid - && left - .schema() - .fields() - .iter() - .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME) - && right - .schema() - .fields() - .iter() - .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME) - } - - /// Build a inner join on time index column and tag columns to concat two logical plans. - /// When `only_join_time_index == true` we only join on the time index, because these two plan may not have the same tag columns - #[allow(clippy::too_many_arguments)] - fn join_on_non_field_columns( - &self, - left: LogicalPlan, - right: LogicalPlan, - left_table_ref: TableReference, - right_table_ref: TableReference, - left_time_index_column: Option, - right_time_index_column: Option, - only_join_time_index: bool, - allow_tsid_join: bool, - modifier: &Option, - ) -> Result { - let use_tsid_join = Self::can_use_tsid_for_binary_join( - &left, - &right, - only_join_time_index, - allow_tsid_join, - modifier, - ); let (mut left_tag_columns, mut right_tag_columns) = if use_tsid_join { ( BTreeSet::from([DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]), @@ -3474,30 +3470,51 @@ impl PromPlanner { (left_tag_columns, right_tag_columns) }; - // apply modifier - if !use_tsid_join && let Some(modifier) = modifier { - // apply label modifier - if let Some(matching) = &modifier.matching { - match matching { - // keeps columns mentioned in `on` - LabelModifier::Include(on) => { - let mask = on.labels.iter().cloned().collect::>(); - left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect(); - right_tag_columns = - right_tag_columns.intersection(&mask).cloned().collect(); - } - // removes columns memtioned in `ignoring` - LabelModifier::Exclude(ignoring) => { - // doesn't check existence of label - for label in &ignoring.labels { - let _ = left_tag_columns.remove(label); - let _ = right_tag_columns.remove(label); - } + if !use_tsid_join + && let Some(modifier) = modifier + && let Some(matching) = &modifier.matching + { + match matching { + LabelModifier::Include(on) => { + let mask = on.labels.iter().cloned().collect::>(); + left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect(); + right_tag_columns = right_tag_columns.intersection(&mask).cloned().collect(); + } + LabelModifier::Exclude(ignoring) => { + for label in &ignoring.labels { + let _ = left_tag_columns.remove(label); + let _ = right_tag_columns.remove(label); } } } } + (left_tag_columns, right_tag_columns) + } + + /// Build a inner join on time index column and tag columns to concat two logical plans. + /// When `only_join_time_index == true` we only join on the time index, because these two plan may not have the same tag columns + #[allow(clippy::too_many_arguments)] + fn join_on_non_field_columns( + &self, + left: LogicalPlan, + right: LogicalPlan, + left_table_ref: TableReference, + right_table_ref: TableReference, + left_time_index_column: Option, + right_time_index_column: Option, + only_join_time_index: bool, + is_comparison_op: bool, + modifier: &Option, + ) -> Result { + let (mut left_tag_columns, mut right_tag_columns) = self.binary_join_key_columns( + &left, + &right, + only_join_time_index, + is_comparison_op, + modifier, + ); + // push time index column if it exists if let (Some(left_time_index_column), Some(right_time_index_column)) = (left_time_index_column, right_time_index_column) @@ -3919,24 +3936,10 @@ impl PromPlanner { .iter() .chain(self.ctx.time_index_column.iter()) .map(|col| Ok(DfExpr::Column(Column::new(table_ref.clone(), col)))); - let tsid_iter = self - .ctx - .use_tsid - .then_some(()) - .filter(|_| { - input - .schema() - .fields() - .iter() - .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME) - }) - .into_iter() - .map(|_| { - Ok(DfExpr::Column(Column::new( - table_ref.clone(), - DATA_SCHEMA_TSID_COLUMN_NAME, - ))) - }); + let tsid_iter = + Self::optional_tsid_projection(input.schema(), table_ref.as_ref(), self.ctx.use_tsid) + .into_iter() + .map(Ok); // build computation exprs let result_field_columns = self @@ -4195,6 +4198,18 @@ mod test { assert!(!plan_str.contains("Distinct:"), "{plan_str}"); } + fn build_eval_stmt(expr: &str) -> EvalStmt { + EvalStmt { + expr: parser::parse(expr).unwrap(), + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + } + } + async fn build_test_table_provider( table_name_tuples: &[(String, String)], num_tag: usize, @@ -4825,16 +4840,7 @@ mod test { #[tokio::test] async fn default_binary_join_uses_tsid_when_available() { - let prom_expr = parser::parse("some_metric / some_alt_metric").unwrap(); - let eval_stmt = EvalStmt { - expr: prom_expr, - start: UNIX_EPOCH, - end: UNIX_EPOCH - .checked_add(Duration::from_secs(100_000)) - .unwrap(), - interval: Duration::from_secs(5), - lookback_delta: Duration::from_secs(1), - }; + let eval_stmt = build_eval_stmt("some_metric / some_alt_metric"); let table_provider = build_test_table_provider_with_tsid( &[ @@ -4866,17 +4872,7 @@ mod test { #[tokio::test] async fn tsid_is_preserved_for_nested_default_binary_joins() { - let prom_expr = - parser::parse("(some_metric - some_alt_metric) / some_third_metric").unwrap(); - let eval_stmt = EvalStmt { - expr: prom_expr, - start: UNIX_EPOCH, - end: UNIX_EPOCH - .checked_add(Duration::from_secs(100_000)) - .unwrap(), - interval: Duration::from_secs(5), - lookback_delta: Duration::from_secs(1), - }; + let eval_stmt = build_eval_stmt("(some_metric - some_alt_metric) / some_third_metric"); let table_provider = build_test_table_provider_with_tsid( &[ @@ -4906,17 +4902,7 @@ mod test { #[tokio::test] async fn repeated_tsid_binary_operand_keeps_tsid_join_keys() { - let prom_expr = - parser::parse("((some_metric - some_alt_metric) / some_metric) * 100").unwrap(); - let eval_stmt = EvalStmt { - expr: prom_expr, - start: UNIX_EPOCH, - end: UNIX_EPOCH - .checked_add(Duration::from_secs(100_000)) - .unwrap(), - interval: Duration::from_secs(5), - lookback_delta: Duration::from_secs(1), - }; + let eval_stmt = build_eval_stmt("((some_metric - some_alt_metric) / some_metric) * 100"); let table_provider = build_test_table_provider_with_tsid( &[ @@ -4942,18 +4928,8 @@ mod test { #[tokio::test] async fn repeated_tsid_binary_operand_keeps_shorter_field_side() { - let prom_expr = - parser::parse("((two_field_metric - one_field_metric) / one_field_metric) * 100") - .unwrap(); - let eval_stmt = EvalStmt { - expr: prom_expr, - start: UNIX_EPOCH, - end: UNIX_EPOCH - .checked_add(Duration::from_secs(100_000)) - .unwrap(), - interval: Duration::from_secs(5), - lookback_delta: Duration::from_secs(1), - }; + let eval_stmt = + build_eval_stmt("((two_field_metric - one_field_metric) / one_field_metric) * 100"); let table_provider = build_test_table_provider_with_tsid_fields( &[ @@ -5000,16 +4976,7 @@ mod test { #[tokio::test] async fn tsid_binary_join_uses_shorter_field_side() { - let prom_expr = parser::parse("one_field_metric / two_field_metric").unwrap(); - let eval_stmt = EvalStmt { - expr: prom_expr, - start: UNIX_EPOCH, - end: UNIX_EPOCH - .checked_add(Duration::from_secs(100_000)) - .unwrap(), - interval: Duration::from_secs(5), - lookback_delta: Duration::from_secs(1), - }; + let eval_stmt = build_eval_stmt("one_field_metric / two_field_metric"); let table_provider = build_test_table_provider_with_tsid_fields( &[ @@ -5053,16 +5020,7 @@ mod test { #[tokio::test] async fn label_matching_modifier_disables_tsid_binary_join() { - let prom_expr = parser::parse("some_metric / ignoring(tag_0) some_alt_metric").unwrap(); - let eval_stmt = EvalStmt { - expr: prom_expr, - start: UNIX_EPOCH, - end: UNIX_EPOCH - .checked_add(Duration::from_secs(100_000)) - .unwrap(), - interval: Duration::from_secs(5), - lookback_delta: Duration::from_secs(1), - }; + let eval_stmt = build_eval_stmt("some_metric / ignoring(tag_0) some_alt_metric"); let table_provider = build_test_table_provider_with_tsid( &[ @@ -5091,16 +5049,7 @@ mod test { #[tokio::test] async fn comparison_binary_join_does_not_use_tsid() { - let prom_expr = parser::parse("some_metric > some_alt_metric").unwrap(); - let eval_stmt = EvalStmt { - expr: prom_expr, - start: UNIX_EPOCH, - end: UNIX_EPOCH - .checked_add(Duration::from_secs(100_000)) - .unwrap(), - interval: Duration::from_secs(5), - lookback_delta: Duration::from_secs(1), - }; + let eval_stmt = build_eval_stmt("some_metric > some_alt_metric"); let table_provider = build_test_table_provider_with_tsid( &[ diff --git a/tests/cases/standalone/common/promql/tsid_binary_join_regression.result b/tests/cases/standalone/common/promql/tsid_binary_join_regression.result new file mode 100644 index 0000000000..b2dde7033d --- /dev/null +++ b/tests/cases/standalone/common/promql/tsid_binary_join_regression.result @@ -0,0 +1,185 @@ +-- Regression test for TSID-backed PromQL binary joins on metric-engine tables. +-- Default arithmetic joins should use `__tsid`, while label modifiers and comparison +-- filters must continue to use label-based matching. +CREATE TABLE tsid_binary_join_physical ( + ts TIMESTAMP(3) TIME INDEX, + greptime_value DOUBLE, +) ENGINE = metric WITH ("physical_metric_table" = ""); + +Affected Rows: 0 + +CREATE TABLE tsid_binary_join_left ( + host STRING NULL, + job STRING NULL, + ts TIMESTAMP(3) NOT NULL, + greptime_value DOUBLE NULL, + TIME INDEX (ts), + PRIMARY KEY(host, job), +) +ENGINE = metric +WITH( + on_physical_table = 'tsid_binary_join_physical' +); + +Affected Rows: 0 + +CREATE TABLE tsid_binary_join_right ( + host STRING NULL, + job STRING NULL, + ts TIMESTAMP(3) NOT NULL, + greptime_value DOUBLE NULL, + TIME INDEX (ts), + PRIMARY KEY(host, job), +) +ENGINE = metric +WITH( + on_physical_table = 'tsid_binary_join_physical' +); + +Affected Rows: 0 + +INSERT INTO tsid_binary_join_left (host, job, ts, greptime_value) VALUES + ('host1', 'job1', 0, 12), + ('host2', 'job2', 0, 18), + ('host1', 'job1', 5000, 15), + ('host2', 'job2', 5000, 21); + +Affected Rows: 4 + +INSERT INTO tsid_binary_join_right (host, job, ts, greptime_value) VALUES + ('host1', 'job1', 0, 3), + ('host2', 'job2', 0, 6), + ('host1', 'job1', 5000, 5), + ('host2', 'job2', 5000, 7); + +Affected Rows: 4 + +-- Default vector-vector arithmetic should join on `__tsid` and time index. +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_ProjectionExec: expr=[host@2 as host, job@3 as job, ts@5 as ts, __tsid@4 as __tsid, greptime_value@0 / greptime_value@1 as tsid_binary_join_left.greptime_value / tsid_binary_join_right.greptime_value] REDACTED +|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(__tsid@1, __tsid@3), (ts@2, ts@4)], projection=[greptime_value@0, greptime_value@3, host@4, job@5, __tsid@6, ts@7], NullsEqual: true REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, __tsid@3 as __tsid, ts@4 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED +|_|_|_| +| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED +|_|_|_| +|_|_| Total rows: 4_| ++-+-+-+ + +-- Label modifiers must disable the TSID shortcut and keep matching on the remaining labels. +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / ignoring(host) tsid_binary_join_right; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_ProjectionExec: expr=[host@2 as host, job@3 as job, ts@5 as ts, __tsid@4 as __tsid, greptime_value@0 / greptime_value@1 as tsid_binary_join_left.greptime_value / tsid_binary_join_right.greptime_value] REDACTED +|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(job@1, job@2), (ts@2, ts@4)], projection=[greptime_value@0, greptime_value@3, host@4, job@5, __tsid@6, ts@7], NullsEqual: true REDACTED +|_|_|_RepartitionExec: partitioning=Hash([job@1, ts@2], 32), input_partitions=32 REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, job@2 as job, ts@4 as ts] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_RepartitionExec: partitioning=Hash([job@2, ts@4], 32), input_partitions=32 REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED +|_|_|_| +| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED +|_|_|_| +|_|_| Total rows: 4_| ++-+-+-+ + +-- Comparison filters must keep label-based matching semantics. +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > tsid_binary_join_right; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_ProjectionExec: expr=[ts@4 as ts, greptime_value@0 as greptime_value, host@1 as host, job@2 as job, __tsid@3 as __tsid] REDACTED +|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(host@1, host@1), (job@2, job@2), (ts@4, ts@3)], filter=greptime_value@1 < greptime_value@0, projection=[greptime_value@0, host@1, job@2, __tsid@3, ts@4], NullsEqual: true REDACTED +|_|_|_RepartitionExec: partitioning=Hash([host@1, job@2, ts@4], 32), input_partitions=32 REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_RepartitionExec: partitioning=Hash([host@1, job@2, ts@3], 32), input_partitions=32 REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, host@1 as host, job@2 as job, ts@4 as ts] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED +|_|_|_| +| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED +|_|_|_| +|_|_| Total rows: 4_| ++-+-+-+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right; + ++-------+------+---------------------+------------------------------------------------------------------------------+ +| host | job | ts | tsid_binary_join_left.greptime_value / tsid_binary_join_right.greptime_value | ++-------+------+---------------------+------------------------------------------------------------------------------+ +| host1 | job1 | 1970-01-01T00:00:00 | 4.0 | +| host1 | job1 | 1970-01-01T00:00:05 | 3.0 | +| host2 | job2 | 1970-01-01T00:00:00 | 3.0 | +| host2 | job2 | 1970-01-01T00:00:05 | 3.0 | ++-------+------+---------------------+------------------------------------------------------------------------------+ + +DROP TABLE tsid_binary_join_right; + +Affected Rows: 0 + +DROP TABLE tsid_binary_join_left; + +Affected Rows: 0 + +DROP TABLE tsid_binary_join_physical; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/promql/tsid_binary_join_regression.sql b/tests/cases/standalone/common/promql/tsid_binary_join_regression.sql new file mode 100644 index 0000000000..bd7afe5065 --- /dev/null +++ b/tests/cases/standalone/common/promql/tsid_binary_join_regression.sql @@ -0,0 +1,80 @@ +-- Regression test for TSID-backed PromQL binary joins on metric-engine tables. +-- Default arithmetic joins should use `__tsid`, while label modifiers and comparison +-- filters must continue to use label-based matching. + +CREATE TABLE tsid_binary_join_physical ( + ts TIMESTAMP(3) TIME INDEX, + greptime_value DOUBLE, +) ENGINE = metric WITH ("physical_metric_table" = ""); + +CREATE TABLE tsid_binary_join_left ( + host STRING NULL, + job STRING NULL, + ts TIMESTAMP(3) NOT NULL, + greptime_value DOUBLE NULL, + TIME INDEX (ts), + PRIMARY KEY(host, job), +) +ENGINE = metric +WITH( + on_physical_table = 'tsid_binary_join_physical' +); + +CREATE TABLE tsid_binary_join_right ( + host STRING NULL, + job STRING NULL, + ts TIMESTAMP(3) NOT NULL, + greptime_value DOUBLE NULL, + TIME INDEX (ts), + PRIMARY KEY(host, job), +) +ENGINE = metric +WITH( + on_physical_table = 'tsid_binary_join_physical' +); + +INSERT INTO tsid_binary_join_left (host, job, ts, greptime_value) VALUES + ('host1', 'job1', 0, 12), + ('host2', 'job2', 0, 18), + ('host1', 'job1', 5000, 15), + ('host2', 'job2', 5000, 21); + +INSERT INTO tsid_binary_join_right (host, job, ts, greptime_value) VALUES + ('host1', 'job1', 0, 3), + ('host2', 'job2', 0, 6), + ('host1', 'job1', 5000, 5), + ('host2', 'job2', 5000, 7); + +-- Default vector-vector arithmetic should join on `__tsid` and time index. +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right; + +-- Label modifiers must disable the TSID shortcut and keep matching on the remaining labels. +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / ignoring(host) tsid_binary_join_right; + +-- Comparison filters must keep label-based matching semantics. +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > tsid_binary_join_right; + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right; + +DROP TABLE tsid_binary_join_right; +DROP TABLE tsid_binary_join_left; +DROP TABLE tsid_binary_join_physical;