From 9487e2c3caed8f20076b8bd6e50ca457f9f77608 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 27 May 2026 15:10:24 +0800 Subject: [PATCH] fix: divide series for subquery output (#8173) * fix: divide series for subquery output Signed-off-by: evenyag * fix: propagate time index lookup error in prom_call_manipulate Signed-off-by: evenyag --------- Signed-off-by: evenyag --- src/query/src/promql/planner.rs | 84 +++++++++++++++++-- .../common/promql/encode_substrait.result | 36 ++++---- .../histogram_quantile_binary_op.result | 3 +- 3 files changed, 100 insertions(+), 23 deletions(-) diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 9b05632d59..0dacc136e8 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -312,17 +312,71 @@ impl PromPlanner { let range_ms = range.as_millis() as _; self.ctx.range = Some(range_ms); + let time_index_column = + self.ctx + .time_index_column + .clone() + .with_context(|| TimeIndexNotFoundSnafu { + table: self.ctx.table_name.clone().unwrap_or_default(), + })?; + + // `RangeManipulate` assumes each input batch holds exactly one series + // (it takes tag column values from row 0 and applies them to every + // output row). The inner expression may emit batches that mix series, + // so sort by series key + time index and split into per-series batches + // with a `SeriesDivide` first. + let input_schema = input.schema(); + let input_has_tsid = input_schema.fields().iter().any(|field| { + field.name() == DATA_SCHEMA_TSID_COLUMN_NAME + && field.data_type() == &ArrowDataType::UInt64 + }); + let (series_key_columns, mut sort_exprs) = if input_has_tsid { + ( + vec![DATA_SCHEMA_TSID_COLUMN_NAME.to_string()], + vec![ + DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)) + .sort(true, true), + ], + ) + } else { + // Only use tag columns that survive in the inner plan's schema — + // `ctx.tag_columns` can drift from the actual output. + let key_columns: Vec = self + .ctx + .tag_columns + .iter() + .filter(|name| input_schema.has_column_with_unqualified_name(name)) + .cloned() + .collect(); + let sort = key_columns + .iter() + .map(|name| DfExpr::Column(Column::from_name(name)).sort(true, true)) + .collect::>(); + (key_columns, sort) + }; + sort_exprs.push(DfExpr::Column(Column::from_name(&time_index_column)).sort(true, true)); + + let sort_plan = LogicalPlanBuilder::from(input) + .sort(sort_exprs) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu)?; + let divide_plan = LogicalPlan::Extension(Extension { + node: Arc::new(SeriesDivide::new( + series_key_columns, + time_index_column.clone(), + sort_plan, + )), + }); + let manipulate = RangeManipulate::new( self.ctx.start, self.ctx.end, self.ctx.interval, range_ms, - self.ctx - .time_index_column - .clone() - .expect("time index should be set in `setup_context`"), + time_index_column, self.ctx.field_columns.clone(), - input, + divide_plan, ) .context(DataFusionPlanningSnafu)?; @@ -5926,6 +5980,26 @@ mod test { indie_query_plan_compare(query, expected).await; } + /// The outer `PromRangeManipulate` from a subquery must be preceded by + /// `Sort` + `PromSeriesDivide`. + #[tokio::test] + async fn count_over_time_subquery() { + let query = "count_over_time(some_metric[10m:1m])"; + let expected = String::from( + "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(ms), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\ + \n Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(ms), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\ + \n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[600000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(ms))]\ + \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\ + \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\ + \n PromInstantManipulate: range=[-540000..100000000], lookback=[1000], interval=[60000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\ + \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\ + \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\ + \n Filter: some_metric.timestamp >= TimestampMillisecond(-540999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\ + \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]", + ); + indie_query_plan_compare(query, expected).await; + } + #[tokio::test] async fn test_hash_join() { let mut eval_stmt = EvalStmt { diff --git a/tests/cases/standalone/common/promql/encode_substrait.result b/tests/cases/standalone/common/promql/encode_substrait.result index a154d9e5a2..deb72317e9 100644 --- a/tests/cases/standalone/common/promql/encode_substrait.result +++ b/tests/cases/standalone/common/promql/encode_substrait.result @@ -16,24 +16,26 @@ tql explain (0, 100, '1s') tag_a="ffa", }[1h])[12h:1h]; -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | -| | PromRangeManipulate: req range=[0..100000], interval=[1000], eval range=[43200000], time index=[ts], values=["prom_increase(ts_range,val,ts,Int64(3600000))"] | -| | Filter: prom_increase(ts_range,val,ts,Int64(3600000)) IS NOT NULL | -| | Projection: count_total.ts, prom_increase(ts_range, val, count_total.ts, Int64(3600000)) AS prom_increase(ts_range,val,ts,Int64(3600000)), count_total.tag_a, count_total.tag_b | -| | PromRangeManipulate: req range=[-39600000..100000], interval=[3600000], eval range=[3600000], time index=[ts], values=["val"] | -| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] | -| | PromSeriesDivide: tags=["tag_a", "tag_b"] | -| | Sort: count_total.tag_a ASC NULLS FIRST, count_total.tag_b ASC NULLS FIRST, count_total.ts ASC NULLS FIRST | -| | Filter: count_total.tag_a = Utf8("ffa") AND count_total.ts >= TimestampMillisecond(-43199999, None) AND count_total.ts <= TimestampMillisecond(100000, None) | -| | TableScan: count_total | -| | ]] | -| physical_plan | CooperativeExec | ++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | PromRangeManipulate: req range=[0..100000], interval=[1000], eval range=[43200000], time index=[ts], values=["prom_increase(ts_range,val,ts,Int64(3600000))"] | +| | PromSeriesDivide: tags=["tag_a", "tag_b"] | +| | Sort: count_total.tag_a ASC NULLS FIRST, count_total.tag_b ASC NULLS FIRST, count_total.ts ASC NULLS FIRST | +| | Filter: prom_increase(ts_range,val,ts,Int64(3600000)) IS NOT NULL | +| | Projection: count_total.ts, prom_increase(ts_range, val, count_total.ts, Int64(3600000)) AS prom_increase(ts_range,val,ts,Int64(3600000)), count_total.tag_a, count_total.tag_b | +| | PromRangeManipulate: req range=[-39600000..100000], interval=[3600000], eval range=[3600000], time index=[ts], values=["val"] | +| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] | +| | PromSeriesDivide: tags=["tag_a", "tag_b"] | +| | Sort: count_total.tag_a ASC NULLS FIRST, count_total.tag_b ASC NULLS FIRST, count_total.ts ASC NULLS FIRST | +| | Filter: count_total.tag_a = Utf8("ffa") AND count_total.ts >= TimestampMillisecond(-43199999, None) AND count_total.ts <= TimestampMillisecond(100000, None) | +| | TableScan: count_total | +| | ]] | +| physical_plan | CooperativeExec | | | MergeScanExec: REDACTED -| | | -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| | | ++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ tql eval (0, 100, '1s') increase(count_total{ diff --git a/tests/cases/standalone/common/promql/histogram_quantile_binary_op.result b/tests/cases/standalone/common/promql/histogram_quantile_binary_op.result index 601dad6219..df21957356 100644 --- a/tests/cases/standalone/common/promql/histogram_quantile_binary_op.result +++ b/tests/cases/standalone/common/promql/histogram_quantile_binary_op.result @@ -81,7 +81,8 @@ tql eval (3000, 3000, '1s') count_over_time((histogram_quantile(0.5, sum by (le, +---------------------+------------------------------------------------------------------------------+-------+ | ts | prom_count_over_time(ts_range,sum(prom_rate(ts_range,val,ts,Int64(300000)))) | pod | +---------------------+------------------------------------------------------------------------------+-------+ -| 1970-01-01T00:50:00 | 2.0 | pod-a | +| 1970-01-01T00:50:00 | 1.0 | pod-a | +| 1970-01-01T00:50:00 | 1.0 | pod-b | +---------------------+------------------------------------------------------------------------------+-------+ drop table http_request_duration_seconds_bucket;