fix: promql join operation won't consider time index (#5535)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Weny Xu <wenymedia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-02-14 00:21:05 -08:00
committed by GitHub
parent c56106b883
commit 0d19e8f089
3 changed files with 102 additions and 12 deletions

View File

@@ -385,6 +385,7 @@ impl PromPlanner {
(None, None) => {
let left_input = self.prom_expr_to_plan(lhs, session_state).await?;
let left_field_columns = self.ctx.field_columns.clone();
let left_time_index_column = self.ctx.time_index_column.clone();
let mut left_table_ref = self
.table_ref()
.unwrap_or_else(|_| TableReference::bare(""));
@@ -392,6 +393,7 @@ impl PromPlanner {
let right_input = self.prom_expr_to_plan(rhs, session_state).await?;
let right_field_columns = self.ctx.field_columns.clone();
let right_time_index_column = self.ctx.time_index_column.clone();
let mut right_table_ref = self
.table_ref()
.unwrap_or_else(|_| TableReference::bare(""));
@@ -429,19 +431,17 @@ impl PromPlanner {
}
}
let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());
let has_special_vector_function = (left_field_columns.len() == 1
&& left_field_columns[0] == GREPTIME_VALUE)
|| (right_field_columns.len() == 1 && right_field_columns[0] == GREPTIME_VALUE);
let join_plan = self.join_on_non_field_columns(
left_input,
right_input,
left_table_ref.clone(),
right_table_ref.clone(),
left_time_index_column,
right_time_index_column,
// if either the left or the right plan has no tag columns, this is a case like `scalar(...) + host` or `host + scalar(...)`;
// in that case we only join on the time index
left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
has_special_vector_function,
)?;
let join_plan_schema = join_plan.schema().clone();
@@ -2055,16 +2055,18 @@ impl PromPlanner {
/// Build an inner join on the time index column and tag columns to concatenate two logical plans.
/// When `only_join_time_index == true` we only join on the time index, because the two plans may not have the same tag columns.
#[allow(clippy::too_many_arguments)]
fn join_on_non_field_columns(
&self,
left: LogicalPlan,
right: LogicalPlan,
left_table_ref: TableReference,
right_table_ref: TableReference,
left_time_index_column: Option<String>,
right_time_index_column: Option<String>,
only_join_time_index: bool,
has_special_vector_function: bool,
) -> Result<LogicalPlan> {
let mut tag_columns = if only_join_time_index {
let mut left_tag_columns = if only_join_time_index {
vec![]
} else {
self.ctx
@@ -2073,13 +2075,14 @@ impl PromPlanner {
.map(Column::from_name)
.collect::<Vec<_>>()
};
let mut right_tag_columns = left_tag_columns.clone();
// push time index column if it exists
if let Some(time_index_column) = &self.ctx.time_index_column {
// issue #5392 if is special vector function
if !has_special_vector_function {
tag_columns.push(Column::from_name(time_index_column));
}
if let (Some(left_time_index_column), Some(right_time_index_column)) =
(left_time_index_column, right_time_index_column)
{
left_tag_columns.push(Column::from_name(left_time_index_column));
right_tag_columns.push(Column::from_name(right_time_index_column));
}
let right = LogicalPlanBuilder::from(right)
@@ -2095,7 +2098,7 @@ impl PromPlanner {
.join(
right,
JoinType::Inner,
(tag_columns.clone(), tag_columns),
(left_tag_columns, right_tag_columns),
None,
)
.context(DataFusionPlanningSnafu)?

View File

@@ -465,3 +465,57 @@ drop table t2;
Affected Rows: 0
create table cache_hit (
ts timestamp time index,
job string,
greptime_value double,
primary key (job)
);
Affected Rows: 0
create table cache_miss (
ts timestamp time index,
job string,
greptime_value double,
primary key (job)
);
Affected Rows: 0
insert into cache_hit values
(3000, "read", 1.0),
(3000, "write", 2.0),
(4000, "read", 3.0),
(4000, "write", 4.0);
Affected Rows: 4
insert into cache_miss values
(3000, "read", 1.0),
(3000, "write", 2.0),
(4000, "read", 1.0),
(4000, "write", 2.0);
Affected Rows: 4
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit / (cache_miss + cache_hit);
+-------+---------------------+-------------------------------------------------------------------------------+
| job | ts | lhs.greptime_value / rhs.cache_miss.greptime_value + cache_hit.greptime_value |
+-------+---------------------+-------------------------------------------------------------------------------+
| read | 1970-01-01T00:00:03 | 0.5 |
| read | 1970-01-01T00:00:04 | 0.75 |
| write | 1970-01-01T00:00:03 | 0.5 |
| write | 1970-01-01T00:00:04 | 0.6666666666666666 |
+-------+---------------------+-------------------------------------------------------------------------------+
drop table cache_hit;
Affected Rows: 0
drop table cache_miss;
Affected Rows: 0

View File

@@ -206,3 +206,36 @@ tql eval (0, 2000, '400') t2 or on(job) t1;
drop table t1;
drop table t2;
create table cache_hit (
ts timestamp time index,
job string,
greptime_value double,
primary key (job)
);
create table cache_miss (
ts timestamp time index,
job string,
greptime_value double,
primary key (job)
);
insert into cache_hit values
(3000, "read", 1.0),
(3000, "write", 2.0),
(4000, "read", 3.0),
(4000, "write", 4.0);
insert into cache_miss values
(3000, "read", 1.0),
(3000, "write", 2.0),
(4000, "read", 1.0),
(4000, "write", 2.0);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit / (cache_miss + cache_hit);
drop table cache_hit;
drop table cache_miss;