feat: impl some promql scalar functions (#6567)

* feat: impl some promql scalar functions

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* feat: supports pi function

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: by cr comments

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: compile

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
This commit is contained in:
dennis zhuang
2025-07-29 11:29:00 +08:00
committed by GitHub
parent b7fd4ca65d
commit 086777d938
8 changed files with 854 additions and 111 deletions

View File

@@ -172,7 +172,7 @@ impl DfLogicalPlanner {
.sql_parser
.enable_ident_normalization,
);
PromPlanner::stmt_to_plan(table_provider, stmt, &self.session_state)
PromPlanner::stmt_to_plan(table_provider, stmt, &self.engine_state)
.await
.map_err(BoxedError::new)
.context(QueryPlanSnafu)

View File

@@ -21,10 +21,10 @@ use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_function::function::FunctionContext;
use common_query::prelude::GREPTIME_VALUE;
use datafusion::common::DFSchemaRef;
use datafusion::datasource::DefaultTableSource;
use datafusion::execution::context::SessionState;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::grouping::grouping_udaf;
@@ -81,6 +81,7 @@ use crate::promql::error::{
UnsupportedExprSnafu, UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu,
ValueNotFoundSnafu, ZeroRangeSelectorSnafu,
};
use crate::query_engine::QueryEngineState;
/// `time()` function in PromQL.
const SPECIAL_TIME_FUNCTION: &str = "time";
@@ -181,22 +182,24 @@ impl PromPlanner {
pub async fn stmt_to_plan(
table_provider: DfTableSourceProvider,
stmt: &EvalStmt,
session_state: &SessionState,
query_engine_state: &QueryEngineState,
) -> Result<LogicalPlan> {
let mut planner = Self {
table_provider,
ctx: PromPlannerContext::from_eval_stmt(stmt),
};
planner.prom_expr_to_plan(&stmt.expr, session_state).await
planner
.prom_expr_to_plan(&stmt.expr, query_engine_state)
.await
}
pub async fn prom_expr_to_plan(
&mut self,
prom_expr: &PromExpr,
session_state: &SessionState,
query_engine_state: &QueryEngineState,
) -> Result<LogicalPlan> {
self.prom_expr_to_plan_inner(prom_expr, false, session_state)
self.prom_expr_to_plan_inner(prom_expr, false, query_engine_state)
.await
}
@@ -214,18 +217,28 @@ impl PromPlanner {
&mut self,
prom_expr: &PromExpr,
timestamp_fn: bool,
session_state: &SessionState,
query_engine_state: &QueryEngineState,
) -> Result<LogicalPlan> {
let res = match prom_expr {
PromExpr::Aggregate(expr) => self.prom_aggr_expr_to_plan(session_state, expr).await?,
PromExpr::Unary(expr) => self.prom_unary_expr_to_plan(session_state, expr).await?,
PromExpr::Binary(expr) => self.prom_binary_expr_to_plan(session_state, expr).await?,
PromExpr::Aggregate(expr) => {
self.prom_aggr_expr_to_plan(query_engine_state, expr)
.await?
}
PromExpr::Unary(expr) => {
self.prom_unary_expr_to_plan(query_engine_state, expr)
.await?
}
PromExpr::Binary(expr) => {
self.prom_binary_expr_to_plan(query_engine_state, expr)
.await?
}
PromExpr::Paren(ParenExpr { expr }) => {
self.prom_expr_to_plan_inner(expr, timestamp_fn, session_state)
self.prom_expr_to_plan_inner(expr, timestamp_fn, query_engine_state)
.await?
}
PromExpr::Subquery(expr) => {
self.prom_subquery_expr_to_plan(session_state, expr).await?
self.prom_subquery_expr_to_plan(query_engine_state, expr)
.await?
}
PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
@@ -236,8 +249,13 @@ impl PromPlanner {
PromExpr::MatrixSelector(selector) => {
self.prom_matrix_selector_to_plan(selector).await?
}
PromExpr::Call(expr) => self.prom_call_expr_to_plan(session_state, expr).await?,
PromExpr::Extension(expr) => self.prom_ext_expr_to_plan(session_state, expr).await?,
PromExpr::Call(expr) => {
self.prom_call_expr_to_plan(query_engine_state, expr)
.await?
}
PromExpr::Extension(expr) => {
self.prom_ext_expr_to_plan(query_engine_state, expr).await?
}
};
Ok(res)
@@ -245,7 +263,7 @@ impl PromPlanner {
async fn prom_subquery_expr_to_plan(
&mut self,
session_state: &SessionState,
query_engine_state: &QueryEngineState,
subquery_expr: &SubqueryExpr,
) -> Result<LogicalPlan> {
let SubqueryExpr {
@@ -258,7 +276,7 @@ impl PromPlanner {
}
let current_start = self.ctx.start;
self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
let input = self.prom_expr_to_plan(expr, session_state).await?;
let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
self.ctx.interval = current_interval;
self.ctx.start = current_start;
@@ -287,7 +305,7 @@ impl PromPlanner {
async fn prom_aggr_expr_to_plan(
&mut self,
session_state: &SessionState,
query_engine_state: &QueryEngineState,
aggr_expr: &AggregateExpr,
) -> Result<LogicalPlan> {
let AggregateExpr {
@@ -297,7 +315,7 @@ impl PromPlanner {
param,
} = aggr_expr;
let input = self.prom_expr_to_plan(expr, session_state).await?;
let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
match (*op).id() {
token::T_TOPK | token::T_BOTTOMK => {
@@ -425,12 +443,12 @@ impl PromPlanner {
async fn prom_unary_expr_to_plan(
&mut self,
session_state: &SessionState,
query_engine_state: &QueryEngineState,
unary_expr: &UnaryExpr,
) -> Result<LogicalPlan> {
let UnaryExpr { expr } = unary_expr;
// Unary Expr in PromQL implys the `-` operator
let input = self.prom_expr_to_plan(expr, session_state).await?;
let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
self.projection_for_each_field_column(input, |col| {
Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
})
@@ -438,7 +456,7 @@ impl PromPlanner {
async fn prom_binary_expr_to_plan(
&mut self,
session_state: &SessionState,
query_engine_state: &QueryEngineState,
binary_expr: &PromBinaryExpr,
) -> Result<LogicalPlan> {
let PromBinaryExpr {
@@ -493,7 +511,7 @@ impl PromPlanner {
}
// lhs is a literal, rhs is a column
(Some(mut expr), None) => {
let input = self.prom_expr_to_plan(rhs, session_state).await?;
let input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
// check if the literal is a special time expr
if let Some(time_expr) = Self::try_build_special_time_expr(
lhs,
@@ -522,7 +540,7 @@ impl PromPlanner {
}
// lhs is a column, rhs is a literal
(None, Some(mut expr)) => {
let input = self.prom_expr_to_plan(lhs, session_state).await?;
let input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
// check if the literal is a special time expr
if let Some(time_expr) = Self::try_build_special_time_expr(
rhs,
@@ -551,7 +569,7 @@ impl PromPlanner {
}
// both are columns. join them on time index
(None, None) => {
let left_input = self.prom_expr_to_plan(lhs, session_state).await?;
let left_input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
let left_field_columns = self.ctx.field_columns.clone();
let left_time_index_column = self.ctx.time_index_column.clone();
let mut left_table_ref = self
@@ -559,7 +577,7 @@ impl PromPlanner {
.unwrap_or_else(|_| TableReference::bare(""));
let left_context = self.ctx.clone();
let right_input = self.prom_expr_to_plan(rhs, session_state).await?;
let right_input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
let right_field_columns = self.ctx.field_columns.clone();
let right_time_index_column = self.ctx.time_index_column.clone();
let mut right_table_ref = self
@@ -819,24 +837,24 @@ impl PromPlanner {
async fn prom_call_expr_to_plan(
&mut self,
session_state: &SessionState,
query_engine_state: &QueryEngineState,
call_expr: &Call,
) -> Result<LogicalPlan> {
let Call { func, args } = call_expr;
// some special functions that are not expression but a plan
match func.name {
SPECIAL_HISTOGRAM_QUANTILE => {
return self.create_histogram_plan(args, session_state).await
return self.create_histogram_plan(args, query_engine_state).await
}
SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
SCALAR_FUNCTION => return self.create_scalar_plan(args, session_state).await,
SCALAR_FUNCTION => return self.create_scalar_plan(args, query_engine_state).await,
_ => {}
}
// transform function arguments
let args = self.create_function_args(&args.args)?;
let input = if let Some(prom_expr) = &args.input {
self.prom_expr_to_plan_inner(prom_expr, func.name == "timestamp", session_state)
self.prom_expr_to_plan_inner(prom_expr, func.name == "timestamp", query_engine_state)
.await?
} else {
self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
@@ -858,7 +876,7 @@ impl PromPlanner {
})
};
let (mut func_exprs, new_tags) =
self.create_function_expr(func, args.literals.clone(), session_state)?;
self.create_function_expr(func, args.literals.clone(), query_engine_state)?;
func_exprs.insert(0, self.create_time_index_column_expr()?);
func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);
@@ -899,18 +917,23 @@ impl PromPlanner {
self.ctx.tag_columns.push(tag);
}
builder.build().context(DataFusionPlanningSnafu)
let plan = builder.build().context(DataFusionPlanningSnafu)?;
common_telemetry::debug!("Created PromQL function plan: {plan:?} for {call_expr:?}");
Ok(plan)
}
async fn prom_ext_expr_to_plan(
&mut self,
session_state: &SessionState,
query_engine_state: &QueryEngineState,
ext_expr: &promql_parser::parser::ast::Extension,
) -> Result<LogicalPlan> {
// let promql_parser::parser::ast::Extension { expr } = ext_expr;
let expr = &ext_expr.expr;
let children = expr.children();
let plan = self.prom_expr_to_plan(&children[0], session_state).await?;
let plan = self
.prom_expr_to_plan(&children[0], query_engine_state)
.await?;
// Wrapper for the explanation/analyze of the existing plan
// https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain
// if `analyze` is true, runs the actual plan and produces
@@ -1544,7 +1567,7 @@ impl PromPlanner {
&mut self,
func: &Function,
other_input_exprs: Vec<DfExpr>,
session_state: &SessionState,
query_engine_state: &QueryEngineState,
) -> Result<(Vec<DfExpr>, Vec<String>)> {
// TODO(ruihang): check function args list
let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();
@@ -1676,7 +1699,7 @@ impl PromPlanner {
"label_join" => {
let (concat_expr, dst_label) =
Self::build_concat_labels_expr(&mut other_input_exprs, session_state)?;
Self::build_concat_labels_expr(&mut other_input_exprs, query_engine_state)?;
// Reserve the current field columns except the `dst_label`.
for value in &self.ctx.field_columns {
@@ -1695,8 +1718,8 @@ impl PromPlanner {
ScalarFunc::GeneratedExpr
}
"label_replace" => {
if let Some((replace_expr, dst_label)) =
self.build_regexp_replace_label_expr(&mut other_input_exprs, session_state)?
if let Some((replace_expr, dst_label)) = self
.build_regexp_replace_label_expr(&mut other_input_exprs, query_engine_state)?
{
// Reserve the current field columns except the `dst_label`.
for value in &self.ctx.field_columns {
@@ -1739,10 +1762,34 @@ impl PromPlanner {
}
ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
}
"rad" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::radians()),
"deg" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::degrees()),
"sgn" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::signum()),
"pi" => {
// pi functions doesn't accepts any arguments, needs special processing
let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
func: datafusion::functions::math::pi(),
args: vec![],
});
exprs.push(fn_expr);
ScalarFunc::GeneratedExpr
}
_ => {
if let Some(f) = session_state.scalar_functions().get(func.name) {
if let Some(f) = query_engine_state
.session_state()
.scalar_functions()
.get(func.name)
{
ScalarFunc::DataFusionBuiltin(f.clone())
} else if let Some(factory) = query_engine_state.scalar_function(func.name) {
let func_state = query_engine_state.function_state();
let query_ctx = self.table_provider.query_ctx();
ScalarFunc::DataFusionUdf(Arc::new(factory.provide(FunctionContext {
state: func_state,
query_ctx: query_ctx.clone(),
})))
} else if let Some(f) = datafusion_functions::math::functions()
.iter()
.find(|f| f.name() == func.name)
@@ -1846,7 +1893,7 @@ impl PromPlanner {
fn build_regexp_replace_label_expr(
&self,
other_input_exprs: &mut VecDeque<DfExpr>,
session_state: &SessionState,
query_engine_state: &QueryEngineState,
) -> Result<Option<(DfExpr, String)>> {
// label_replace(vector, dst_label, replacement, src_label, regex)
let dst_label = match other_input_exprs.pop_front() {
@@ -1903,6 +1950,7 @@ impl PromPlanner {
// https://github.com/prometheus/prometheus/blob/d902abc50d6652ba8fe9a81ff8e5cce936114eba/promql/functions.go#L1575C32-L1575C37
let regex = format!("^(?s:{regex})$");
let session_state = query_engine_state.session_state();
let func = session_state
.scalar_functions()
.get("regexp_replace")
@@ -1934,7 +1982,7 @@ impl PromPlanner {
/// Build expr for `label_join` function
fn build_concat_labels_expr(
other_input_exprs: &mut VecDeque<DfExpr>,
session_state: &SessionState,
query_engine_state: &QueryEngineState,
) -> Result<(DfExpr, String)> {
// label_join(vector, dst_label, separator, src_label_1, src_label_2, ...)
@@ -1982,6 +2030,7 @@ impl PromPlanner {
}
);
let session_state = query_engine_state.session_state();
let func = session_state
.scalar_functions()
.get("concat_ws")
@@ -2315,7 +2364,7 @@ impl PromPlanner {
async fn create_histogram_plan(
&mut self,
args: &PromFunctionArgs,
session_state: &SessionState,
query_engine_state: &QueryEngineState,
) -> Result<LogicalPlan> {
if args.args.len() != 2 {
return FunctionInvalidArgumentSnafu {
@@ -2331,7 +2380,7 @@ impl PromPlanner {
})?;
let input = args.args[1].as_ref().clone();
let input_plan = self.prom_expr_to_plan(&input, session_state).await?;
let input_plan = self.prom_expr_to_plan(&input, query_engine_state).await?;
if !self.ctx.has_le_tag() {
// Return empty result instead of error when 'le' column is not found
@@ -2410,7 +2459,7 @@ impl PromPlanner {
async fn create_scalar_plan(
&mut self,
args: &PromFunctionArgs,
session_state: &SessionState,
query_engine_state: &QueryEngineState,
) -> Result<LogicalPlan> {
ensure!(
args.len() == 1,
@@ -2418,7 +2467,9 @@ impl PromPlanner {
fn_name: SCALAR_FUNCTION
}
);
let input = self.prom_expr_to_plan(&args.args[0], session_state).await?;
let input = self
.prom_expr_to_plan(&args.args[0], query_engine_state)
.await?;
ensure!(
self.ctx.field_columns.len() == 1,
MultiFieldsNotSupportedSnafu {
@@ -3116,17 +3167,33 @@ struct FunctionArgs {
literals: Vec<DfExpr>,
}
/// Represents different types of scalar functions supported in PromQL expressions.
/// Each variant defines how the function should be processed and what arguments it expects.
#[derive(Debug, Clone)]
enum ScalarFunc {
/// DataFusion's registered(including built-in) scalar functions (e.g., abs, sqrt, round, clamp).
/// These are passed through directly to DataFusion's execution engine.
/// Processing: Simple argument insertion at the specified position.
DataFusionBuiltin(Arc<ScalarUdfDef>),
/// The UDF that is defined by Datafusion itself.
/// User-defined functions registered in DataFusion's function registry.
/// Similar to DataFusionBuiltin but for custom functions not built into DataFusion.
/// Processing: Direct pass-through with argument positioning.
DataFusionUdf(Arc<ScalarUdfDef>),
/// PromQL-specific functions that operate on time series data with temporal context.
/// These functions require both timestamp ranges and values to perform calculations.
/// Processing: Automatically injects timestamp_range and value columns as first arguments.
/// Examples: idelta, irate, resets, changes, deriv, *_over_time function
Udf(Arc<ScalarUdfDef>),
// todo(ruihang): maybe merge with Udf later
/// UDF that require extra information like range length to be evaluated.
/// The second argument is range length.
/// PromQL functions requiring extrapolation calculations with explicit range information.
/// These functions need to know the time range length to perform rate calculations.
/// The second field contains the range length in milliseconds.
/// Processing: Injects timestamp_range, value, time_index columns and appends range_length.
/// Examples: increase, rate, delta
// TODO(ruihang): maybe merge with Udf later
ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
/// Func that doesn't require input, like `time()`.
/// Functions that generate expressions directly without external UDF calls.
/// The expression is constructed during function matching and requires no additional processing.
/// Examples: time(), minute(), hour(), month(), year() and other date/time extractors
GeneratedExpr,
}
@@ -3134,11 +3201,11 @@ enum ScalarFunc {
mod test {
use std::time::{Duration, UNIX_EPOCH};
use catalog::memory::MemoryCatalogManager;
use catalog::memory::{new_memory_catalog_manager, MemoryCatalogManager};
use catalog::RegisterTableRequest;
use common_base::Plugins;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::test_util::DummyDecoder;
use datafusion::execution::SessionStateBuilder;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use promql_parser::label::Labels;
@@ -3148,9 +3215,19 @@ mod test {
use table::test_util::EmptyTable;
use super::*;
use crate::options::QueryOptions;
fn build_session_state() -> SessionState {
SessionStateBuilder::new().with_default_features().build()
fn build_query_engine_state() -> QueryEngineState {
QueryEngineState::new(
new_memory_catalog_manager().unwrap(),
None, // region_query_handler
None, // table_mutation_handler
None, // procedure_service_handler
None, // flow_service_handler
false, // with_dist_planner
Plugins::default(),
QueryOptions::default(),
)
}
async fn build_test_table_provider(
@@ -3314,9 +3391,10 @@ mod test {
1,
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let plan =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
let expected = String::from(
"Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
@@ -3523,9 +3601,10 @@ mod test {
2,
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let plan =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
let expected_no_without = String::from(
"Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
\n Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
@@ -3552,9 +3631,10 @@ mod test {
2,
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let plan =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
let expected_without = String::from(
"Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
\n Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
@@ -3652,9 +3732,10 @@ mod test {
1,
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let plan =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
let expected = String::from(
"Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
@@ -3700,9 +3781,10 @@ mod test {
1,
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let plan =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
assert_eq!(plan.display_indent_schema().to_string(), expected);
}
@@ -3855,9 +3937,10 @@ mod test {
)
.await;
// Should be ok
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let plan =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
\n Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
\n SubqueryAlias: http_server_requests_seconds_sum\
@@ -3900,7 +3983,7 @@ mod test {
)
.await;
// Should be ok
let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
}
@@ -3940,9 +4023,10 @@ mod test {
)
.await;
// Should be ok
let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let _ =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
}
}
@@ -3976,7 +4060,7 @@ mod test {
)
.await;
// Should be ok
let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
}
@@ -4006,7 +4090,7 @@ mod test {
)
.await;
eval_stmt.expr = parser::parse(case).unwrap();
let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
@@ -4026,7 +4110,7 @@ mod test {
)
.await;
eval_stmt.expr = parser::parse(case).unwrap();
let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
@@ -4049,7 +4133,7 @@ mod test {
)
.await;
eval_stmt.expr = parser::parse(case).unwrap();
let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
@@ -4070,7 +4154,7 @@ mod test {
)
.await;
eval_stmt.expr = parser::parse(case).unwrap();
let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
}
@@ -4191,7 +4275,7 @@ mod test {
)
.await;
let plan =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
let mut fields = plan.schema().field_names();
@@ -4216,7 +4300,8 @@ mod test {
)
.await;
let plan =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()).await;
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await;
assert!(plan.is_err(), "case: {:?}", case);
}
}
@@ -4299,7 +4384,8 @@ mod test {
.await;
let plan =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()).await;
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await;
assert!(plan.is_err(), "query: {:?}", query);
}
}
@@ -4366,7 +4452,7 @@ mod test {
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
},
&build_session_state(),
&build_query_engine_state(),
)
.await
.unwrap();
@@ -4395,7 +4481,7 @@ mod test {
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
},
&build_session_state(),
&build_query_engine_state(),
)
.await
.unwrap();
@@ -4435,7 +4521,7 @@ mod test {
)
.await;
// Should be ok
let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
}
@@ -4459,9 +4545,10 @@ mod test {
let table_provider =
build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let plan =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
@@ -4495,9 +4582,10 @@ Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:
let table_provider =
build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let plan =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
@@ -4537,9 +4625,10 @@ Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:
3,
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let plan =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
\n Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
@@ -4580,9 +4669,10 @@ Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let plan =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]\
\n Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
\n Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
@@ -4628,9 +4718,10 @@ Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let plan =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]\
\n Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
\n Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
@@ -4674,9 +4765,10 @@ Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let plan =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
\n Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
\n Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
@@ -4711,9 +4803,10 @@ Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let plan =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
let expected = "UnionDistinctOn: on col=[[\"job\"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]\
\n SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]\
\n Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]\
@@ -4763,7 +4856,8 @@ Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:
// Should return empty result instead of error
let result =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()).await;
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await;
// This should succeed now (returning empty result) instead of failing with "Cannot find column le"
assert!(