feat: PromQL binary expr planner (#889)

* feat: PromQL binary expr planner

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* column & column test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* column & literal test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* mark literal-literal unsupported

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-01-29 17:02:11 +08:00
committed by GitHub
parent dc9b5339bf
commit 71482b38d7

View File

@@ -24,7 +24,7 @@ use datafusion::logical_expr::{
Filter, LogicalPlan, LogicalPlanBuilder, Operator,
};
use datafusion::optimizer::utils;
use datafusion::prelude::{Column, Expr as DfExpr};
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::planner::ContextProvider;
use datafusion::sql::TableReference;
@@ -44,6 +44,8 @@ use crate::error::{
};
use crate::extension_plan::{InstantManipulate, Millisecond, RangeManipulate, SeriesNormalize};
const LEFT_PLAN_JOIN_ALIAS: &str = "lhs";
#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
// query parameters
@@ -139,14 +141,58 @@ impl<S: ContextProvider> PromPlanner<S> {
name: "Prom Unary Expr",
}
.fail()?,
PromExpr::Binary(PromBinaryExpr { lhs, rhs, .. }) => {
let _left_input = self.prom_expr_to_plan(*lhs.clone())?;
let _right_input = self.prom_expr_to_plan(*rhs.clone())?;
UnsupportedExprSnafu {
name: "Prom Binary Expr",
PromExpr::Binary(PromBinaryExpr { lhs, rhs, op, .. }) => {
match (
Self::try_build_literal_expr(lhs),
Self::try_build_literal_expr(rhs),
) {
// TODO(ruihang): handle literal-only expressions
(Some(_lhs), Some(_rhs)) => UnsupportedExprSnafu {
name: "Literal-only expression",
}
.fail()?,
// lhs is a literal, rhs is a column
(Some(expr), None) => {
let input = self.prom_expr_to_plan(*rhs.clone())?;
self.projection_for_each_value_column(input, |col| {
Ok(DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(expr.clone()),
op: Self::prom_token_to_binary_op(*op)?,
right: Box::new(DfExpr::Column(col.into())),
}))
})?
}
// lhs is a column, rhs is a literal
(None, Some(expr)) => {
let input = self.prom_expr_to_plan(*lhs.clone())?;
self.projection_for_each_value_column(input, |col| {
Ok(DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(DfExpr::Column(col.into())),
op: Self::prom_token_to_binary_op(*op)?,
right: Box::new(expr.clone()),
}))
})?
}
// both are columns. join them on time index
(None, None) => {
let left_input = self.prom_expr_to_plan(*lhs.clone())?;
let right_input = self.prom_expr_to_plan(*rhs.clone())?;
let join_plan = self.join_on_time_index(left_input, right_input)?;
self.projection_for_each_value_column(join_plan, |col| {
Ok(DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(DfExpr::Column(Column::new(
Some(LEFT_PLAN_JOIN_ALIAS),
col,
))),
op: Self::prom_token_to_binary_op(*op)?,
right: Box::new(DfExpr::Column(Column::new(
self.ctx.table_name.as_ref(),
col,
))),
}))
})?
}
}
.fail()?
}
PromExpr::Paren(ParenExpr { .. }) => UnsupportedExprSnafu {
name: "Prom Paren Expr",
@@ -511,6 +557,105 @@ impl<S: ContextProvider> PromPlanner<S> {
.collect();
Ok(exprs)
}
/// Try to build a DataFusion literal expression from a PromQL expression.
///
/// Returns `Some` only when `expr` is composed purely of number/string
/// literals, optionally wrapped in parentheses or combined with binary
/// operators that [`Self::prom_token_to_binary_op`] can translate.
/// Returns `None` for every other input, including selectors, calls,
/// aggregations, subqueries and unary expressions.
fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
    match expr {
        PromExpr::NumberLiteral(NumberLiteral { val }) => {
            Some(DfExpr::Literal(ScalarValue::Float64(Some(*val))))
        }
        PromExpr::StringLiteral(StringLiteral { val }) => {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(val.to_string()))))
        }
        PromExpr::VectorSelector(_)
        | PromExpr::MatrixSelector(_)
        | PromExpr::Call(_)
        | PromExpr::Aggregate(_)
        | PromExpr::Subquery(_) => None,
        // Parentheses do not change the value, so look through them.
        PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
        // TODO(ruihang): support Unary operator
        // Until the operator is translated, a unary expression must not be
        // reported as a literal: unwrapping the operand would silently drop
        // the operator and plan a different expression than the one written.
        PromExpr::Unary(UnaryExpr { .. }) => None,
        PromExpr::Binary(PromBinaryExpr { lhs, rhs, op, .. }) => {
            // Both operands must be literal and the operator translatable;
            // otherwise the whole expression is not a literal.
            let lhs = Self::try_build_literal_expr(lhs)?;
            let rhs = Self::try_build_literal_expr(rhs)?;
            let op = Self::prom_token_to_binary_op(*op).ok()?;
            Some(DfExpr::BinaryExpr(BinaryExpr {
                left: Box::new(lhs),
                op,
                right: Box::new(rhs),
            }))
        }
    }
}
/// Translate a PromQL binary operator token into the corresponding
/// DataFusion [`Operator`].
///
/// Tokens without a mapping produce the error built by
/// `UnexpectedTokenSnafu`.
fn prom_token_to_binary_op(token: TokenType) -> Result<Operator> {
    let operator = match token {
        // arithmetic operators
        token::T_ADD => Operator::Plus,
        token::T_SUB => Operator::Minus,
        token::T_MUL => Operator::Multiply,
        token::T_DIV => Operator::Divide,
        token::T_MOD => Operator::Modulo,
        // comparison operators
        token::T_EQLC => Operator::Eq,
        token::T_NEQ => Operator::NotEq,
        token::T_GTR => Operator::Gt,
        token::T_LSS => Operator::Lt,
        token::T_GTE => Operator::GtEq,
        token::T_LTE => Operator::LtEq,
        // TODO(ruihang): support `T_POW` and `T_ATAN2`; they are not mapped yet
        // and therefore fall through to the error below.
        _ => return UnexpectedTokenSnafu { token }.fail(),
    };
    Ok(operator)
}
/// Build an inner join on the time index column to concatenate two logical plans.
/// The left plan will be aliased as [`LEFT_PLAN_JOIN_ALIAS`].
///
/// Fails with the error built by `TimeIndexNotFoundSnafu` when the planner
/// context holds no time index column, and with `DataFusionPlanningSnafu`
/// when DataFusion rejects the alias, the join or the final build.
fn join_on_time_index(&self, left: LogicalPlan, right: LogicalPlan) -> Result<LogicalPlan> {
    let time_index_name = self
        .ctx
        .time_index_column
        .clone()
        .context(TimeIndexNotFoundSnafu { table: "unknown" })?;
    // The same (unqualified) column is used as the join key on both sides.
    let left_key = Column::from_name(time_index_name);
    let right_key = left_key.clone();

    let aliased_left = LogicalPlanBuilder::from(left)
        .alias(LEFT_PLAN_JOIN_ALIAS)
        .context(DataFusionPlanningSnafu)?;
    let joined = aliased_left
        .join(
            right,
            JoinType::Inner,
            (vec![left_key], vec![right_key]),
            None,
        )
        .context(DataFusionPlanningSnafu)?;
    joined.build().context(DataFusionPlanningSnafu)
}
// Build a projection over `input` with one expression per value column.
//
// `name_to_expr` is called with the name of every value column recorded in the
// planner context and returns the expression to project for that column. The
// first error it returns aborts the whole build.
fn projection_for_each_value_column<F>(
    &self,
    input: LogicalPlan,
    name_to_expr: F,
) -> Result<LogicalPlan>
where
    F: Fn(&String) -> Result<DfExpr>,
{
    let mut projected_exprs = Vec::new();
    for column_name in self.ctx.value_columns.iter() {
        projected_exprs.push(name_to_expr(column_name)?);
    }

    let builder = LogicalPlanBuilder::from(input)
        .project(projected_exprs)
        .context(DataFusionPlanningSnafu)?;
    builder.build().context(DataFusionPlanningSnafu)
}
}
#[derive(Default, Debug)]
@@ -970,4 +1115,178 @@ mod test {
}
// TODO(ruihang): add range fn tests once exprs are ready.
// {
// input: "some_metric{tag_0="foo"} + some_metric{tag_0="bar"}",
// expected: &BinaryExpr{
// Op: ADD,
// LHS: &VectorSelector{
// Name: "a",
// LabelMatchers: []*labels.Matcher{
// MustLabelMatcher(labels.MatchEqual, "tag_0", "foo"),
// MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
// },
// },
// RHS: &VectorSelector{
// Name: "sum",
// LabelMatchers: []*labels.Matcher{
// MustLabelMatcher(labels.MatchEqual, "tag_0", "bar"),
// MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
// },
// },
// VectorMatching: &VectorMatching{},
// },
// },
#[tokio::test]
async fn binary_op_column_column() {
    // Both operands select `some_metric`; they differ only in the `tag_0` value.
    let selector_with_tag = |tag_value: &str| {
        PromExpr::VectorSelector(VectorSelector {
            name: Some("some_metric".to_owned()),
            offset: None,
            start_or_end: None,
            label_matchers: Matchers {
                matchers: vec![
                    Matcher {
                        op: MatchOp::Equal,
                        name: "tag_0".to_string(),
                        value: tag_value.to_string(),
                    },
                    Matcher {
                        op: MatchOp::Equal,
                        name: METRIC_NAME.to_string(),
                        value: "some_metric".to_string(),
                    },
                ],
            },
        })
    };

    // some_metric{tag_0="foo"} + some_metric{tag_0="bar"}
    let prom_expr = PromExpr::Binary(PromBinaryExpr {
        lhs: Box::new(selector_with_tag("foo")),
        op: token::T_ADD,
        rhs: Box::new(selector_with_tag("bar")),
        matching: None,
        return_bool: false,
    });

    let range_end = UNIX_EPOCH
        .checked_add(Duration::from_secs(100_000))
        .unwrap();
    let eval_stmt = EvalStmt {
        expr: prom_expr,
        start: UNIX_EPOCH,
        end: range_end,
        interval: Duration::from_secs(5),
        lookback_delta: Duration::from_secs(1),
    };

    let context_provider = build_test_context_provider("some_metric".to_string(), 1, 1).await;
    let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap();
    let actual = plan.display_indent_schema().to_string();

    // The two sides are joined on the time index, with the left side aliased as `lhs`.
    let expected = String::from(
        "Projection: lhs.field_0 + some_metric.field_0 [lhs.field_0 + some_metric.field_0:Float64;N]\
        \n Inner Join: lhs.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n Filter: tag_0 = Utf8(\"foo\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n Filter: tag_0 = Utf8(\"bar\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
    );
    assert_eq!(actual, expected);
}
#[tokio::test]
async fn binary_op_literal_column() {
    // Right-hand operand: some_metric{tag_0="bar"}
    let metric_selector = PromExpr::VectorSelector(VectorSelector {
        name: Some("some_metric".to_owned()),
        offset: None,
        start_or_end: None,
        label_matchers: Matchers {
            matchers: vec![
                Matcher {
                    op: MatchOp::Equal,
                    name: "tag_0".to_string(),
                    value: "bar".to_string(),
                },
                Matcher {
                    op: MatchOp::Equal,
                    name: METRIC_NAME.to_string(),
                    value: "some_metric".to_string(),
                },
            ],
        },
    });
    // Left-hand operand: the number literal 1
    let one = PromExpr::NumberLiteral(NumberLiteral { val: 1.0 });

    let prom_expr = PromExpr::Binary(PromBinaryExpr {
        lhs: Box::new(one),
        op: token::T_ADD,
        rhs: Box::new(metric_selector),
        matching: None,
        return_bool: false,
    });

    let range_end = UNIX_EPOCH
        .checked_add(Duration::from_secs(100_000))
        .unwrap();
    let eval_stmt = EvalStmt {
        expr: prom_expr,
        start: UNIX_EPOCH,
        end: range_end,
        interval: Duration::from_secs(5),
        lookback_delta: Duration::from_secs(1),
    };

    let context_provider = build_test_context_provider("some_metric".to_string(), 1, 1).await;
    let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap();
    let actual = plan.display_indent_schema().to_string();

    // No join is expected: the literal is applied directly in the projection.
    let expected = String::from(
        "Projection: Float64(1) + some_metric.field_0 [Float64(1) + some_metric.field_0:Float64;N]\
        \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n Filter: tag_0 = Utf8(\"bar\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
    );
    assert_eq!(actual, expected);
}
#[tokio::test]
async fn binary_op_literal_literal() {
    // `1 + 1`: both operands are number literals.
    let lhs_literal = PromExpr::NumberLiteral(NumberLiteral { val: 1.0 });
    let rhs_literal = PromExpr::NumberLiteral(NumberLiteral { val: 1.0 });
    let prom_expr = PromExpr::Binary(PromBinaryExpr {
        lhs: Box::new(lhs_literal),
        op: token::T_ADD,
        rhs: Box::new(rhs_literal),
        matching: None,
        return_bool: false,
    });

    let range_end = UNIX_EPOCH
        .checked_add(Duration::from_secs(100_000))
        .unwrap();
    let eval_stmt = EvalStmt {
        expr: prom_expr,
        start: UNIX_EPOCH,
        end: range_end,
        interval: Duration::from_secs(5),
        lookback_delta: Duration::from_secs(1),
    };

    let context_provider = build_test_context_provider("some_metric".to_string(), 1, 1).await;
    // Planning is expected to be rejected for this expression.
    assert!(PromPlanner::stmt_to_plan(eval_stmt, context_provider).is_err());
}
}