feat(promql): implement subquery (#5606)

* feat: initial implement for promql subquery

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl and test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-02-26 19:28:04 -08:00
committed by GitHub
parent b5efc75aab
commit c9671fd669
3 changed files with 132 additions and 4 deletions

View File

@@ -200,10 +200,9 @@ impl PromPlanner {
PromExpr::Paren(ParenExpr { expr }) => {
self.prom_expr_to_plan(expr, session_state).await?
}
PromExpr::Subquery(SubqueryExpr { .. }) => UnsupportedExprSnafu {
name: "Prom Subquery",
PromExpr::Subquery(expr) => {
self.prom_subquery_expr_to_plan(session_state, expr).await?
}
.fail()?,
PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
PromExpr::VectorSelector(selector) => {
@@ -218,6 +217,48 @@ impl PromPlanner {
Ok(res)
}
async fn prom_subquery_expr_to_plan(
&mut self,
session_state: &SessionState,
subquery_expr: &SubqueryExpr,
) -> Result<LogicalPlan> {
let SubqueryExpr {
expr, range, step, ..
} = subquery_expr;
let current_interval = self.ctx.interval;
if let Some(step) = step {
self.ctx.interval = step.as_millis() as _;
}
let current_start = self.ctx.start;
self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
let input = self.prom_expr_to_plan(expr, session_state).await?;
self.ctx.interval = current_interval;
self.ctx.start = current_start;
ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
let range_ms = range.as_millis() as _;
self.ctx.range = Some(range_ms);
let manipulate = RangeManipulate::new(
self.ctx.start,
self.ctx.end,
self.ctx.interval,
range_ms,
self.ctx
.time_index_column
.clone()
.expect("time index should be set in `setup_context`"),
self.ctx.field_columns.clone(),
input,
)
.context(DataFusionPlanningSnafu)?;
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(manipulate),
}))
}
async fn prom_aggr_expr_to_plan(
&mut self,
session_state: &SessionState,
@@ -1674,7 +1715,7 @@ impl PromPlanner {
ensure!(
!src_labels.is_empty(),
FunctionInvalidArgumentSnafu {
fn_name: "label_join",
fn_name: "label_join"
}
);

View File

@@ -0,0 +1,65 @@
create table metric_total (
ts timestamp time index,
val double,
);
Affected Rows: 0
insert into metric_total values
(0, 1),
(10000, 2);
Affected Rows: 2
tql eval (10, 10, '1s') sum_over_time(metric_total[50s:10s]);
+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:00:10 | 3.0 |
+---------------------+----------------------------------+
tql eval (10, 10, '1s') sum_over_time(metric_total[50s:5s]);
+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:00:10 | 4.0 |
+---------------------+----------------------------------+
tql eval (300, 300, '1s') sum_over_time(metric_total[50s:10s]);
+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:05:00 | 10.0 |
+---------------------+----------------------------------+
tql eval (359, 359, '1s') sum_over_time(metric_total[60s:10s]);
+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:05:59 | 2.0 |
+---------------------+----------------------------------+
tql eval (10, 10, '1s') rate(metric_total[20s:10s]);
+---------------------+----------------------------+
| ts | prom_rate(ts_range,val,ts) |
+---------------------+----------------------------+
| 1970-01-01T00:00:10 | 0.1 |
+---------------------+----------------------------+
tql eval (20, 20, '1s') rate(metric_total[20s:5s]);
+---------------------+----------------------------+
| ts | prom_rate(ts_range,val,ts) |
+---------------------+----------------------------+
| 1970-01-01T00:00:20 | 0.06666666666666667 |
+---------------------+----------------------------+
drop table metric_total;
Affected Rows: 0

View File

@@ -0,0 +1,22 @@
create table metric_total (
ts timestamp time index,
val double,
);
insert into metric_total values
(0, 1),
(10000, 2);
tql eval (10, 10, '1s') sum_over_time(metric_total[50s:10s]);
tql eval (10, 10, '1s') sum_over_time(metric_total[50s:5s]);
tql eval (300, 300, '1s') sum_over_time(metric_total[50s:10s]);
tql eval (359, 359, '1s') sum_over_time(metric_total[60s:10s]);
tql eval (10, 10, '1s') rate(metric_total[20s:10s]);
tql eval (20, 20, '1s') rate(metric_total[20s:5s]);
drop table metric_total;