fix: divide series for subquery output (#8173)

* fix: divide series for subquery output

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: propagate time index lookup error in prom_call_manipulate

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-05-27 15:10:24 +08:00
committed by GitHub
parent 67887d3ec6
commit 9487e2c3ca
3 changed files with 100 additions and 23 deletions

View File

@@ -312,17 +312,71 @@ impl PromPlanner {
let range_ms = range.as_millis() as _;
self.ctx.range = Some(range_ms);
let time_index_column =
self.ctx
.time_index_column
.clone()
.with_context(|| TimeIndexNotFoundSnafu {
table: self.ctx.table_name.clone().unwrap_or_default(),
})?;
// `RangeManipulate` assumes each input batch holds exactly one series
// (it takes tag column values from row 0 and applies them to every
// output row). The inner expression may emit batches that mix series,
// so sort by series key + time index and split into per-series batches
// with a `SeriesDivide` first.
let input_schema = input.schema();
let input_has_tsid = input_schema.fields().iter().any(|field| {
field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
&& field.data_type() == &ArrowDataType::UInt64
});
let (series_key_columns, mut sort_exprs) = if input_has_tsid {
(
vec![DATA_SCHEMA_TSID_COLUMN_NAME.to_string()],
vec![
DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME))
.sort(true, true),
],
)
} else {
// Only use tag columns that survive in the inner plan's schema —
// `ctx.tag_columns` can drift from the actual output.
let key_columns: Vec<String> = self
.ctx
.tag_columns
.iter()
.filter(|name| input_schema.has_column_with_unqualified_name(name))
.cloned()
.collect();
let sort = key_columns
.iter()
.map(|name| DfExpr::Column(Column::from_name(name)).sort(true, true))
.collect::<Vec<_>>();
(key_columns, sort)
};
sort_exprs.push(DfExpr::Column(Column::from_name(&time_index_column)).sort(true, true));
let sort_plan = LogicalPlanBuilder::from(input)
.sort(sort_exprs)
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?;
let divide_plan = LogicalPlan::Extension(Extension {
node: Arc::new(SeriesDivide::new(
series_key_columns,
time_index_column.clone(),
sort_plan,
)),
});
let manipulate = RangeManipulate::new(
self.ctx.start,
self.ctx.end,
self.ctx.interval,
range_ms,
self.ctx
.time_index_column
.clone()
.expect("time index should be set in `setup_context`"),
time_index_column,
self.ctx.field_columns.clone(),
input,
divide_plan,
)
.context(DataFusionPlanningSnafu)?;
@@ -5926,6 +5980,26 @@ mod test {
indie_query_plan_compare(query, expected).await;
}
/// Plan-shape check for a range function evaluated over a subquery.
///
/// For `count_over_time(some_metric[10m:1m])` the expected plan places a
/// `Sort` and a `PromSeriesDivide` directly beneath the outer
/// `PromRangeManipulate`, on top of the subquery's own
/// `PromInstantManipulate` -> `PromSeriesDivide` -> `Sort` chain.
#[tokio::test]
async fn count_over_time_subquery() {
    // Expected logical plan rendered with schemas, outermost node first.
    let expected_plan: String =
        "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(ms), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
        \n Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(ms), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
        \n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[600000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(ms))]\
        \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
        \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
        \n PromInstantManipulate: range=[-540000..100000000], lookback=[1000], interval=[60000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
        \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
        \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
        \n Filter: some_metric.timestamp >= TimestampMillisecond(-540999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
        \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]"
            .to_owned();
    // Subquery selector: 10-minute range resolved at a 1-minute step.
    let promql = "count_over_time(some_metric[10m:1m])";

    indie_query_plan_compare(promql, expected_plan).await;
}
#[tokio::test]
async fn test_hash_join() {
let mut eval_stmt = EvalStmt {