Mirror of https://github.com/GreptimeTeam/greptimedb.git
feat: Support unary, paren, bool keyword and nonexistent metric/label in PromQL (#1049)
* feat: don't report metric/label not found as error
* feat: impl unary expr
* feat: impl paren expr
* feat: support bool keyword
* add some tests
* ignore nonexistence labels during planning
* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
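For reference, here is a minimal sketch (not part of this commit) of the query shapes the change teaches the planner and HTTP layer to handle. It only assumes the promql_parser crate's parser::parse entry point, which the new tests in the diff also use; the metric name some_metric is taken from the test fixture and is purely illustrative.

// Sketch only: parse the PromQL forms covered by this commit. Planning and
// execution happen in PromPlanner and the Prometheus HTTP handler shown in
// the diff below; this just demonstrates that the syntax is valid PromQL.
use promql_parser::parser;

fn main() {
    let queries = [
        "-some_metric",               // unary expression
        "(some_metric)",              // parenthesized expression
        "some_metric != bool 1.2345", // comparison with the `bool` keyword
        "nonexistent_metric",         // missing metric: reported as empty data, not an error
    ];
    for query in queries {
        // Each of these parses; the commit is about what the planner does next.
        assert!(parser::parse(query).is_ok());
    }
}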
@@ -16,7 +16,6 @@ use std::any::Any;
 
 use common_error::prelude::*;
 use datafusion::error::DataFusionError;
-use promql_parser::label::Label;
 use promql_parser::parser::{Expr as PromExpr, TokenType};
 
 #[derive(Debug, Snafu)]
@@ -49,13 +48,6 @@ pub enum Error {
     #[snafu(display("Cannot find value columns in table {}", table))]
     ValueNotFound { table: String, backtrace: Backtrace },
 
-    #[snafu(display("Cannot find label {} in table {}", label, table,))]
-    LabelNotFound {
-        table: String,
-        label: Label,
-        backtrace: Backtrace,
-    },
-
     #[snafu(display("Cannot find the table {}", table))]
     TableNotFound {
         table: String,
@@ -107,15 +99,15 @@ impl ErrorExt for Error {
             | UnsupportedExpr { .. }
             | UnexpectedToken { .. }
             | MultipleVector { .. }
-            | LabelNotFound { .. }
             | ExpectExpr { .. } => StatusCode::InvalidArguments,
+
             UnknownTable { .. }
-            | TableNotFound { .. }
             | DataFusionPlanning { .. }
            | UnexpectedPlanExpr { .. }
             | IllegalRange { .. }
-            | EmptyRange { .. }
-            | TableNameNotFound { .. } => StatusCode::Internal,
+            | EmptyRange { .. } => StatusCode::Internal,
+
+            TableNotFound { .. } | TableNameNotFound { .. } => StatusCode::TableNotFound,
         }
     }
     fn backtrace_opt(&self) -> Option<&Backtrace> {
@@ -22,7 +22,7 @@ use datafusion::datasource::DefaultTableSource;
 use datafusion::logical_expr::expr::AggregateFunction;
 use datafusion::logical_expr::expr_rewriter::normalize_cols;
 use datafusion::logical_expr::{
-    AggregateFunction as AggregateFunctionEnum, BinaryExpr, BuiltinScalarFunction, Extension,
+    AggregateFunction as AggregateFunctionEnum, BinaryExpr, BuiltinScalarFunction, Cast, Extension,
     LogicalPlan, LogicalPlanBuilder, Operator,
 };
 use datafusion::optimizer::utils;
@@ -30,19 +30,20 @@ use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
 use datafusion::scalar::ScalarValue;
 use datafusion::sql::planner::ContextProvider;
 use datafusion::sql::TableReference;
+use datatypes::arrow::datatypes::DataType as ArrowDataType;
 use promql_parser::label::{MatchOp, Matchers, METRIC_NAME};
 use promql_parser::parser::{
     token, AggModifier, AggregateExpr, BinaryExpr as PromBinaryExpr, Call, EvalStmt,
     Expr as PromExpr, Function, MatrixSelector, NumberLiteral, Offset, ParenExpr, StringLiteral,
     SubqueryExpr, TokenType, UnaryExpr, VectorSelector,
 };
-use snafu::{ensure, OptionExt, ResultExt};
+use snafu::{OptionExt, ResultExt};
 use table::table::adapter::DfTableProviderAdapter;
 
 use crate::error::{
-    DataFusionPlanningSnafu, ExpectExprSnafu, LabelNotFoundSnafu, MultipleVectorSnafu, Result,
-    TableNameNotFoundSnafu, TableNotFoundSnafu, TimeIndexNotFoundSnafu, UnexpectedTokenSnafu,
-    UnknownTableSnafu, UnsupportedExprSnafu, ValueNotFoundSnafu,
+    DataFusionPlanningSnafu, ExpectExprSnafu, MultipleVectorSnafu, Result, TableNameNotFoundSnafu,
+    TableNotFoundSnafu, TimeIndexNotFoundSnafu, UnexpectedTokenSnafu, UnknownTableSnafu,
+    UnsupportedExprSnafu, ValueNotFoundSnafu,
 };
 use crate::extension_plan::{
     InstantManipulate, Millisecond, RangeManipulate, SeriesDivide, SeriesNormalize,
@@ -129,11 +130,25 @@ impl<S: ContextProvider> PromPlanner<S> {
                     .build()
                     .context(DataFusionPlanningSnafu)?
             }
-            PromExpr::Unary(UnaryExpr { .. }) => UnsupportedExprSnafu {
-                name: "Prom Unary Expr",
+            PromExpr::Unary(UnaryExpr { expr }) => {
+                // Unary Expr in PromQL implys the `-` operator
+                let input = self.prom_expr_to_plan(*expr.clone())?;
+                self.projection_for_each_value_column(input, |col| {
+                    Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
+                })?
             }
-            .fail()?,
-            PromExpr::Binary(PromBinaryExpr { lhs, rhs, op, .. }) => {
+            PromExpr::Binary(PromBinaryExpr {
+                lhs,
+                rhs,
+                op,
+                modifier,
+            }) => {
+                let should_cast_to_bool = if let Some(modifier) = modifier {
+                    modifier.return_bool && Self::is_token_a_comparison_op(*op)
+                } else {
+                    false
+                };
+
                 match (
                     Self::try_build_literal_expr(lhs),
                     Self::try_build_literal_expr(rhs),
@@ -147,22 +162,36 @@ impl<S: ContextProvider> PromPlanner<S> {
                     (Some(expr), None) => {
                         let input = self.prom_expr_to_plan(*rhs.clone())?;
                         self.projection_for_each_value_column(input, |col| {
-                            Ok(DfExpr::BinaryExpr(BinaryExpr {
+                            let mut binary_expr = DfExpr::BinaryExpr(BinaryExpr {
                                 left: Box::new(expr.clone()),
                                 op: Self::prom_token_to_binary_op(*op)?,
                                 right: Box::new(DfExpr::Column(col.into())),
-                            }))
+                            });
+                            if should_cast_to_bool {
+                                binary_expr = DfExpr::Cast(Cast {
+                                    expr: Box::new(binary_expr),
+                                    data_type: ArrowDataType::Float64,
+                                });
+                            }
+                            Ok(binary_expr)
                         })?
                     }
                     // lhs is a column, rhs is a literal
                     (None, Some(expr)) => {
                         let input = self.prom_expr_to_plan(*lhs.clone())?;
                         self.projection_for_each_value_column(input, |col| {
-                            Ok(DfExpr::BinaryExpr(BinaryExpr {
+                            let mut binary_expr = DfExpr::BinaryExpr(BinaryExpr {
                                 left: Box::new(DfExpr::Column(col.into())),
                                 op: Self::prom_token_to_binary_op(*op)?,
                                 right: Box::new(expr.clone()),
-                            }))
+                            });
+                            if should_cast_to_bool {
+                                binary_expr = DfExpr::Cast(Cast {
+                                    expr: Box::new(binary_expr),
+                                    data_type: ArrowDataType::Float64,
+                                });
+                            }
+                            Ok(binary_expr)
                         })?
                     }
                     // both are columns. join them on time index
@@ -190,19 +219,23 @@ impl<S: ContextProvider> PromPlanner<S> {
                                 .context(DataFusionPlanningSnafu)?
                                 .qualified_column();
 
-                            Ok(DfExpr::BinaryExpr(BinaryExpr {
+                            let mut binary_expr = DfExpr::BinaryExpr(BinaryExpr {
                                 left: Box::new(DfExpr::Column(left_col)),
                                 op: Self::prom_token_to_binary_op(*op)?,
                                 right: Box::new(DfExpr::Column(right_col)),
-                            }))
+                            });
+                            if should_cast_to_bool {
+                                binary_expr = DfExpr::Cast(Cast {
+                                    expr: Box::new(binary_expr),
+                                    data_type: ArrowDataType::Float64,
+                                });
+                            }
+                            Ok(binary_expr)
                         })?
                     }
                 }
             }
-            PromExpr::Paren(ParenExpr { .. }) => UnsupportedExprSnafu {
-                name: "Prom Paren Expr",
-            }
-            .fail()?,
+            PromExpr::Paren(ParenExpr { expr }) => self.prom_expr_to_plan(*expr.clone())?,
             PromExpr::Subquery(SubqueryExpr { .. }) => UnsupportedExprSnafu {
                 name: "Prom Subquery",
             }
@@ -376,20 +409,10 @@ impl<S: ContextProvider> PromPlanner<S> {
             AggModifier::By(labels) => {
                 let mut exprs = Vec::with_capacity(labels.len());
                 for label in labels {
-                    let field = input_schema
-                        .field_with_unqualified_name(label)
-                        .map_err(|_| {
-                            LabelNotFoundSnafu {
-                                table: self
-                                    .ctx
-                                    .table_name
-                                    .clone()
-                                    .unwrap_or("no_table_name".to_string()),
-                                label: label.clone(),
-                            }
-                            .build()
-                        })?;
-                    exprs.push(DfExpr::Column(Column::from(field.name())));
+                    // nonexistence label will be ignored
+                    if let Ok(field) = input_schema.field_with_unqualified_name(label) {
+                        exprs.push(DfExpr::Column(Column::from(field.name())));
+                    }
                 }
 
                 // change the tag columns in context
@@ -406,21 +429,13 @@ impl<S: ContextProvider> PromPlanner<S> {
                     .iter()
                     .map(|f| f.name())
                     .collect::<BTreeSet<_>>();
+
                 // remove "without"-ed fields
+                // nonexistence label will be ignored
                 for label in labels {
-                    ensure!(
-                        // ensure this field was existed
-                        all_fields.remove(label),
-                        LabelNotFoundSnafu {
-                            table: self
-                                .ctx
-                                .table_name
-                                .clone()
-                                .unwrap_or("no_table_name".to_string()),
-                            label: label.clone(),
-                        }
-                    );
+                    all_fields.remove(label);
                 }
+
                 // remove time index and value fields
                 if let Some(time_index) = &self.ctx.time_index_column {
                     all_fields.remove(time_index);
@@ -495,7 +510,7 @@ impl<S: ContextProvider> PromPlanner<S> {
         let table = self
             .schema_provider
             .get_table_provider(TableReference::Bare { table: &table_name })
-            .context(DataFusionPlanningSnafu)?
+            .context(TableNotFoundSnafu { table: &table_name })?
            .as_any()
             .downcast_ref::<DefaultTableSource>()
             .context(UnknownTableSnafu)?
@@ -750,6 +765,19 @@ impl<S: ContextProvider> PromPlanner<S> {
         }
     }
 
+    /// Check if the given op is a [comparison operator](https://prometheus.io/docs/prometheus/latest/querying/operators/#comparison-binary-operators).
+    fn is_token_a_comparison_op(token: TokenType) -> bool {
+        matches!(
+            token.id(),
+            token::T_EQLC
+                | token::T_NEQ
+                | token::T_GTR
+                | token::T_LSS
+                | token::T_GTE
+                | token::T_LTE
+        )
+    }
+
     /// Build a inner join on time index column and tag columns to concat two logical plans.
     /// The left plan will be alised as [`LEFT_PLAN_JOIN_ALIAS`].
     fn join_on_non_value_columns(
@@ -1317,9 +1345,8 @@ mod test {
         assert_eq!(plan.display_indent_schema().to_string(), expected);
     }
 
-    #[tokio::test]
-    async fn binary_op_literal_column() {
-        let prom_expr = parser::parse(r#"1 + some_metric{tag_0="bar"}"#).unwrap();
+    async fn indie_query_plan_compare(query: &str, expected: String) {
+        let prom_expr = parser::parse(query).unwrap();
         let eval_stmt = EvalStmt {
             expr: prom_expr,
             start: UNIX_EPOCH,
@@ -1333,7 +1360,13 @@ mod test {
         let context_provider = build_test_context_provider("some_metric".to_string(), 1, 1).await;
         let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap();
 
-        let expected = String::from(
+        assert_eq!(plan.display_indent_schema().to_string(), expected);
+    }
+
+    #[tokio::test]
+    async fn binary_op_literal_column() {
+        let query = r#"1 + some_metric{tag_0="bar"}"#;
+        let expected = String::from(
             "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
             \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
             \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
@@ -1343,25 +1376,56 @@ mod test {
             \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
         );
 
-        assert_eq!(plan.display_indent_schema().to_string(), expected);
+        indie_query_plan_compare(query, expected).await;
     }
 
-    // TODO(ruihang): pure literal arithmetic is not supported yet.
     #[tokio::test]
+    #[ignore = "pure literal arithmetic is not supported yet"]
     async fn binary_op_literal_literal() {
-        let prom_expr = parser::parse(r#"1 + 1"#).unwrap();
-        let eval_stmt = EvalStmt {
-            expr: prom_expr,
-            start: UNIX_EPOCH,
-            end: UNIX_EPOCH
-                .checked_add(Duration::from_secs(100_000))
-                .unwrap(),
-            interval: Duration::from_secs(5),
-            lookback_delta: Duration::from_secs(1),
-        };
+        let query = r#"1 + 1"#;
+        let expected = String::from("");
 
-        let context_provider = build_test_context_provider("some_metric".to_string(), 1, 1).await;
-        let plan_result = PromPlanner::stmt_to_plan(eval_stmt, context_provider);
-        assert!(plan_result.is_err());
+        indie_query_plan_compare(query, expected).await;
     }
+
+    #[tokio::test]
+    async fn simple_bool_grammar() {
+        let query = "some_metric != bool 1.2345";
+        let expected = String::from(
+            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
+            \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
+        );
+
+        indie_query_plan_compare(query, expected).await;
+    }
+
+    #[tokio::test]
+    #[ignore = "pure literal arithmetic is not supported yet"]
+    async fn bool_with_additional_arithmetic() {
+        let query = "some_metric + (1 == bool 2)";
+        let expected = String::from("");
+
+        indie_query_plan_compare(query, expected).await;
+    }
+
+    #[tokio::test]
+    async fn simple_unary() {
+        let query = "-some_metric";
+        let expected = String::from(
+            "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
+            \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
+        );
+
+        indie_query_plan_compare(query, expected).await;
+    }
 }
@@ -21,6 +21,7 @@ use axum::body::BoxBody;
 use axum::extract::{Query, State};
 use axum::{routing, Form, Json, Router};
 use common_error::prelude::ErrorExt;
+use common_error::status_code::StatusCode;
 use common_query::Output;
 use common_recordbatch::RecordBatches;
 use common_telemetry::info;
@@ -216,7 +217,22 @@ impl PromqlJsonResponse {
             json
         };
 
-        response.unwrap_or_else(|err| Self::error(err.status_code().to_string(), err.to_string()))
+        match response {
+            Ok(resp) => resp,
+            Err(err) => {
+                // Prometheus won't report error if querying nonexist label and metric
+                if err.status_code() == StatusCode::TableNotFound
+                    || err.status_code() == StatusCode::TableColumnNotFound
+                {
+                    Self::success(PromqlData {
+                        result_type: "matrix".to_string(),
+                        ..Default::default()
+                    })
+                } else {
+                    Self::error(err.status_code().to_string(), err.to_string())
+                }
+            }
+        }
     }
 
     fn record_batches_to_data(batches: RecordBatches, metric_name: String) -> Result<PromqlData> {
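A closing note on the PromqlJsonResponse change above: its practical effect is that a PromQL query naming a missing table or column should now serialize to Prometheus's usual empty-success envelope rather than an error payload. The sketch below states that expected body as an assumption based on the Prometheus HTTP API and the result_type "matrix" default set in the diff; it is not copied from this commit.

// Assumed wire format (hypothetical, for illustration): an empty "matrix"
// success response, matching how Prometheus itself answers queries for
// series that do not exist.
const EMPTY_MATRIX_BODY: &str =
    r#"{"status":"success","data":{"resultType":"matrix","result":[]}}"#;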