fix(promql): collect narrow range joins by tsid

This commit is contained in:
Ruihang Xia
2026-06-03 18:14:05 +08:00
parent d3b1c46025
commit ff4dcc3cd2
3 changed files with 118 additions and 27 deletions

View File

@@ -82,20 +82,19 @@ impl PromqlTsidNarrowJoin {
}
fn is_promql_value_tsid_time_schema(schema: &SchemaRef) -> bool {
let mut has_value = false;
let mut value_columns = 0;
let mut has_tsid = false;
let mut has_time = false;
for field in schema.fields() {
match field.name().as_str() {
"greptime_value" => has_value = true,
DATA_SCHEMA_TSID_COLUMN_NAME => has_tsid = true,
_ if matches!(field.data_type(), DataType::Timestamp(_, _)) => has_time = true,
_ => return false,
_ => value_columns += 1,
}
}
has_value && has_tsid && has_time
value_columns == 1 && has_tsid && has_time
}
fn joins_on_tsid_and_time(hash_join: &HashJoinExec) -> bool {
@@ -214,6 +213,60 @@ mod tests {
);
}
#[test]
fn chooses_collect_left_for_computed_narrow_value_column() {
let left = Arc::new(EmptyExec::new(Arc::new(Schema::new(vec![
Field::new("prom_rate(greptime_value)", DataType::Float64, true),
Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false),
Field::new(
"greptime_timestamp",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
])))) as Arc<dyn ExecutionPlan>;
let right = Arc::new(EmptyExec::new(Arc::new(Schema::new(vec![
Field::new("greptime_value", DataType::Float64, true),
Field::new("host", DataType::Utf8, true),
Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false),
Field::new(
"greptime_timestamp",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
])))) as Arc<dyn ExecutionPlan>;
let on = vec![
(
Arc::new(Column::new(DATA_SCHEMA_TSID_COLUMN_NAME, 1)) as Arc<dyn PhysicalExpr>,
Arc::new(Column::new(DATA_SCHEMA_TSID_COLUMN_NAME, 2)) as Arc<dyn PhysicalExpr>,
),
(
Arc::new(Column::new("greptime_timestamp", 2)) as Arc<dyn PhysicalExpr>,
Arc::new(Column::new("greptime_timestamp", 3)) as Arc<dyn PhysicalExpr>,
),
];
let join = Arc::new(
HashJoinExec::try_new(
left,
right,
on,
None,
&JoinType::Inner,
Some(vec![0, 3, 4, 5, 6]),
PartitionMode::Partitioned,
NullEquality::NullEqualsNull,
false,
)
.unwrap(),
) as Arc<dyn ExecutionPlan>;
let optimized = PromqlTsidNarrowJoin
.optimize(join, &ConfigOptions::default())
.unwrap();
let optimized_join = optimized.as_any().downcast_ref::<HashJoinExec>().unwrap();
assert_eq!(optimized_join.partition_mode(), &PartitionMode::CollectLeft);
}
#[test]
fn keeps_partitioned_join_when_left_side_carries_labels() {
let left = Arc::new(EmptyExec::new(Arc::new(Schema::new(vec![

View File

@@ -1115,6 +1115,11 @@ impl PromPlanner {
self.create_function_expr(func, args.literals.clone(), query_engine_state)?;
func_exprs.insert(0, self.create_time_index_column_expr()?);
func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);
if let Some(tsid_col) =
Self::optional_tsid_projection(input.schema(), None, self.ctx.use_tsid)
{
func_exprs.push(tsid_col);
}
let builder = LogicalPlanBuilder::from(input)
.project(func_exprs)
@@ -2363,6 +2368,7 @@ impl PromPlanner {
}
"label_join" => {
self.ctx.use_tsid = false;
let (concat_expr, dst_label) = Self::build_concat_labels_expr(
&mut other_input_exprs,
&self.ctx,
@@ -2386,6 +2392,7 @@ impl PromPlanner {
ScalarFunc::GeneratedExpr
}
"label_replace" => {
self.ctx.use_tsid = false;
if let Some((replace_expr, dst_label)) = self
.build_regexp_replace_label_expr(&mut other_input_exprs, query_engine_state)?
{
@@ -5185,6 +5192,37 @@ mod test {
assert!(!plan_str.contains("tag_1 ="), "{plan_str}");
}
#[tokio::test]
async fn range_function_keeps_tsid_for_absent_ignoring_binary_join() {
let eval_stmt =
build_eval_stmt("rate(some_metric[5m]) / ignoring(missing) some_alt_metric");
let table_provider = build_test_table_provider_with_tsid(
&[
(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
(
DEFAULT_SCHEMA_NAME.to_string(),
"some_alt_metric".to_string(),
),
],
2,
1,
)
.await;
let plan =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
let plan_str = plan.display_indent_schema().to_string();
assert!(
plan_str.contains("some_metric.__tsid = some_alt_metric.__tsid"),
"{plan_str}"
);
assert!(!plan_str.contains("tag_0 ="), "{plan_str}");
assert!(!plan_str.contains("tag_1 ="), "{plan_str}");
}
#[tokio::test]
async fn on_full_label_set_keeps_tsid_binary_join() {
let eval_stmt = build_eval_stmt("some_metric / on(tag_0, tag_1) some_alt_metric");

View File

@@ -100,31 +100,31 @@ TQL EXPLAIN (
)
);
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | HistogramFold: le=le, field=sum(prom_avg_over_time(ts_range,v)), quantile=0.5 |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Sort: test_tsid.le ASC NULLS LAST, test_tsid.tag4 ASC NULLS LAST, test_tsid.tag5 ASC NULLS LAST, test_tsid.ts ASC NULLS LAST |
| | Aggregate: groupBy=[[test_tsid.le, test_tsid.tag4, test_tsid.tag5, test_tsid.ts]], aggr=[[sum(prom_avg_over_time(ts_range,v))]] |
| | Filter: prom_avg_over_time(ts_range,v) IS NOT NULL |
| | Projection: test_tsid.ts, prom_avg_over_time(ts_range, v) AS prom_avg_over_time(ts_range,v), test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8 |
| | PromRangeManipulate: req range=[1769139000000..1769139900000], interval=[60000], eval range=[1800000], time index=[ts], values=["v"] |
| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] |
| | PromSeriesDivide: tags=["__tsid"] |
| | Sort: test_tsid.__tsid ASC NULLS FIRST, test_tsid.ts ASC NULLS FIRST |
| | Filter: test_tsid.ts >= TimestampMillisecond(1769137200001, None) AND test_tsid.ts <= TimestampMillisecond(1769139900000, None) |
| | Projection: test_tsid.v, test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8, test_tsid.__tsid, test_tsid.ts |
| | SubqueryAlias: test_tsid |
| | Filter: phy.__table_id=UInt32(REDACTED) |
| | TableScan: phy projection=[ts, v, tag1, tag2, le, tag4, tag5, tag6, tag7, tag8, __table_id, __tsid] |
| | ]] |
| physical_plan | HistogramFoldExec: le=@0, field=@4, quantile=0.5 |
| | SortExec: expr=[tag4@1 ASC NULLS LAST, tag5@2 ASC NULLS LAST, ts@3 ASC NULLS LAST, CAST(le@0 AS Float64) ASC NULLS LAST], preserve_partitioning=[true] |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | HistogramFold: le=le, field=sum(prom_avg_over_time(ts_range,v)), quantile=0.5 |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Sort: test_tsid.le ASC NULLS LAST, test_tsid.tag4 ASC NULLS LAST, test_tsid.tag5 ASC NULLS LAST, test_tsid.ts ASC NULLS LAST |
| | Aggregate: groupBy=[[test_tsid.le, test_tsid.tag4, test_tsid.tag5, test_tsid.ts]], aggr=[[sum(prom_avg_over_time(ts_range,v))]] |
| | Filter: prom_avg_over_time(ts_range,v) IS NOT NULL |
| | Projection: test_tsid.ts, prom_avg_over_time(ts_range, v) AS prom_avg_over_time(ts_range,v), test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8, test_tsid.__tsid |
| | PromRangeManipulate: req range=[1769139000000..1769139900000], interval=[60000], eval range=[1800000], time index=[ts], values=["v"] |
| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] |
| | PromSeriesDivide: tags=["__tsid"] |
| | Sort: test_tsid.__tsid ASC NULLS FIRST, test_tsid.ts ASC NULLS FIRST |
| | Filter: test_tsid.ts >= TimestampMillisecond(1769137200001, None) AND test_tsid.ts <= TimestampMillisecond(1769139900000, None) |
| | Projection: test_tsid.v, test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8, test_tsid.__tsid, test_tsid.ts |
| | SubqueryAlias: test_tsid |
| | Filter: phy.__table_id=UInt32(REDACTED) |
| | TableScan: phy projection=[ts, v, tag1, tag2, le, tag4, tag5, tag6, tag7, tag8, __table_id, __tsid] |
| | ]] |
| physical_plan | HistogramFoldExec: le=@0, field=@4, quantile=0.5 |
| | SortExec: expr=[tag4@1 ASC NULLS LAST, tag5@2 ASC NULLS LAST, ts@3 ASC NULLS LAST, CAST(le@0 AS Float64) ASC NULLS LAST], preserve_partitioning=[true] |
| | RepartitionExec: REDACTED
| | MergeScanExec: REDACTED
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
CREATE FLOW IF NOT EXISTS test_tsid
SINK TO 'test_tsid_output'