diff --git a/src/query/src/optimizer/promql_tsid_narrow_join.rs b/src/query/src/optimizer/promql_tsid_narrow_join.rs index 419415662e..6dd0b5e4bb 100644 --- a/src/query/src/optimizer/promql_tsid_narrow_join.rs +++ b/src/query/src/optimizer/promql_tsid_narrow_join.rs @@ -82,20 +82,19 @@ impl PromqlTsidNarrowJoin { } fn is_promql_value_tsid_time_schema(schema: &SchemaRef) -> bool { - let mut has_value = false; + let mut value_columns = 0; let mut has_tsid = false; let mut has_time = false; for field in schema.fields() { match field.name().as_str() { - "greptime_value" => has_value = true, DATA_SCHEMA_TSID_COLUMN_NAME => has_tsid = true, _ if matches!(field.data_type(), DataType::Timestamp(_, _)) => has_time = true, - _ => return false, + _ => value_columns += 1, } } - has_value && has_tsid && has_time + value_columns == 1 && has_tsid && has_time } fn joins_on_tsid_and_time(hash_join: &HashJoinExec) -> bool { @@ -214,6 +213,60 @@ mod tests { ); } + #[test] + fn chooses_collect_left_for_computed_narrow_value_column() { + let left = Arc::new(EmptyExec::new(Arc::new(Schema::new(vec![ + Field::new("prom_rate(greptime_value)", DataType::Float64, true), + Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false), + Field::new( + "greptime_timestamp", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + ])))) as Arc; + let right = Arc::new(EmptyExec::new(Arc::new(Schema::new(vec![ + Field::new("greptime_value", DataType::Float64, true), + Field::new("host", DataType::Utf8, true), + Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false), + Field::new( + "greptime_timestamp", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + ])))) as Arc; + let on = vec![ + ( + Arc::new(Column::new(DATA_SCHEMA_TSID_COLUMN_NAME, 1)) as Arc, + Arc::new(Column::new(DATA_SCHEMA_TSID_COLUMN_NAME, 2)) as Arc, + ), + ( + Arc::new(Column::new("greptime_timestamp", 2)) as Arc, + Arc::new(Column::new("greptime_timestamp", 3)) as Arc, + ), + ]; + let join = Arc::new( + HashJoinExec::try_new( + left, + right, + on, + None, + &JoinType::Inner, + Some(vec![0, 3, 4, 5, 6]), + PartitionMode::Partitioned, + NullEquality::NullEqualsNull, + false, + ) + .unwrap(), + ) as Arc; + + let optimized = PromqlTsidNarrowJoin + .optimize(join, &ConfigOptions::default()) + .unwrap(); + let optimized_join = optimized.as_any().downcast_ref::().unwrap(); + + assert_eq!(optimized_join.partition_mode(), &PartitionMode::CollectLeft); + } + #[test] fn keeps_partitioned_join_when_left_side_carries_labels() { let left = Arc::new(EmptyExec::new(Arc::new(Schema::new(vec![ diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 6985a3b6da..c3f70a7877 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -1115,6 +1115,11 @@ impl PromPlanner { self.create_function_expr(func, args.literals.clone(), query_engine_state)?; func_exprs.insert(0, self.create_time_index_column_expr()?); func_exprs.extend_from_slice(&self.create_tag_column_exprs()?); + if let Some(tsid_col) = + Self::optional_tsid_projection(input.schema(), None, self.ctx.use_tsid) + { + func_exprs.push(tsid_col); + } let builder = LogicalPlanBuilder::from(input) .project(func_exprs) @@ -2363,6 +2368,7 @@ impl PromPlanner { } "label_join" => { + self.ctx.use_tsid = false; let (concat_expr, dst_label) = Self::build_concat_labels_expr( &mut other_input_exprs, &self.ctx, @@ -2386,6 +2392,7 @@ impl PromPlanner { ScalarFunc::GeneratedExpr } "label_replace" => { + self.ctx.use_tsid = false; if let Some((replace_expr, dst_label)) = self .build_regexp_replace_label_expr(&mut other_input_exprs, query_engine_state)? { @@ -5185,6 +5192,37 @@ mod test { assert!(!plan_str.contains("tag_1 ="), "{plan_str}"); } + #[tokio::test] + async fn range_function_keeps_tsid_for_absent_ignoring_binary_join() { + let eval_stmt = + build_eval_stmt("rate(some_metric[5m]) / ignoring(missing) some_alt_metric"); + + let table_provider = build_test_table_provider_with_tsid( + &[ + (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()), + ( + DEFAULT_SCHEMA_NAME.to_string(), + "some_alt_metric".to_string(), + ), + ], + 2, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert!( + plan_str.contains("some_metric.__tsid = some_alt_metric.__tsid"), + "{plan_str}" + ); + assert!(!plan_str.contains("tag_0 ="), "{plan_str}"); + assert!(!plan_str.contains("tag_1 ="), "{plan_str}"); + } + #[tokio::test] async fn on_full_label_set_keeps_tsid_binary_join() { let eval_stmt = build_eval_stmt("some_metric / on(tag_0, tag_1) some_alt_metric"); diff --git a/tests/cases/distributed/flow-tql/tsid_on_phy.result b/tests/cases/distributed/flow-tql/tsid_on_phy.result index 6ae3e77b0c..d008a24e19 100644 --- a/tests/cases/distributed/flow-tql/tsid_on_phy.result +++ b/tests/cases/distributed/flow-tql/tsid_on_phy.result @@ -100,31 +100,31 @@ TQL EXPLAIN ( ) ); -+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | HistogramFold: le=le, field=sum(prom_avg_over_time(ts_range,v)), quantile=0.5 | -| | MergeScan [is_placeholder=false, remote_input=[ | -| | Sort: test_tsid.le ASC NULLS LAST, test_tsid.tag4 ASC NULLS LAST, test_tsid.tag5 ASC NULLS LAST, test_tsid.ts ASC NULLS LAST | -| | Aggregate: groupBy=[[test_tsid.le, test_tsid.tag4, test_tsid.tag5, test_tsid.ts]], aggr=[[sum(prom_avg_over_time(ts_range,v))]] | -| | Filter: prom_avg_over_time(ts_range,v) IS NOT NULL | -| | Projection: test_tsid.ts, prom_avg_over_time(ts_range, v) AS prom_avg_over_time(ts_range,v), test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8 | -| | PromRangeManipulate: req range=[1769139000000..1769139900000], interval=[60000], eval range=[1800000], time index=[ts], values=["v"] | -| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] | -| | PromSeriesDivide: tags=["__tsid"] | -| | Sort: test_tsid.__tsid ASC NULLS FIRST, test_tsid.ts ASC NULLS FIRST | -| | Filter: test_tsid.ts >= TimestampMillisecond(1769137200001, None) AND test_tsid.ts <= TimestampMillisecond(1769139900000, None) | -| | Projection: test_tsid.v, test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8, test_tsid.__tsid, test_tsid.ts | -| | SubqueryAlias: test_tsid | -| | Filter: phy.__table_id=UInt32(REDACTED) | -| | TableScan: phy projection=[ts, v, tag1, tag2, le, tag4, tag5, tag6, tag7, tag8, __table_id, __tsid] | -| | ]] | -| physical_plan | HistogramFoldExec: le=@0, field=@4, quantile=0.5 | -| | SortExec: expr=[tag4@1 ASC NULLS LAST, tag5@2 ASC NULLS LAST, ts@3 ASC NULLS LAST, CAST(le@0 AS Float64) ASC NULLS LAST], preserve_partitioning=[true] | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | HistogramFold: le=le, field=sum(prom_avg_over_time(ts_range,v)), quantile=0.5 | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | Sort: test_tsid.le ASC NULLS LAST, test_tsid.tag4 ASC NULLS LAST, test_tsid.tag5 ASC NULLS LAST, test_tsid.ts ASC NULLS LAST | +| | Aggregate: groupBy=[[test_tsid.le, test_tsid.tag4, test_tsid.tag5, test_tsid.ts]], aggr=[[sum(prom_avg_over_time(ts_range,v))]] | +| | Filter: prom_avg_over_time(ts_range,v) IS NOT NULL | +| | Projection: test_tsid.ts, prom_avg_over_time(ts_range, v) AS prom_avg_over_time(ts_range,v), test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8, test_tsid.__tsid | +| | PromRangeManipulate: req range=[1769139000000..1769139900000], interval=[60000], eval range=[1800000], time index=[ts], values=["v"] | +| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] | +| | PromSeriesDivide: tags=["__tsid"] | +| | Sort: test_tsid.__tsid ASC NULLS FIRST, test_tsid.ts ASC NULLS FIRST | +| | Filter: test_tsid.ts >= TimestampMillisecond(1769137200001, None) AND test_tsid.ts <= TimestampMillisecond(1769139900000, None) | +| | Projection: test_tsid.v, test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8, test_tsid.__tsid, test_tsid.ts | +| | SubqueryAlias: test_tsid | +| | Filter: phy.__table_id=UInt32(REDACTED) | +| | TableScan: phy projection=[ts, v, tag1, tag2, le, tag4, tag5, tag6, tag7, tag8, __table_id, __tsid] | +| | ]] | +| physical_plan | HistogramFoldExec: le=@0, field=@4, quantile=0.5 | +| | SortExec: expr=[tag4@1 ASC NULLS LAST, tag5@2 ASC NULLS LAST, ts@3 ASC NULLS LAST, CAST(le@0 AS Float64) ASC NULLS LAST], preserve_partitioning=[true] | | | RepartitionExec: REDACTED | | MergeScanExec: REDACTED -| | | -+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| | | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ CREATE FLOW IF NOT EXISTS test_tsid SINK TO 'test_tsid_output'