From c9671fd66926e74dce6aaa0451a1a3e1ae8d2ae9 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 26 Feb 2025 19:28:04 -0800 Subject: [PATCH] feat(promql): implement subquery (#5606) * feat: initial implement for promql subquery Signed-off-by: Ruihang Xia * impl and test Signed-off-by: Ruihang Xia * refactor Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/query/src/promql/planner.rs | 49 ++++++++++++-- .../standalone/common/promql/subquery.result | 65 +++++++++++++++++++ .../standalone/common/promql/subquery.sql | 22 +++++++ 3 files changed, 132 insertions(+), 4 deletions(-) create mode 100644 tests/cases/standalone/common/promql/subquery.result create mode 100644 tests/cases/standalone/common/promql/subquery.sql diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 42bf447e95..b1cb51e829 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -200,10 +200,9 @@ impl PromPlanner { PromExpr::Paren(ParenExpr { expr }) => { self.prom_expr_to_plan(expr, session_state).await? } - PromExpr::Subquery(SubqueryExpr { .. }) => UnsupportedExprSnafu { - name: "Prom Subquery", + PromExpr::Subquery(expr) => { + self.prom_subquery_expr_to_plan(session_state, expr).await? } - .fail()?, PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?, PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?, PromExpr::VectorSelector(selector) => { @@ -218,6 +217,48 @@ impl PromPlanner { Ok(res) } + async fn prom_subquery_expr_to_plan( + &mut self, + session_state: &SessionState, + subquery_expr: &SubqueryExpr, + ) -> Result { + let SubqueryExpr { + expr, range, step, .. + } = subquery_expr; + + let current_interval = self.ctx.interval; + if let Some(step) = step { + self.ctx.interval = step.as_millis() as _; + } + let current_start = self.ctx.start; + self.ctx.start -= range.as_millis() as i64 - self.ctx.interval; + let input = self.prom_expr_to_plan(expr, session_state).await?; + self.ctx.interval = current_interval; + self.ctx.start = current_start; + + ensure!(!range.is_zero(), ZeroRangeSelectorSnafu); + let range_ms = range.as_millis() as _; + self.ctx.range = Some(range_ms); + + let manipulate = RangeManipulate::new( + self.ctx.start, + self.ctx.end, + self.ctx.interval, + range_ms, + self.ctx + .time_index_column + .clone() + .expect("time index should be set in `setup_context`"), + self.ctx.field_columns.clone(), + input, + ) + .context(DataFusionPlanningSnafu)?; + + Ok(LogicalPlan::Extension(Extension { + node: Arc::new(manipulate), + })) + } + async fn prom_aggr_expr_to_plan( &mut self, session_state: &SessionState, @@ -1674,7 +1715,7 @@ impl PromPlanner { ensure!( !src_labels.is_empty(), FunctionInvalidArgumentSnafu { - fn_name: "label_join", + fn_name: "label_join" } ); diff --git a/tests/cases/standalone/common/promql/subquery.result b/tests/cases/standalone/common/promql/subquery.result new file mode 100644 index 0000000000..d088468b17 --- /dev/null +++ b/tests/cases/standalone/common/promql/subquery.result @@ -0,0 +1,65 @@ +create table metric_total ( + ts timestamp time index, + val double, +); + +Affected Rows: 0 + +insert into metric_total values + (0, 1), + (10000, 2); + +Affected Rows: 2 + +tql eval (10, 10, '1s') sum_over_time(metric_total[50s:10s]); + ++---------------------+----------------------------------+ +| ts | prom_sum_over_time(ts_range,val) | ++---------------------+----------------------------------+ +| 1970-01-01T00:00:10 | 3.0 | ++---------------------+----------------------------------+ + +tql eval (10, 10, '1s') sum_over_time(metric_total[50s:5s]); + ++---------------------+----------------------------------+ +| ts | prom_sum_over_time(ts_range,val) | ++---------------------+----------------------------------+ +| 1970-01-01T00:00:10 | 4.0 | ++---------------------+----------------------------------+ + +tql eval (300, 300, '1s') sum_over_time(metric_total[50s:10s]); + ++---------------------+----------------------------------+ +| ts | prom_sum_over_time(ts_range,val) | ++---------------------+----------------------------------+ +| 1970-01-01T00:05:00 | 10.0 | ++---------------------+----------------------------------+ + +tql eval (359, 359, '1s') sum_over_time(metric_total[60s:10s]); + ++---------------------+----------------------------------+ +| ts | prom_sum_over_time(ts_range,val) | ++---------------------+----------------------------------+ +| 1970-01-01T00:05:59 | 2.0 | ++---------------------+----------------------------------+ + +tql eval (10, 10, '1s') rate(metric_total[20s:10s]); + ++---------------------+----------------------------+ +| ts | prom_rate(ts_range,val,ts) | ++---------------------+----------------------------+ +| 1970-01-01T00:00:10 | 0.1 | ++---------------------+----------------------------+ + +tql eval (20, 20, '1s') rate(metric_total[20s:5s]); + ++---------------------+----------------------------+ +| ts | prom_rate(ts_range,val,ts) | ++---------------------+----------------------------+ +| 1970-01-01T00:00:20 | 0.06666666666666667 | ++---------------------+----------------------------+ + +drop table metric_total; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/promql/subquery.sql b/tests/cases/standalone/common/promql/subquery.sql new file mode 100644 index 0000000000..95215b8488 --- /dev/null +++ b/tests/cases/standalone/common/promql/subquery.sql @@ -0,0 +1,22 @@ +create table metric_total ( + ts timestamp time index, + val double, +); + +insert into metric_total values + (0, 1), + (10000, 2); + +tql eval (10, 10, '1s') sum_over_time(metric_total[50s:10s]); + +tql eval (10, 10, '1s') sum_over_time(metric_total[50s:5s]); + +tql eval (300, 300, '1s') sum_over_time(metric_total[50s:10s]); + +tql eval (359, 359, '1s') sum_over_time(metric_total[60s:10s]); + +tql eval (10, 10, '1s') rate(metric_total[20s:10s]); + +tql eval (20, 20, '1s') rate(metric_total[20s:5s]); + +drop table metric_total;