feat: support using expressions as literal in PromQL (#6297)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-06-12 16:18:10 +08:00
committed by GitHub
parent 05b708ed2e
commit f6db419afd
3 changed files with 88 additions and 42 deletions

View File

@@ -41,8 +41,9 @@ use datafusion::prelude as df_prelude;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_common::DFSchema;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, lit, SortExpr};
use datafusion_expr::{col, lit, ExprSchemable, SortExpr};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
@@ -336,7 +337,7 @@ impl PromPlanner {
let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;
let val = Self::get_param_value_as_f64(*op, param)?;
let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;
// convert op and value columns to window exprs.
let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;
@@ -354,7 +355,7 @@ impl PromPlanner {
let predicate = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(col(rank)),
op: Operator::LtEq,
right: Box::new(lit(val)),
right: Box::new(val.clone()),
});
match expr {
@@ -1946,8 +1947,9 @@ impl PromPlanner {
let aggr = match op.id() {
token::T_SUM => sum_udaf(),
token::T_QUANTILE => {
let q = Self::get_param_value_as_f64(op, param)?;
non_col_args.push(lit(q));
let q =
Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
non_col_args.push(q);
quantile_udaf()
}
token::T_AVG => avg_udaf(),
@@ -2027,20 +2029,50 @@ impl PromPlanner {
Ok(val)
}
fn get_param_value_as_f64(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<f64> {
let param = param
.as_deref()
.with_context(|| FunctionInvalidArgumentSnafu {
fn_name: op.to_string(),
})?;
let PromExpr::NumberLiteral(NumberLiteral { val }) = param else {
return FunctionInvalidArgumentSnafu {
fn_name: op.to_string(),
fn get_param_as_literal_expr(
param: &Option<Box<PromExpr>>,
op: Option<TokenType>,
expected_type: Option<ArrowDataType>,
) -> Result<DfExpr> {
let prom_param = param.as_deref().with_context(|| {
if let Some(op) = op {
FunctionInvalidArgumentSnafu {
fn_name: op.to_string(),
}
} else {
FunctionInvalidArgumentSnafu {
fn_name: "unknown".to_string(),
}
}
.fail();
};
})?;
Ok(*val)
let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
if let Some(op) = op {
FunctionInvalidArgumentSnafu {
fn_name: op.to_string(),
}
} else {
FunctionInvalidArgumentSnafu {
fn_name: "unknown".to_string(),
}
}
})?;
// check if the type is expected
if let Some(expected_type) = expected_type {
// literal should not have reference to column
let expr_type = expr
.get_type(&DFSchema::empty())
.context(DataFusionPlanningSnafu)?;
if expected_type != expr_type {
return FunctionInvalidArgumentSnafu {
fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
}
.fail();
}
}
Ok(expr)
}
/// Create [DfExpr::WindowFunction] expr for each value column with given window function.
@@ -2096,6 +2128,28 @@ impl PromPlanner {
Ok(normalized_exprs)
}
/// Try to build a [f64] from [PromExpr].
///
/// Returns `Some` only when `expr` is a number literal, possibly wrapped in any
/// nesting of parentheses and unary operators. Every other expression kind
/// yields `None`; nothing is evaluated or constant-folded here.
///
/// Kept only for `create_histogram_plan` (see the deprecation note below).
#[deprecated(
note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
)]
fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
match expr {
// A bare number is the only leaf that produces a value.
PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
// Parentheses carry no value of their own; look through them.
PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
// The other fields of `UnaryExpr` are ignored and the operand's value is
// always negated.
// NOTE(review): presumably unary minus is the only operator this variant can
// represent — confirm against the parser's `UnaryExpr` definition.
PromExpr::Unary(UnaryExpr { expr, .. }) => {
Self::try_build_float_literal(expr).map(|f| -f)
}
// All remaining variants are rejected. They are listed explicitly instead of
// using a `_` wildcard, so a variant added to `PromExpr` later has to be
// handled here before this compiles (presumably intentional).
PromExpr::StringLiteral(_)
| PromExpr::Binary(_)
| PromExpr::VectorSelector(_)
| PromExpr::MatrixSelector(_)
| PromExpr::Call(_)
| PromExpr::Extension(_)
| PromExpr::Aggregate(_)
| PromExpr::Subquery(_) => None,
}
}
/// Create a [SPECIAL_HISTOGRAM_QUANTILE] plan.
async fn create_histogram_plan(
&mut self,
@@ -2108,11 +2162,13 @@ impl PromPlanner {
}
.fail();
}
#[allow(deprecated)]
let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
FunctionInvalidArgumentSnafu {
fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
}
})?;
let input = args.args[1].as_ref().clone();
let input_plan = self.prom_expr_to_plan(&input, session_state).await?;
@@ -2163,11 +2219,7 @@ impl PromPlanner {
}
.fail();
}
let lit = Self::try_build_float_literal(&args.args[0]).with_context(|| {
FunctionInvalidArgumentSnafu {
fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
}
})?;
let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
// reuse `SPECIAL_TIME_FUNCTION` as name of time index column
self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
@@ -2182,7 +2234,7 @@ impl PromPlanner {
self.ctx.interval,
SPECIAL_TIME_FUNCTION.to_string(),
GREPTIME_VALUE.to_string(),
Some(DfExpr::Literal(ScalarValue::Float64(Some(lit)))),
Some(lit),
)
.context(DataFusionPlanningSnafu)?,
),
@@ -2301,25 +2353,6 @@ impl PromPlanner {
}
}
/// Try to build a [f64] from [PromExpr].
///
/// Returns `Some` only when `expr` is a number literal, possibly wrapped in any
/// nesting of parentheses and unary operators. Every other expression kind
/// yields `None`; nothing is evaluated or constant-folded here.
fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
match expr {
// A bare number is the only leaf that produces a value.
PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
// Parentheses carry no value of their own; look through them.
PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
// The other fields of `UnaryExpr` are ignored and the operand's value is
// always negated.
// NOTE(review): presumably unary minus is the only operator this variant can
// represent — confirm against the parser's `UnaryExpr` definition.
PromExpr::Unary(UnaryExpr { expr, .. }) => {
Self::try_build_float_literal(expr).map(|f| -f)
}
// All remaining variants are rejected. They are listed explicitly instead of
// using a `_` wildcard, so a variant added to `PromExpr` later has to be
// handled here before this compiles (presumably intentional).
PromExpr::StringLiteral(_)
| PromExpr::Binary(_)
| PromExpr::VectorSelector(_)
| PromExpr::MatrixSelector(_)
| PromExpr::Call(_)
| PromExpr::Extension(_)
| PromExpr::Aggregate(_)
| PromExpr::Subquery(_) => None,
}
}
/// Return a lambda to build binary expression from token.
/// Because some binary operator are function in DataFusion like `atan2` or `^`.
#[allow(clippy::type_complexity)]

View File

@@ -65,6 +65,17 @@ TQL EVAL (0, 15, '5s') quantile(0.5, sum(test) by (idc));
| 1970-01-01T00:00:15 | 29.0 |
+---------------------+--------------------------------------+
TQL EVAL (0, 15, '5s') quantile(0.25 + 0.1 + 0.15, sum(test) by (idc));
+---------------------+----------------------------------------------------------------------+
| ts | quantile(Float64(0.25) + Float64(0.1) + Float64(0.15),sum(test.val)) |
+---------------------+----------------------------------------------------------------------+
| 1970-01-01T00:00:00 | 5.0 |
| 1970-01-01T00:00:05 | 13.0 |
| 1970-01-01T00:00:10 | 21.0 |
| 1970-01-01T00:00:15 | 29.0 |
+---------------------+----------------------------------------------------------------------+
DROP TABLE test;
Affected Rows: 0

View File

@@ -30,4 +30,6 @@ TQL EVAL (0, 15, '5s') quantile(0.5, test) by (idc);
TQL EVAL (0, 15, '5s') quantile(0.5, sum(test) by (idc));
TQL EVAL (0, 15, '5s') quantile(0.25 + 0.1 + 0.15, sum(test) by (idc));
DROP TABLE test;