From f6db419afde3bfdc21744343846310ba99eda67b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 12 Jun 2025 16:18:10 +0800 Subject: [PATCH] feat: support using expressions as literal in PromQL (#6297) Signed-off-by: Ruihang Xia --- src/query/src/promql/planner.rs | 117 +++++++++++------- .../standalone/common/promql/quantile.result | 11 ++ .../standalone/common/promql/quantile.sql | 2 + 3 files changed, 88 insertions(+), 42 deletions(-) diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 6df5bdbbba..f20e218ba6 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -41,8 +41,9 @@ use datafusion::prelude as df_prelude; use datafusion::prelude::{Column, Expr as DfExpr, JoinType}; use datafusion::scalar::ScalarValue; use datafusion::sql::TableReference; +use datafusion_common::DFSchema; use datafusion_expr::utils::conjunction; -use datafusion_expr::{col, lit, SortExpr}; +use datafusion_expr::{col, lit, ExprSchemable, SortExpr}; use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit}; use datatypes::data_type::ConcreteDataType; use itertools::Itertools; @@ -336,7 +337,7 @@ impl PromPlanner { let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?; - let val = Self::get_param_value_as_f64(*op, param)?; + let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?; // convert op and value columns to window exprs. let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?; @@ -354,7 +355,7 @@ impl PromPlanner { let predicate = DfExpr::BinaryExpr(BinaryExpr { left: Box::new(col(rank)), op: Operator::LtEq, - right: Box::new(lit(val)), + right: Box::new(val.clone()), }); match expr { @@ -1946,8 +1947,9 @@ impl PromPlanner { let aggr = match op.id() { token::T_SUM => sum_udaf(), token::T_QUANTILE => { - let q = Self::get_param_value_as_f64(op, param)?; - non_col_args.push(lit(q)); + let q = + Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?; + non_col_args.push(q); quantile_udaf() } token::T_AVG => avg_udaf(), @@ -2027,20 +2029,50 @@ impl PromPlanner { Ok(val) } - fn get_param_value_as_f64(op: TokenType, param: &Option>) -> Result { - let param = param - .as_deref() - .with_context(|| FunctionInvalidArgumentSnafu { - fn_name: op.to_string(), - })?; - let PromExpr::NumberLiteral(NumberLiteral { val }) = param else { - return FunctionInvalidArgumentSnafu { - fn_name: op.to_string(), + fn get_param_as_literal_expr( + param: &Option>, + op: Option, + expected_type: Option, + ) -> Result { + let prom_param = param.as_deref().with_context(|| { + if let Some(op) = op { + FunctionInvalidArgumentSnafu { + fn_name: op.to_string(), + } + } else { + FunctionInvalidArgumentSnafu { + fn_name: "unknown".to_string(), + } } - .fail(); - }; + })?; - Ok(*val) + let expr = Self::try_build_literal_expr(prom_param).with_context(|| { + if let Some(op) = op { + FunctionInvalidArgumentSnafu { + fn_name: op.to_string(), + } + } else { + FunctionInvalidArgumentSnafu { + fn_name: "unknown".to_string(), + } + } + })?; + + // check if the type is expected + if let Some(expected_type) = expected_type { + // literal should not have reference to column + let expr_type = expr + .get_type(&DFSchema::empty()) + .context(DataFusionPlanningSnafu)?; + if expected_type != expr_type { + return FunctionInvalidArgumentSnafu { + fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"), + } + .fail(); + } + } + + Ok(expr) } /// Create [DfExpr::WindowFunction] expr for each value column with given window function. @@ -2096,6 +2128,28 @@ impl PromPlanner { Ok(normalized_exprs) } + /// Try to build a [f64] from [PromExpr]. + #[deprecated( + note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`" + )] + fn try_build_float_literal(expr: &PromExpr) -> Option { + match expr { + PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val), + PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr), + PromExpr::Unary(UnaryExpr { expr, .. }) => { + Self::try_build_float_literal(expr).map(|f| -f) + } + PromExpr::StringLiteral(_) + | PromExpr::Binary(_) + | PromExpr::VectorSelector(_) + | PromExpr::MatrixSelector(_) + | PromExpr::Call(_) + | PromExpr::Extension(_) + | PromExpr::Aggregate(_) + | PromExpr::Subquery(_) => None, + } + } + /// Create a [SPECIAL_HISTOGRAM_QUANTILE] plan. async fn create_histogram_plan( &mut self, @@ -2108,11 +2162,13 @@ impl PromPlanner { } .fail(); } + #[allow(deprecated)] let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| { FunctionInvalidArgumentSnafu { fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(), } })?; + let input = args.args[1].as_ref().clone(); let input_plan = self.prom_expr_to_plan(&input, session_state).await?; @@ -2163,11 +2219,7 @@ impl PromPlanner { } .fail(); } - let lit = Self::try_build_float_literal(&args.args[0]).with_context(|| { - FunctionInvalidArgumentSnafu { - fn_name: SPECIAL_VECTOR_FUNCTION.to_string(), - } - })?; + let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?; // reuse `SPECIAL_TIME_FUNCTION` as name of time index column self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string()); @@ -2182,7 +2234,7 @@ impl PromPlanner { self.ctx.interval, SPECIAL_TIME_FUNCTION.to_string(), GREPTIME_VALUE.to_string(), - Some(DfExpr::Literal(ScalarValue::Float64(Some(lit)))), + Some(lit), ) .context(DataFusionPlanningSnafu)?, ), @@ -2301,25 +2353,6 @@ impl PromPlanner { } } - /// Try to build a [f64] from [PromExpr]. - fn try_build_float_literal(expr: &PromExpr) -> Option { - match expr { - PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val), - PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr), - PromExpr::Unary(UnaryExpr { expr, .. }) => { - Self::try_build_float_literal(expr).map(|f| -f) - } - PromExpr::StringLiteral(_) - | PromExpr::Binary(_) - | PromExpr::VectorSelector(_) - | PromExpr::MatrixSelector(_) - | PromExpr::Call(_) - | PromExpr::Extension(_) - | PromExpr::Aggregate(_) - | PromExpr::Subquery(_) => None, - } - } - /// Return a lambda to build binary expression from token. /// Because some binary operator are function in DataFusion like `atan2` or `^`. #[allow(clippy::type_complexity)] diff --git a/tests/cases/standalone/common/promql/quantile.result b/tests/cases/standalone/common/promql/quantile.result index c3aa1ef1ec..f321a22743 100644 --- a/tests/cases/standalone/common/promql/quantile.result +++ b/tests/cases/standalone/common/promql/quantile.result @@ -65,6 +65,17 @@ TQL EVAL (0, 15, '5s') quantile(0.5, sum(test) by (idc)); | 1970-01-01T00:00:15 | 29.0 | +---------------------+--------------------------------------+ +TQL EVAL (0, 15, '5s') quantile(0.25 + 0.1 + 0.15, sum(test) by (idc)); + ++---------------------+----------------------------------------------------------------------+ +| ts | quantile(Float64(0.25) + Float64(0.1) + Float64(0.15),sum(test.val)) | ++---------------------+----------------------------------------------------------------------+ +| 1970-01-01T00:00:00 | 5.0 | +| 1970-01-01T00:00:05 | 13.0 | +| 1970-01-01T00:00:10 | 21.0 | +| 1970-01-01T00:00:15 | 29.0 | ++---------------------+----------------------------------------------------------------------+ + DROP TABLE test; Affected Rows: 0 diff --git a/tests/cases/standalone/common/promql/quantile.sql b/tests/cases/standalone/common/promql/quantile.sql index 4bef39c757..a96cac688f 100644 --- a/tests/cases/standalone/common/promql/quantile.sql +++ b/tests/cases/standalone/common/promql/quantile.sql @@ -30,4 +30,6 @@ TQL EVAL (0, 15, '5s') quantile(0.5, test) by (idc); TQL EVAL (0, 15, '5s') quantile(0.5, sum(test) by (idc)); +TQL EVAL (0, 15, '5s') quantile(0.25 + 0.1 + 0.15, sum(test) by (idc)); + DROP TABLE test;