From 3fe8a61fadddc6d2147ee9113ae5c9b7957a669b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 14 Apr 2026 12:43:58 -0700 Subject: [PATCH] perf: join metrics tables on the tsid key whenever possible (#7927) * feat: prefilter flat parquet scans by primary key * perf: skip redundant series divide repartitions * perf: optimize tsid promql join planning * perf: preserve tsid distribution through merge scans * perf: remove redundant tsid join repartitions * fix multi-field join case Signed-off-by: Ruihang Xia * Revert "feat: prefilter flat parquet scans by primary key" This reverts commit 767c3b44c8a62e66249be9d279ee50f9b60bfa67. * simplification Signed-off-by: Ruihang Xia * isolate rule into a dedicated mod Signed-off-by: Ruihang Xia * remove rule Signed-off-by: Ruihang Xia * more sqlness cases Signed-off-by: Ruihang Xia * fix filter join case Signed-off-by: Ruihang Xia * fix: normalize sqlness repartition input count Signed-off-by: Ruihang Xia * fix: normalize sqlness partition count in promql regression Signed-off-by: Ruihang Xia * fix: normalize sqlness hash partition fanout Signed-off-by: Ruihang Xia * simplification Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/query/src/dist_plan/merge_scan.rs | 17 +- src/query/src/optimizer/pass_distribution.rs | 201 ++++++++ src/query/src/promql/planner.rs | 472 ++++++++++++++++-- .../promql/tsid_binary_join_regression.result | 237 +++++++++ .../promql/tsid_binary_join_regression.sql | 106 ++++ 5 files changed, 975 insertions(+), 58 deletions(-) create mode 100644 tests/cases/standalone/common/promql/tsid_binary_join_regression.result create mode 100644 tests/cases/standalone/common/promql/tsid_binary_join_regression.sql diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 2d32ce16b3..3c326cf390 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -433,14 +433,15 @@ impl MergeScanExec { .values() 
.flat_map(|aliases| aliases.iter().map(|c| c.name())) .collect(); - let mut overlaps = vec![]; - for expr in &hash_exprs { - if let Some(col_expr) = expr.as_any().downcast_ref::() - && all_partition_col_aliases.contains(col_expr.name()) - { - overlaps.push(expr.clone()); - } - } + let overlaps: Vec<_> = hash_exprs + .iter() + .filter(|expr| { + expr.as_any() + .downcast_ref::() + .is_some_and(|col_expr| all_partition_col_aliases.contains(col_expr.name())) + }) + .cloned() + .collect(); if overlaps.is_empty() { return None; diff --git a/src/query/src/optimizer/pass_distribution.rs b/src/query/src/optimizer/pass_distribution.rs index 7ce2ffd752..1004252636 100644 --- a/src/query/src/optimizer/pass_distribution.rs +++ b/src/query/src/optimizer/pass_distribution.rs @@ -17,8 +17,10 @@ use std::sync::Arc; use datafusion::config::ConfigOptions; use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::projection::ProjectionExec; use datafusion_common::Result as DfResult; use datafusion_physical_expr::Distribution; +use datafusion_physical_expr::utils::map_columns_before_projection; use crate::dist_plan::MergeScanExec; @@ -83,6 +85,9 @@ impl PassDistribution { let mut new_children = Vec::with_capacity(children.len()); for (idx, child) in children.into_iter().enumerate() { let child_req = match required.get(idx) { + Some(Distribution::UnspecifiedDistribution) if idx == 0 => { + Self::map_hash_requirement_through_projection(plan.as_ref(), &current_req) + } Some(Distribution::UnspecifiedDistribution) => None, None => current_req.clone(), Some(req) => Some(req.clone()), @@ -103,4 +108,200 @@ impl PassDistribution { plan.with_new_children(new_children) } } + + fn map_hash_requirement_through_projection( + plan: &dyn ExecutionPlan, + current_req: &Option, + ) -> Option { + let Some(Distribution::HashPartitioned(required_exprs)) = current_req else { + return None; + }; + + let projection = 
plan.as_any().downcast_ref::()?; + let proj_exprs = projection + .expr() + .iter() + .map(|expr| (Arc::clone(&expr.expr), expr.alias.clone())) + .collect::>(); + let mapped = map_columns_before_projection(required_exprs, &proj_exprs); + + (mapped.len() == required_exprs.len()).then_some(Distribution::HashPartitioned(mapped)) + } +} + +#[cfg(test)] +mod tests { + use std::collections::{BTreeMap, BTreeSet}; + + use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit}; + use async_trait::async_trait; + use common_query::request::QueryRequest; + use common_recordbatch::SendableRecordBatchStream; + use datafusion::common::NullEquality; + use datafusion::execution::SessionStateBuilder; + use datafusion::physical_optimizer::PhysicalOptimizerRule; + use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode}; + use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr}; + use datafusion::physical_plan::{ExecutionPlanProperties, Partitioning}; + use datafusion_expr::{JoinType, LogicalPlanBuilder}; + use datafusion_physical_expr::PhysicalExpr; + use datafusion_physical_expr::expressions::Column as PhysicalColumn; + use session::ReadPreference; + use session::context::QueryContext; + use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME; + use store_api::storage::RegionId; + use table::table_name::TableName; + + use super::*; + use crate::error::Result as QueryResult; + use crate::region_query::RegionQueryHandler; + + struct NoopRegionQueryHandler; + + #[async_trait] + impl RegionQueryHandler for NoopRegionQueryHandler { + async fn do_get( + &self, + _read_preference: ReadPreference, + _request: QueryRequest, + ) -> QueryResult { + unreachable!("pass distribution tests should not execute remote queries") + } + } + + #[test] + fn passes_hash_requirement_through_projection_to_merge_scan() { + let schema = test_schema(); + let left_merge_scan = Arc::new(test_merge_scan_exec(schema.clone())); + let right_merge_scan = 
Arc::new(test_merge_scan_exec(schema.clone())); + let left_projection = Arc::new( + ProjectionExec::try_new( + vec![ + ProjectionExpr::new(partition_column("greptime_value", 3), "greptime_value"), + ProjectionExpr::new( + partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1), + DATA_SCHEMA_TSID_COLUMN_NAME, + ), + ProjectionExpr::new( + partition_column("greptime_timestamp", 2), + "greptime_timestamp", + ), + ], + left_merge_scan, + ) + .unwrap(), + ) as Arc; + let join = Arc::new( + HashJoinExec::try_new( + left_projection, + right_merge_scan, + vec![ + ( + partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1), + partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1), + ), + ( + partition_column("greptime_timestamp", 2), + partition_column("greptime_timestamp", 2), + ), + ], + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + NullEquality::NullEqualsNull, + false, + ) + .unwrap(), + ) as Arc; + + let optimized = PassDistribution + .optimize(join, &ConfigOptions::default()) + .unwrap(); + let hash_join = optimized.as_any().downcast_ref::().unwrap(); + let left_projection = hash_join + .left() + .as_any() + .downcast_ref::() + .unwrap(); + let left_partitioning = left_projection.input().output_partitioning(); + let right_partitioning = hash_join.right().output_partitioning(); + + let Partitioning::Hash(left_exprs, left_count) = left_partitioning else { + panic!("expected left merge scan hash partitioning"); + }; + let Partitioning::Hash(right_exprs, right_count) = right_partitioning else { + panic!("expected right merge scan hash partitioning"); + }; + + assert_eq!(*left_count, 32); + assert_eq!(*right_count, 32); + assert_eq!( + column_names(left_exprs), + vec![DATA_SCHEMA_TSID_COLUMN_NAME, "greptime_timestamp"] + ); + assert_eq!( + column_names(right_exprs), + vec![DATA_SCHEMA_TSID_COLUMN_NAME, "greptime_timestamp"] + ); + } + + fn test_merge_scan_exec(schema: SchemaRef) -> MergeScanExec { + let session_state = 
SessionStateBuilder::new().with_default_features().build(); + let partition_cols = BTreeMap::from([ + ( + DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), + BTreeSet::from([datafusion_common::Column::from_name( + DATA_SCHEMA_TSID_COLUMN_NAME, + )]), + ), + ( + "greptime_timestamp".to_string(), + BTreeSet::from([datafusion_common::Column::from_name("greptime_timestamp")]), + ), + ]); + let plan = LogicalPlanBuilder::empty(false).build().unwrap(); + + MergeScanExec::new( + &session_state, + TableName::new("greptime", "public", "test"), + vec![RegionId::new(1, 0), RegionId::new(1, 1)], + plan, + schema.as_ref(), + Arc::new(NoopRegionQueryHandler), + QueryContext::arc(), + 32, + partition_cols, + ) + .unwrap() + } + + fn test_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("host", DataType::Utf8, true), + Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false), + Field::new( + "greptime_timestamp", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new("greptime_value", DataType::Float64, true), + ])) + } + + fn partition_column(name: &str, index: usize) -> Arc { + Arc::new(PhysicalColumn::new(name, index)) + } + + fn column_names(exprs: &[Arc]) -> Vec<&str> { + exprs + .iter() + .map(|expr| { + expr.as_any() + .downcast_ref::() + .unwrap() + .name() + }) + .collect() + } } diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 640994dea2..0c889529eb 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -709,7 +709,13 @@ impl PromPlanner { self.ctx.table_name = Some("rhs".to_string()); } } - let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter()); + let (output_field_columns, field_columns) = + Self::align_binary_field_columns(&left_field_columns, &right_field_columns); + // PromQL binary arithmetic only combines the shared prefix of value columns. 
+ // Keep the output field count aligned with that zipped prefix so planning + // remains stable even when the two sides have uneven multi-field schemas. + self.ctx.field_columns = output_field_columns; + let mut field_columns = field_columns.into_iter(); let join_plan = self.join_on_non_field_columns( left_input, @@ -810,11 +816,10 @@ impl PromPlanner { // Preserve `__tsid` if present, so it can still be used internally downstream. It's // stripped from the final output anyway. - if context.use_tsid - && let Ok(tsid_col) = - schema.qualified_field_with_name(Some(table_ref), DATA_SCHEMA_TSID_COLUMN_NAME) + if let Some(tsid_col) = + Self::optional_tsid_projection(schema, Some(table_ref), context.use_tsid) { - project_exprs.push(DfExpr::Column(tsid_col.into())); + project_exprs.push(tsid_col); } let plan = LogicalPlanBuilder::from(input) @@ -3393,6 +3398,97 @@ impl PromPlanner { ) } + fn align_binary_field_columns<'a>( + left_field_columns: &'a [String], + right_field_columns: &'a [String], + ) -> (Vec, Vec<(&'a String, &'a String)>) { + let field_pairs = left_field_columns + .iter() + .zip(right_field_columns.iter()) + .collect::>(); + let output_field_columns = field_pairs + .iter() + .map(|(left_col_name, _)| (*left_col_name).clone()) + .collect(); + (output_field_columns, field_pairs) + } + + fn plan_has_tsid_column(plan: &LogicalPlan) -> bool { + plan.schema() + .fields() + .iter() + .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME) + } + + fn optional_tsid_projection( + schema: &DFSchemaRef, + table_ref: Option<&TableReference>, + keep_tsid: bool, + ) -> Option { + keep_tsid.then_some(()).and_then(|_| { + schema + .qualified_field_with_name(table_ref, DATA_SCHEMA_TSID_COLUMN_NAME) + .ok() + .map(|field| DfExpr::Column(field.into())) + }) + } + + fn binary_join_key_columns( + &self, + left: &LogicalPlan, + right: &LogicalPlan, + only_join_time_index: bool, + modifier: &Option, + ) -> (BTreeSet, BTreeSet) { + let use_tsid_join = !only_join_time_index + 
&& modifier.as_ref().is_none_or(|modifier| { + modifier.matching.is_none() + && matches!(modifier.card, VectorMatchCardinality::OneToOne) + }) + && Self::plan_has_tsid_column(left) + && Self::plan_has_tsid_column(right); + + let (mut left_tag_columns, mut right_tag_columns) = if use_tsid_join { + ( + BTreeSet::from([DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]), + BTreeSet::from([DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]), + ) + } else { + let left_tag_columns = if only_join_time_index { + BTreeSet::new() + } else { + self.ctx + .tag_columns + .iter() + .cloned() + .collect::>() + }; + let right_tag_columns = left_tag_columns.clone(); + (left_tag_columns, right_tag_columns) + }; + + if !use_tsid_join + && let Some(modifier) = modifier + && let Some(matching) = &modifier.matching + { + match matching { + LabelModifier::Include(on) => { + let mask = on.labels.iter().cloned().collect::>(); + left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect(); + right_tag_columns = right_tag_columns.intersection(&mask).cloned().collect(); + } + LabelModifier::Exclude(ignoring) => { + for label in &ignoring.labels { + let _ = left_tag_columns.remove(label); + let _ = right_tag_columns.remove(label); + } + } + } + } + + (left_tag_columns, right_tag_columns) + } + /// Build a inner join on time index column and tag columns to concat two logical plans. 
/// When `only_join_time_index == true` we only join on the time index, because these two plan may not have the same tag columns #[allow(clippy::too_many_arguments)] @@ -3407,40 +3503,8 @@ impl PromPlanner { only_join_time_index: bool, modifier: &Option, ) -> Result { - let mut left_tag_columns = if only_join_time_index { - BTreeSet::new() - } else { - self.ctx - .tag_columns - .iter() - .cloned() - .collect::>() - }; - let mut right_tag_columns = left_tag_columns.clone(); - - // apply modifier - if let Some(modifier) = modifier { - // apply label modifier - if let Some(matching) = &modifier.matching { - match matching { - // keeps columns mentioned in `on` - LabelModifier::Include(on) => { - let mask = on.labels.iter().cloned().collect::>(); - left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect(); - right_tag_columns = - right_tag_columns.intersection(&mask).cloned().collect(); - } - // removes columns memtioned in `ignoring` - LabelModifier::Exclude(ignoring) => { - // doesn't check existence of label - for label in &ignoring.labels { - let _ = left_tag_columns.remove(label); - let _ = right_tag_columns.remove(label); - } - } - } - } - } + let (mut left_tag_columns, mut right_tag_columns) = + self.binary_join_key_columns(&left, &right, only_join_time_index, modifier); // push time index column if it exists if let (Some(left_time_index_column), Some(right_time_index_column)) = @@ -3856,17 +3920,17 @@ impl PromPlanner { where F: FnMut(&String) -> Result, { + let table_ref = self.ctx.table_name.clone().map(TableReference::bare); let non_field_columns_iter = self .ctx .tag_columns .iter() .chain(self.ctx.time_index_column.iter()) - .map(|col| { - Ok(DfExpr::Column(Column::new( - self.ctx.table_name.clone().map(TableReference::bare), - col, - ))) - }); + .map(|col| Ok(DfExpr::Column(Column::new(table_ref.clone(), col)))); + let tsid_iter = + Self::optional_tsid_projection(input.schema(), table_ref.as_ref(), self.ctx.use_tsid) + .into_iter() + 
.map(Ok); // build computation exprs let result_field_columns = self @@ -3888,6 +3952,7 @@ impl PromPlanner { // chain non-field columns (unchanged) and field columns (applied computation then alias) let project_fields = non_field_columns_iter + .chain(tsid_iter) .chain(field_columns_iter) .collect::>>()?; @@ -4124,6 +4189,18 @@ mod test { assert!(!plan_str.contains("Distinct:"), "{plan_str}"); } + fn build_eval_stmt(expr: &str) -> EvalStmt { + EvalStmt { + expr: parser::parse(expr).unwrap(), + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + } + } + async fn build_test_table_provider( table_name_tuples: &[(String, String)], num_tag: usize, @@ -4195,11 +4272,27 @@ mod test { table_name_tuples: &[(String, String)], num_tag: usize, num_field: usize, + ) -> DfTableSourceProvider { + let table_specs = table_name_tuples + .iter() + .map(|(schema_name, table_name)| ((schema_name.clone(), table_name.clone()), num_field)) + .collect::>(); + build_test_table_provider_with_tsid_fields(&table_specs, num_tag).await + } + + async fn build_test_table_provider_with_tsid_fields( + table_specs: &[((String, String), usize)], + num_tag: usize, ) -> DfTableSourceProvider { let catalog_list = MemoryCatalogManager::with_default_setup(); let physical_table_name = "phy"; let physical_table_id = 999u32; + let physical_num_field = table_specs + .iter() + .map(|(_, num_field)| *num_field) + .max() + .unwrap_or(0); // Register a metric engine physical table with internal columns. 
{ @@ -4230,7 +4323,7 @@ mod test { ) .with_time_index(true), ); - for i in 0..num_field { + for i in 0..physical_num_field { columns.push(ColumnSchema::new( format!("field_{i}"), ConcreteDataType::float64_datatype(), @@ -4243,7 +4336,7 @@ mod test { let table_meta = TableMetaBuilder::empty() .schema(schema) .primary_key_indices(primary_key_indices) - .value_indices((2 + num_tag..2 + num_tag + 1 + num_field).collect()) + .value_indices((2 + num_tag..2 + num_tag + 1 + physical_num_field).collect()) .engine(METRIC_ENGINE_NAME.to_string()) .next_column_id(1024) .build() @@ -4270,7 +4363,7 @@ mod test { } // Register metric engine logical tables without `__tsid`, referencing the physical table. - for (idx, (schema_name, table_name)) in table_name_tuples.iter().enumerate() { + for (idx, ((schema_name, table_name), num_field)) in table_specs.iter().enumerate() { let mut columns = vec![]; for i in 0..num_tag { columns.push(ColumnSchema::new( @@ -4287,7 +4380,7 @@ mod test { ) .with_time_index(true), ); - for i in 0..num_field { + for i in 0..*num_field { columns.push(ColumnSchema::new( format!("field_{i}"), ConcreteDataType::float64_datatype(), @@ -4305,7 +4398,7 @@ mod test { let table_meta = TableMetaBuilder::empty() .schema(schema) .primary_key_indices((0..num_tag).collect()) - .value_indices((num_tag + 1..num_tag + 1 + num_field).collect()) + .value_indices((num_tag + 1..num_tag + 1 + *num_field).collect()) .engine(METRIC_ENGINE_NAME.to_string()) .options(options) .next_column_id(1024) @@ -4736,6 +4829,285 @@ mod test { ); } + #[tokio::test] + async fn default_binary_join_uses_tsid_when_available() { + let eval_stmt = build_eval_stmt("some_metric / some_alt_metric"); + + let table_provider = build_test_table_provider_with_tsid( + &[ + (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()), + ( + DEFAULT_SCHEMA_NAME.to_string(), + "some_alt_metric".to_string(), + ), + ], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, 
&build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert!( + plan_str.contains("some_metric.__tsid = some_alt_metric.__tsid"), + "{plan_str}" + ); + assert!( + !plan_str.contains("some_metric.tag_0 = some_alt_metric.tag_0"), + "{plan_str}" + ); + } + + #[tokio::test] + async fn tsid_is_preserved_for_nested_default_binary_joins() { + let eval_stmt = build_eval_stmt("(some_metric - some_alt_metric) / some_third_metric"); + + let table_provider = build_test_table_provider_with_tsid( + &[ + (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()), + ( + DEFAULT_SCHEMA_NAME.to_string(), + "some_alt_metric".to_string(), + ), + ( + DEFAULT_SCHEMA_NAME.to_string(), + "some_third_metric".to_string(), + ), + ], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}"); + assert!(!plan_str.contains("tag_0 ="), "{plan_str}"); + } + + #[tokio::test] + async fn repeated_tsid_binary_operand_keeps_tsid_join_keys() { + let eval_stmt = build_eval_stmt("((some_metric - some_alt_metric) / some_metric) * 100"); + + let table_provider = build_test_table_provider_with_tsid( + &[ + (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()), + ( + DEFAULT_SCHEMA_NAME.to_string(), + "some_alt_metric".to_string(), + ), + ], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}"); + assert!(!plan_str.contains("tag_0 ="), "{plan_str}"); + } + + #[tokio::test] + async fn repeated_tsid_binary_operand_keeps_shorter_field_side() { + let eval_stmt = + build_eval_stmt("((two_field_metric - one_field_metric) 
/ one_field_metric) * 100"); + + let table_provider = build_test_table_provider_with_tsid_fields( + &[ + ( + ( + DEFAULT_SCHEMA_NAME.to_string(), + "two_field_metric".to_string(), + ), + 2, + ), + ( + ( + DEFAULT_SCHEMA_NAME.to_string(), + "one_field_metric".to_string(), + ), + 1, + ), + ], + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let field_names = plan + .schema() + .fields() + .iter() + .map(|field| field.name().clone()) + .collect::>(); + let value_columns = field_names + .iter() + .filter(|name| { + *name != "tag_0" && *name != "timestamp" && *name != DATA_SCHEMA_TSID_COLUMN_NAME + }) + .count(); + assert_eq!(value_columns, 1, "{field_names:?}"); + let plan_str = plan.display_indent_schema().to_string(); + assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}"); + assert!(!plan_str.contains("tag_0 ="), "{plan_str}"); + } + + #[tokio::test] + async fn tsid_binary_join_uses_shorter_field_side() { + let eval_stmt = build_eval_stmt("one_field_metric / two_field_metric"); + + let table_provider = build_test_table_provider_with_tsid_fields( + &[ + ( + ( + DEFAULT_SCHEMA_NAME.to_string(), + "one_field_metric".to_string(), + ), + 1, + ), + ( + ( + DEFAULT_SCHEMA_NAME.to_string(), + "two_field_metric".to_string(), + ), + 2, + ), + ], + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let field_names = plan + .schema() + .fields() + .iter() + .map(|field| field.name().clone()) + .collect::>(); + let value_columns = field_names + .iter() + .filter(|name| { + *name != "tag_0" && *name != "timestamp" && *name != DATA_SCHEMA_TSID_COLUMN_NAME + }) + .count(); + assert_eq!(value_columns, 1, "{field_names:?}"); + } + + #[tokio::test] + async fn label_matching_modifier_disables_tsid_binary_join() { + let eval_stmt = build_eval_stmt("some_metric / ignoring(tag_0) 
some_alt_metric"); + + let table_provider = build_test_table_provider_with_tsid( + &[ + (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()), + ( + DEFAULT_SCHEMA_NAME.to_string(), + "some_alt_metric".to_string(), + ), + ], + 2, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert!(!plan_str.contains("__tsid ="), "{plan_str}"); + assert!( + plan_str.contains("some_metric.tag_1 = some_alt_metric.tag_1"), + "{plan_str}" + ); + } + + #[tokio::test] + async fn comparison_binary_join_uses_tsid_and_keeps_it_in_filtered_result() { + let eval_stmt = build_eval_stmt("some_metric > some_alt_metric"); + + let table_provider = build_test_table_provider_with_tsid( + &[ + (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()), + ( + DEFAULT_SCHEMA_NAME.to_string(), + "some_alt_metric".to_string(), + ), + ], + 2, + 1, + ) + .await; + let mut planner = PromPlanner { + table_provider, + ctx: PromPlannerContext::from_eval_stmt(&eval_stmt), + }; + let plan = planner + .prom_expr_to_plan(&eval_stmt.expr, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert!( + plan_str.contains("some_metric.__tsid = some_alt_metric.__tsid"), + "{plan_str}" + ); + assert!( + plan.schema() + .fields() + .iter() + .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME), + "{plan_str}" + ); + assert!(planner.ctx.use_tsid, "{plan_str}"); + } + + #[tokio::test] + async fn comparison_bool_binary_join_uses_tsid_when_available() { + let eval_stmt = build_eval_stmt("some_metric > bool some_alt_metric"); + + let table_provider = build_test_table_provider_with_tsid( + &[ + (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()), + ( + DEFAULT_SCHEMA_NAME.to_string(), + "some_alt_metric".to_string(), + ), + ], + 2, + 1, + ) + .await; + let plan = + 
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert!( + plan_str.contains("some_metric.__tsid = some_alt_metric.__tsid"), + "{plan_str}" + ); + assert!(!plan_str.contains("tag_0 ="), "{plan_str}"); + assert!(!plan_str.contains("tag_1 ="), "{plan_str}"); + } + #[tokio::test] async fn scalar_count_count_range_keeps_full_window() { let plan_str = build_optimized_tsid_plan( diff --git a/tests/cases/standalone/common/promql/tsid_binary_join_regression.result b/tests/cases/standalone/common/promql/tsid_binary_join_regression.result new file mode 100644 index 0000000000..3640291dc3 --- /dev/null +++ b/tests/cases/standalone/common/promql/tsid_binary_join_regression.result @@ -0,0 +1,237 @@ +-- Regression test for TSID-backed PromQL binary joins on metric-engine tables. +-- Default arithmetic and comparison joins should use `__tsid` when matching is the +-- default one-to-one case. Label modifiers still have to stay label-based. 
+CREATE TABLE tsid_binary_join_physical ( + ts TIMESTAMP(3) TIME INDEX, + greptime_value DOUBLE, +) ENGINE = metric WITH ("physical_metric_table" = ""); + +Affected Rows: 0 + +CREATE TABLE tsid_binary_join_left ( + host STRING NULL, + job STRING NULL, + ts TIMESTAMP(3) NOT NULL, + greptime_value DOUBLE NULL, + TIME INDEX (ts), + PRIMARY KEY(host, job), +) +ENGINE = metric +WITH( + on_physical_table = 'tsid_binary_join_physical' +); + +Affected Rows: 0 + +CREATE TABLE tsid_binary_join_right ( + host STRING NULL, + job STRING NULL, + ts TIMESTAMP(3) NOT NULL, + greptime_value DOUBLE NULL, + TIME INDEX (ts), + PRIMARY KEY(host, job), +) +ENGINE = metric +WITH( + on_physical_table = 'tsid_binary_join_physical' +); + +Affected Rows: 0 + +INSERT INTO tsid_binary_join_left (host, job, ts, greptime_value) VALUES + ('host1', 'job1', 0, 12), + ('host2', 'job2', 0, 18), + ('host1', 'job1', 5000, 15), + ('host2', 'job2', 5000, 21); + +Affected Rows: 4 + +INSERT INTO tsid_binary_join_right (host, job, ts, greptime_value) VALUES + ('host1', 'job1', 0, 3), + ('host2', 'job2', 0, 6), + ('host1', 'job1', 5000, 5), + ('host2', 'job2', 5000, 7); + +Affected Rows: 4 + +-- Default vector-vector arithmetic should join on `__tsid` and time index. 
+-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE Hash\(\[__tsid@1,\sts@2\],.* Hash([__tsid@1, ts@2],REDACTED +-- SQLNESS REPLACE Hash\(\[__tsid@3,\sts@4\],.* Hash([__tsid@3, ts@4],REDACTED +-- SQLNESS REPLACE input_partitions=\d+ input_partitions=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_ProjectionExec: expr=[host@2 as host, job@3 as job, ts@5 as ts, __tsid@4 as __tsid, greptime_value@0 / greptime_value@1 as tsid_binary_join_left.greptime_value / tsid_binary_join_right.greptime_value] REDACTED +|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(__tsid@1, __tsid@3), (ts@2, ts@4)], projection=[greptime_value@0, greptime_value@3, host@4, job@5, __tsid@6, ts@7], NullsEqual: true REDACTED +|_|_|_RepartitionExec: partitioning=Hash([__tsid@1, ts@2],REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, __tsid@3 as __tsid, ts@4 as ts] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_RepartitionExec: partitioning=Hash([__tsid@3, ts@4],REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeriesScan: region=REDACTED, "partition_count":REDACTED, "distribution":"PerSeries" REDACTED +|_|_|_| +| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesDivideExec: 
tags=["__tsid"] REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeriesScan: region=REDACTED, "partition_count":REDACTED, "distribution":"PerSeries" REDACTED +|_|_|_| +|_|_| Total rows: 4_| ++-+-+-+ + +-- Label modifiers must disable the TSID shortcut and keep matching on the remaining labels. +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE Hash\(\[job@1,\sts@2\],.* Hash([job@1, ts@2],REDACTED +-- SQLNESS REPLACE Hash\(\[job@2,\sts@4\],.* Hash([job@2, ts@4],REDACTED +-- SQLNESS REPLACE input_partitions=\d+ input_partitions=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / ignoring(host) tsid_binary_join_right; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_ProjectionExec: expr=[host@2 as host, job@3 as job, ts@5 as ts, __tsid@4 as __tsid, greptime_value@0 / greptime_value@1 as tsid_binary_join_left.greptime_value / tsid_binary_join_right.greptime_value] REDACTED +|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(job@1, job@2), (ts@2, ts@4)], projection=[greptime_value@0, greptime_value@3, host@4, job@5, __tsid@6, ts@7], NullsEqual: true REDACTED +|_|_|_RepartitionExec: partitioning=Hash([job@1, ts@2],REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, job@2 as job, ts@4 as ts] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_RepartitionExec: partitioning=Hash([job@2, ts@4],REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED 
+|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeriesScan: region=REDACTED, "partition_count":REDACTED, "distribution":"PerSeries" REDACTED +|_|_|_| +| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeriesScan: region=REDACTED, "partition_count":REDACTED, "distribution":"PerSeries" REDACTED +|_|_|_| +|_|_| Total rows: 4_| ++-+-+-+ + +-- Comparison filters can join on `__tsid`, but the filtered result must still behave like +-- a regular derived vector downstream. +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE Hash\(\[__tsid@1,\sts@2\],.* Hash([__tsid@1, ts@2],REDACTED +-- SQLNESS REPLACE Hash\(\[__tsid@3,\sts@4\],.* Hash([__tsid@3, ts@4],REDACTED +-- SQLNESS REPLACE input_partitions=\d+ input_partitions=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > tsid_binary_join_right; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_ProjectionExec: expr=[ts@4 as ts, greptime_value@0 as greptime_value, host@1 as host, job@2 as job, __tsid@3 as __tsid] REDACTED +|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(__tsid@3, __tsid@1), (ts@4, ts@2)], filter=greptime_value@1 < greptime_value@0, projection=[greptime_value@0, host@1, job@2, __tsid@3, ts@4], NullsEqual: true REDACTED +|_|_|_RepartitionExec: partitioning=Hash([__tsid@3, ts@4],REDACTED 
+|_|_|_MergeScanExec: REDACTED +|_|_|_RepartitionExec: partitioning=Hash([__tsid@1, ts@2],REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, __tsid@3 as __tsid, ts@4 as ts] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeriesScan: region=REDACTED, "partition_count":REDACTED, "distribution":"PerSeries" REDACTED +|_|_|_| +| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeriesScan: region=REDACTED, "partition_count":REDACTED, "distribution":"PerSeries" REDACTED +|_|_|_| +|_|_| Total rows: 4_| ++-+-+-+ + +-- `bool` comparison should follow the same TSID-backed matching path. 
+-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE Hash\(\[__tsid@1,\sts@2\],.* Hash([__tsid@1, ts@2],REDACTED +-- SQLNESS REPLACE Hash\(\[__tsid@3,\sts@4\],.* Hash([__tsid@3, ts@4],REDACTED +-- SQLNESS REPLACE input_partitions=\d+ input_partitions=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > bool tsid_binary_join_right; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_ProjectionExec: expr=[host@2 as host, job@3 as job, ts@5 as ts, __tsid@4 as __tsid, CAST(greptime_value@1 < greptime_value@0 AS Float64) as tsid_binary_join_left.greptime_value > tsid_binary_join_right.greptime_value] REDACTED +|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(__tsid@1, __tsid@3), (ts@2, ts@4)], projection=[greptime_value@0, greptime_value@3, host@4, job@5, __tsid@6, ts@7], NullsEqual: true REDACTED +|_|_|_RepartitionExec: partitioning=Hash([__tsid@1, ts@2],REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, __tsid@3 as __tsid, ts@4 as ts] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_RepartitionExec: partitioning=Hash([__tsid@3, ts@4],REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeriesScan: region=REDACTED, "partition_count":REDACTED, "distribution":"PerSeries" REDACTED +|_|_|_| +| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED 
+|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeriesScan: region=REDACTED, "partition_count":REDACTED, "distribution":"PerSeries" REDACTED +|_|_|_| +|_|_| Total rows: 4_| ++-+-+-+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right; + ++-------+------+---------------------+------------------------------------------------------------------------------+ +| host | job | ts | tsid_binary_join_left.greptime_value / tsid_binary_join_right.greptime_value | ++-------+------+---------------------+------------------------------------------------------------------------------+ +| host1 | job1 | 1970-01-01T00:00:00 | 4.0 | +| host1 | job1 | 1970-01-01T00:00:05 | 3.0 | +| host2 | job2 | 1970-01-01T00:00:00 | 3.0 | +| host2 | job2 | 1970-01-01T00:00:05 | 3.0 | ++-------+------+---------------------+------------------------------------------------------------------------------+ + +DROP TABLE tsid_binary_join_right; + +Affected Rows: 0 + +DROP TABLE tsid_binary_join_left; + +Affected Rows: 0 + +DROP TABLE tsid_binary_join_physical; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/promql/tsid_binary_join_regression.sql b/tests/cases/standalone/common/promql/tsid_binary_join_regression.sql new file mode 100644 index 0000000000..61dfd23765 --- /dev/null +++ b/tests/cases/standalone/common/promql/tsid_binary_join_regression.sql @@ -0,0 +1,106 @@ +-- Regression test for TSID-backed PromQL binary joins on metric-engine tables. +-- Default arithmetic and comparison joins should use `__tsid` when matching is the +-- default one-to-one case. Label modifiers still have to stay label-based. 
+ +CREATE TABLE tsid_binary_join_physical ( + ts TIMESTAMP(3) TIME INDEX, + greptime_value DOUBLE, +) ENGINE = metric WITH ("physical_metric_table" = ""); + +CREATE TABLE tsid_binary_join_left ( + host STRING NULL, + job STRING NULL, + ts TIMESTAMP(3) NOT NULL, + greptime_value DOUBLE NULL, + TIME INDEX (ts), + PRIMARY KEY(host, job), +) +ENGINE = metric +WITH( + on_physical_table = 'tsid_binary_join_physical' +); + +CREATE TABLE tsid_binary_join_right ( + host STRING NULL, + job STRING NULL, + ts TIMESTAMP(3) NOT NULL, + greptime_value DOUBLE NULL, + TIME INDEX (ts), + PRIMARY KEY(host, job), +) +ENGINE = metric +WITH( + on_physical_table = 'tsid_binary_join_physical' +); + +INSERT INTO tsid_binary_join_left (host, job, ts, greptime_value) VALUES + ('host1', 'job1', 0, 12), + ('host2', 'job2', 0, 18), + ('host1', 'job1', 5000, 15), + ('host2', 'job2', 5000, 21); + +INSERT INTO tsid_binary_join_right (host, job, ts, greptime_value) VALUES + ('host1', 'job1', 0, 3), + ('host2', 'job2', 0, 6), + ('host1', 'job1', 5000, 5), + ('host2', 'job2', 5000, 7); + +-- Default vector-vector arithmetic should join on `__tsid` and time index. +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE Hash\(\[__tsid@1,\sts@2\],.* Hash([__tsid@1, ts@2],REDACTED +-- SQLNESS REPLACE Hash\(\[__tsid@3,\sts@4\],.* Hash([__tsid@3, ts@4],REDACTED +-- SQLNESS REPLACE input_partitions=\d+ input_partitions=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right; + +-- Label modifiers must disable the TSID shortcut and keep matching on the remaining labels. 
+-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE Hash\(\[job@1,\sts@2\],.* Hash([job@1, ts@2],REDACTED +-- SQLNESS REPLACE Hash\(\[job@2,\sts@4\],.* Hash([job@2, ts@4],REDACTED +-- SQLNESS REPLACE input_partitions=\d+ input_partitions=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / ignoring(host) tsid_binary_join_right; + +-- Comparison filters can join on `__tsid`, but the filtered result must still behave like +-- a regular derived vector downstream. +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE Hash\(\[__tsid@1,\sts@2\],.* Hash([__tsid@1, ts@2],REDACTED +-- SQLNESS REPLACE Hash\(\[__tsid@3,\sts@4\],.* Hash([__tsid@3, ts@4],REDACTED +-- SQLNESS REPLACE input_partitions=\d+ input_partitions=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > tsid_binary_join_right; + +-- `bool` comparison should follow the same TSID-backed matching path. 
+-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE Hash\(\[__tsid@1,\sts@2\],.* Hash([__tsid@1, ts@2],REDACTED +-- SQLNESS REPLACE Hash\(\[__tsid@3,\sts@4\],.* Hash([__tsid@3, ts@4],REDACTED +-- SQLNESS REPLACE input_partitions=\d+ input_partitions=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > bool tsid_binary_join_right; + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right; + +DROP TABLE tsid_binary_join_right; +DROP TABLE tsid_binary_join_left; +DROP TABLE tsid_binary_join_physical;