feat: support group by op in promql (#7663)

* feat: support group by op in promql

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* restrict to single field table

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-02-04 17:43:17 +08:00
committed by GitHub
parent f9030a84c8
commit 2dc4c294cf
3 changed files with 152 additions and 8 deletions

View File

@@ -28,7 +28,6 @@ use datafusion::datasource::DefaultTableSource;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::expr_fn::first_value;
use datafusion::functions_aggregate::grouping::grouping_udaf;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
@@ -2722,6 +2721,15 @@ impl PromPlanner {
input_plan: &LogicalPlan,
) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
let mut non_col_args = Vec::new();
let is_group_agg = op.id() == token::T_GROUP;
if is_group_agg {
ensure!(
self.ctx.field_columns.len() == 1,
MultiFieldsNotSupportedSnafu {
operator: "group()"
}
);
}
let aggr = match op.id() {
token::T_SUM => sum_udaf(),
token::T_QUANTILE => {
@@ -2734,7 +2742,9 @@ impl PromPlanner {
token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
token::T_MIN => min_udaf(),
token::T_MAX => max_udaf(),
token::T_GROUP => grouping_udaf(),
// PromQL's `group()` aggregator produces 1 for each group.
// Use `max(1.0)` (per-group) to match semantics and output type (Float64).
token::T_GROUP => max_udaf(),
token::T_STDDEV => stddev_pop_udaf(),
token::T_STDVAR => var_pop_udaf(),
token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
@@ -2750,10 +2760,14 @@ impl PromPlanner {
.field_columns
.iter()
.map(|col| {
non_col_args.push(DfExpr::Column(Column::from_name(col)));
let expr = aggr.call(non_col_args.clone());
non_col_args.pop();
expr
if is_group_agg {
aggr.call(vec![lit(1_f64)])
} else {
non_col_args.push(DfExpr::Column(Column::from_name(col)));
let expr = aggr.call(non_col_args.clone());
non_col_args.pop();
expr
}
})
.collect::<Vec<_>>();
@@ -4970,9 +4984,39 @@ mod test {
}
#[tokio::test]
#[should_panic] // output type doesn't match
async fn aggregate_group() {
do_aggregate_expr_plan("grouping", "GROUPING").await;
// Regression test for `group()` aggregator.
// PromQL: sum(group by (cluster)(kubernetes_build_info{service="kubernetes",job="apiserver"}))
// should be plannable, and `group()` should produce constant 1 for each group.
let prom_expr = parser::parse(
"sum(group by (cluster)(kubernetes_build_info{service=\"kubernetes\",job=\"apiserver\"}))",
)
.unwrap();
let eval_stmt = EvalStmt {
expr: prom_expr,
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};
let table_provider = build_test_table_provider_with_fields(
&[(
DEFAULT_SCHEMA_NAME.to_string(),
"kubernetes_build_info".to_string(),
)],
&["cluster", "service", "job"],
)
.await;
let plan =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
let plan_str = plan.display_indent_schema().to_string();
assert!(plan_str.contains("max(Float64(1"));
}
#[tokio::test]

View File

@@ -0,0 +1,60 @@
-- Regression test for promql `group()` aggregator planning.
CREATE TABLE kubernetes_build_info (
ts timestamp(3) time index,
cluster_name STRING,
service_name STRING,
job_name STRING,
instance STRING,
val DOUBLE,
PRIMARY KEY(cluster_name, service_name, job_name, instance),
);
Affected Rows: 0
INSERT INTO TABLE kubernetes_build_info VALUES
(0, 'cluster_a', 'kubernetes', 'apiserver', '0', 123.0),
(0, 'cluster_a', 'kubernetes', 'apiserver', '1', 456.0),
(0, 'cluster_b', 'kubernetes', 'apiserver', '0', 789.0);
Affected Rows: 3
TQL EVAL (0, 0, '1s') sum(group by (cluster_name)(kubernetes_build_info{service_name="kubernetes",job_name="apiserver"}));
+---------------------+----------------------+
| ts | sum(max(Float64(1))) |
+---------------------+----------------------+
| 1970-01-01T00:00:00 | 2.0 |
+---------------------+----------------------+
DROP TABLE kubernetes_build_info;
Affected Rows: 0
-- `group()` doesn't support multi-field input without selecting a single field.
CREATE TABLE kubernetes_build_info_multi (
ts timestamp(3) time index,
cluster_name STRING,
service_name STRING,
job_name STRING,
instance STRING,
cpu DOUBLE,
mem DOUBLE,
PRIMARY KEY(cluster_name, service_name, job_name, instance),
);
Affected Rows: 0
INSERT INTO TABLE kubernetes_build_info_multi VALUES
(0, 'cluster_a', 'kubernetes', 'apiserver', '0', 1.0, 2.0),
(0, 'cluster_b', 'kubernetes', 'apiserver', '0', 3.0, 4.0);
Affected Rows: 2
TQL EVAL (0, 0, '1s') group by (cluster_name)(kubernetes_build_info_multi{service_name="kubernetes",job_name="apiserver"});
Error: 1001(Unsupported), Multi fields calculation is not supported in group()
DROP TABLE kubernetes_build_info_multi;
Affected Rows: 0

View File

@@ -0,0 +1,40 @@
-- Regression test for promql `group()` aggregator planning.
CREATE TABLE kubernetes_build_info (
ts timestamp(3) time index,
cluster_name STRING,
service_name STRING,
job_name STRING,
instance STRING,
val DOUBLE,
PRIMARY KEY(cluster_name, service_name, job_name, instance),
);
INSERT INTO TABLE kubernetes_build_info VALUES
(0, 'cluster_a', 'kubernetes', 'apiserver', '0', 123.0),
(0, 'cluster_a', 'kubernetes', 'apiserver', '1', 456.0),
(0, 'cluster_b', 'kubernetes', 'apiserver', '0', 789.0);
TQL EVAL (0, 0, '1s') sum(group by (cluster_name)(kubernetes_build_info{service_name="kubernetes",job_name="apiserver"}));
DROP TABLE kubernetes_build_info;
-- `group()` doesn't support multi-field input without selecting a single field.
CREATE TABLE kubernetes_build_info_multi (
ts timestamp(3) time index,
cluster_name STRING,
service_name STRING,
job_name STRING,
instance STRING,
cpu DOUBLE,
mem DOUBLE,
PRIMARY KEY(cluster_name, service_name, job_name, instance),
);
INSERT INTO TABLE kubernetes_build_info_multi VALUES
(0, 'cluster_a', 'kubernetes', 'apiserver', '0', 1.0, 2.0),
(0, 'cluster_b', 'kubernetes', 'apiserver', '0', 3.0, 4.0);
TQL EVAL (0, 0, '1s') group by (cluster_name)(kubernetes_build_info_multi{service_name="kubernetes",job_name="apiserver"});
DROP TABLE kubernetes_build_info_multi;