From 0d19e8f0891d886b7f51deb7e1e7d1764b543a2d Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 14 Feb 2025 00:21:05 -0800 Subject: [PATCH] fix: promql join operation won't consider time index (#5535) Signed-off-by: Ruihang Xia Co-authored-by: Weny Xu --- src/query/src/promql/planner.rs | 27 +++++----- .../common/promql/set_operation.result | 54 +++++++++++++++++++ .../common/promql/set_operation.sql | 33 ++++++++++++ 3 files changed, 102 insertions(+), 12 deletions(-) diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index e64e67183b..1b0f996bc1 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -385,6 +385,7 @@ impl PromPlanner { (None, None) => { let left_input = self.prom_expr_to_plan(lhs, session_state).await?; let left_field_columns = self.ctx.field_columns.clone(); + let left_time_index_column = self.ctx.time_index_column.clone(); let mut left_table_ref = self .table_ref() .unwrap_or_else(|_| TableReference::bare("")); @@ -392,6 +393,7 @@ impl PromPlanner { let right_input = self.prom_expr_to_plan(rhs, session_state).await?; let right_field_columns = self.ctx.field_columns.clone(); + let right_time_index_column = self.ctx.time_index_column.clone(); let mut right_table_ref = self .table_ref() .unwrap_or_else(|_| TableReference::bare("")); @@ -429,19 +431,17 @@ impl PromPlanner { } } let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter()); - let has_special_vector_function = (left_field_columns.len() == 1 - && left_field_columns[0] == GREPTIME_VALUE) - || (right_field_columns.len() == 1 && right_field_columns[0] == GREPTIME_VALUE); let join_plan = self.join_on_non_field_columns( left_input, right_input, left_table_ref.clone(), right_table_ref.clone(), + left_time_index_column, + right_time_index_column, // if left plan or right plan tag is empty, means case like `scalar(...) + host` or `host + scalar(...)` // under this case we only join on time index left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(), - has_special_vector_function, )?; let join_plan_schema = join_plan.schema().clone(); @@ -2055,16 +2055,18 @@ impl PromPlanner { /// Build a inner join on time index column and tag columns to concat two logical plans. /// When `only_join_time_index == true` we only join on the time index, because these two plan may not have the same tag columns + #[allow(clippy::too_many_arguments)] fn join_on_non_field_columns( &self, left: LogicalPlan, right: LogicalPlan, left_table_ref: TableReference, right_table_ref: TableReference, + left_time_index_column: Option, + right_time_index_column: Option, only_join_time_index: bool, - has_special_vector_function: bool, ) -> Result { - let mut tag_columns = if only_join_time_index { + let mut left_tag_columns = if only_join_time_index { vec![] } else { self.ctx @@ -2073,13 +2075,14 @@ impl PromPlanner { .map(Column::from_name) .collect::>() }; + let mut right_tag_columns = left_tag_columns.clone(); // push time index column if it exists - if let Some(time_index_column) = &self.ctx.time_index_column { - // issue #5392 if is special vector function - if !has_special_vector_function { - tag_columns.push(Column::from_name(time_index_column)); - } + if let (Some(left_time_index_column), Some(right_time_index_column)) = + (left_time_index_column, right_time_index_column) + { + left_tag_columns.push(Column::from_name(left_time_index_column)); + right_tag_columns.push(Column::from_name(right_time_index_column)); } let right = LogicalPlanBuilder::from(right) @@ -2095,7 +2098,7 @@ impl PromPlanner { .join( right, JoinType::Inner, - (tag_columns.clone(), tag_columns), + (left_tag_columns, right_tag_columns), None, ) .context(DataFusionPlanningSnafu)? diff --git a/tests/cases/standalone/common/promql/set_operation.result b/tests/cases/standalone/common/promql/set_operation.result index 605582e195..1b15d2a59d 100644 --- a/tests/cases/standalone/common/promql/set_operation.result +++ b/tests/cases/standalone/common/promql/set_operation.result @@ -465,3 +465,57 @@ drop table t2; Affected Rows: 0 +create table cache_hit ( + ts timestamp time index, + job string, + greptime_value double, + primary key (job) +); + +Affected Rows: 0 + +create table cache_miss ( + ts timestamp time index, + job string, + greptime_value double, + primary key (job) +); + +Affected Rows: 0 + +insert into cache_hit values + (3000, "read", 1.0), + (3000, "write", 2.0), + (4000, "read", 3.0), + (4000, "write", 4.0); + +Affected Rows: 4 + +insert into cache_miss values + (3000, "read", 1.0), + (3000, "write", 2.0), + (4000, "read", 1.0), + (4000, "write", 2.0); + +Affected Rows: 4 + +-- SQLNESS SORT_RESULT 3 1 +tql eval (3, 4, '1s') cache_hit / (cache_miss + cache_hit); + ++-------+---------------------+-------------------------------------------------------------------------------+ +| job | ts | lhs.greptime_value / rhs.cache_miss.greptime_value + cache_hit.greptime_value | ++-------+---------------------+-------------------------------------------------------------------------------+ +| read | 1970-01-01T00:00:03 | 0.5 | +| read | 1970-01-01T00:00:04 | 0.75 | +| write | 1970-01-01T00:00:03 | 0.5 | +| write | 1970-01-01T00:00:04 | 0.6666666666666666 | ++-------+---------------------+-------------------------------------------------------------------------------+ + +drop table cache_hit; + +Affected Rows: 0 + +drop table cache_miss; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/promql/set_operation.sql b/tests/cases/standalone/common/promql/set_operation.sql index 87189323fd..757103142f 100644 --- a/tests/cases/standalone/common/promql/set_operation.sql +++ b/tests/cases/standalone/common/promql/set_operation.sql @@ -206,3 +206,36 @@ tql eval (0, 2000, '400') t2 or on(job) t1; drop table t1; drop table t2; + +create table cache_hit ( + ts timestamp time index, + job string, + greptime_value double, + primary key (job) +); + +create table cache_miss ( + ts timestamp time index, + job string, + greptime_value double, + primary key (job) +); + +insert into cache_hit values + (3000, "read", 1.0), + (3000, "write", 2.0), + (4000, "read", 3.0), + (4000, "write", 4.0); + +insert into cache_miss values + (3000, "read", 1.0), + (3000, "write", 2.0), + (4000, "read", 1.0), + (4000, "write", 2.0); + +-- SQLNESS SORT_RESULT 3 1 +tql eval (3, 4, '1s') cache_hit / (cache_miss + cache_hit); + +drop table cache_hit; + +drop table cache_miss;