From 71482b38d7ef1d641080271af2ba7f6d341892d6 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 29 Jan 2023 17:02:11 +0800 Subject: [PATCH] feat: PromQL binary expr planner (#889) * feat: PromQL binary expr planner Signed-off-by: Ruihang Xia * column & column test Signed-off-by: Ruihang Xia * column & literal test Signed-off-by: Ruihang Xia * mark literal-literal unsupported Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/promql/src/planner.rs | 335 +++++++++++++++++++++++++++++++++++++- 1 file changed, 327 insertions(+), 8 deletions(-) diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 9a05ae304e..ace1e41c9e 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -24,7 +24,7 @@ use datafusion::logical_expr::{ Filter, LogicalPlan, LogicalPlanBuilder, Operator, }; use datafusion::optimizer::utils; -use datafusion::prelude::{Column, Expr as DfExpr}; +use datafusion::prelude::{Column, Expr as DfExpr, JoinType}; use datafusion::scalar::ScalarValue; use datafusion::sql::planner::ContextProvider; use datafusion::sql::TableReference; @@ -44,6 +44,8 @@ use crate::error::{ }; use crate::extension_plan::{InstantManipulate, Millisecond, RangeManipulate, SeriesNormalize}; +const LEFT_PLAN_JOIN_ALIAS: &str = "lhs"; + #[derive(Default, Debug, Clone)] struct PromPlannerContext { // query parameters @@ -139,14 +141,58 @@ impl PromPlanner { name: "Prom Unary Expr", } .fail()?, - PromExpr::Binary(PromBinaryExpr { lhs, rhs, .. }) => { - let _left_input = self.prom_expr_to_plan(*lhs.clone())?; - let _right_input = self.prom_expr_to_plan(*rhs.clone())?; - - UnsupportedExprSnafu { - name: "Prom Binary Expr", + PromExpr::Binary(PromBinaryExpr { lhs, rhs, op, .. }) => { + match ( + Self::try_build_literal_expr(lhs), + Self::try_build_literal_expr(rhs), + ) { + // TODO(ruihang): handle literal-only expressions + (Some(_lhs), Some(_rhs)) => UnsupportedExprSnafu { + name: "Literal-only expression", + } + .fail()?, + // lhs is a literal, rhs is a column + (Some(expr), None) => { + let input = self.prom_expr_to_plan(*rhs.clone())?; + self.projection_for_each_value_column(input, |col| { + Ok(DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(expr.clone()), + op: Self::prom_token_to_binary_op(*op)?, + right: Box::new(DfExpr::Column(col.into())), + })) + })? + } + // lhs is a column, rhs is a literal + (None, Some(expr)) => { + let input = self.prom_expr_to_plan(*lhs.clone())?; + self.projection_for_each_value_column(input, |col| { + Ok(DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(DfExpr::Column(col.into())), + op: Self::prom_token_to_binary_op(*op)?, + right: Box::new(expr.clone()), + })) + })? + } + // both are columns. join them on time index + (None, None) => { + let left_input = self.prom_expr_to_plan(*lhs.clone())?; + let right_input = self.prom_expr_to_plan(*rhs.clone())?; + let join_plan = self.join_on_time_index(left_input, right_input)?; + self.projection_for_each_value_column(join_plan, |col| { + Ok(DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(DfExpr::Column(Column::new( + Some(LEFT_PLAN_JOIN_ALIAS), + col, + ))), + op: Self::prom_token_to_binary_op(*op)?, + right: Box::new(DfExpr::Column(Column::new( + self.ctx.table_name.as_ref(), + col, + ))), + })) + })? + } } - .fail()? } PromExpr::Paren(ParenExpr { .. }) => UnsupportedExprSnafu { name: "Prom Paren Expr", @@ -511,6 +557,105 @@ impl PromPlanner { .collect(); Ok(exprs) } + + /// Try to build a DataFusion Literal Expression from PromQL Expr, return + /// `None` if the input is not a literal expression. + fn try_build_literal_expr(expr: &PromExpr) -> Option { + match expr { + PromExpr::NumberLiteral(NumberLiteral { val }) => { + let scalar_value = ScalarValue::Float64(Some(*val)); + Some(DfExpr::Literal(scalar_value)) + } + PromExpr::StringLiteral(StringLiteral { val }) => { + let scalar_value = ScalarValue::Utf8(Some(val.to_string())); + Some(DfExpr::Literal(scalar_value)) + } + PromExpr::VectorSelector(_) + | PromExpr::MatrixSelector(_) + | PromExpr::Call(_) + | PromExpr::Aggregate(_) + | PromExpr::Subquery(_) => None, + PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr), + // TODO(ruihang): support Unary operator + PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr), + PromExpr::Binary(PromBinaryExpr { lhs, rhs, op, .. }) => { + let lhs = Self::try_build_literal_expr(lhs)?; + let rhs = Self::try_build_literal_expr(rhs)?; + let op = Self::prom_token_to_binary_op(*op).ok()?; + Some(DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(lhs), + op, + right: Box::new(rhs), + })) + } + } + } + + fn prom_token_to_binary_op(token: TokenType) -> Result { + match token { + token::T_ADD => Ok(Operator::Plus), + token::T_SUB => Ok(Operator::Minus), + token::T_MUL => Ok(Operator::Multiply), + token::T_DIV => Ok(Operator::Divide), + token::T_MOD => Ok(Operator::Modulo), + token::T_EQLC => Ok(Operator::Eq), + token::T_NEQ => Ok(Operator::NotEq), + token::T_GTR => Ok(Operator::Gt), + token::T_LSS => Ok(Operator::Lt), + token::T_GTE => Ok(Operator::GtEq), + token::T_LTE => Ok(Operator::LtEq), + // TODO(ruihang): support these two operators + // token::T_POW => Ok(Operator::Power), + // token::T_ATAN2 => Ok(Operator::Atan2), + _ => UnexpectedTokenSnafu { token }.fail(), + } + } + + /// Build a inner join on time index column to concat two logical plans. + /// The left plan will be alised as [`LEFT_PLAN_JOIN_ALIAS`]. + fn join_on_time_index(&self, left: LogicalPlan, right: LogicalPlan) -> Result { + let time_index_column = Column::from_name( + self.ctx + .time_index_column + .clone() + .context(TimeIndexNotFoundSnafu { table: "unknown" })?, + ); + // Inner Join on time index column to concat two operator + LogicalPlanBuilder::from(left) + .alias(LEFT_PLAN_JOIN_ALIAS) + .context(DataFusionPlanningSnafu)? + .join( + right, + JoinType::Inner, + (vec![time_index_column.clone()], vec![time_index_column]), + None, + ) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu) + } + + // Build a projection that project and perform operation expr for every value columns. + fn projection_for_each_value_column( + &self, + input: LogicalPlan, + name_to_expr: F, + ) -> Result + where + F: Fn(&String) -> Result, + { + let value_columns = self + .ctx + .value_columns + .iter() + .map(name_to_expr) + .collect::>>()?; + LogicalPlanBuilder::from(input) + .project(value_columns) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu) + } } #[derive(Default, Debug)] @@ -970,4 +1115,178 @@ mod test { } // TODO(ruihang): add range fn tests once exprs are ready. + + // { + // input: "some_metric{tag_0="foo"} + some_metric{tag_0="bar"}", + // expected: &BinaryExpr{ + // Op: ADD, + // LHS: &VectorSelector{ + // Name: "a", + // LabelMatchers: []*labels.Matcher{ + // MustLabelMatcher(labels.MatchEqual, "tag_0", "foo"), + // MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"), + // }, + // }, + // RHS: &VectorSelector{ + // Name: "sum", + // LabelMatchers: []*labels.Matcher{ + // MustLabelMatcher(labels.MatchxEqual, "tag_0", "bar"), + // MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"), + // }, + // }, + // VectorMatching: &VectorMatching{}, + // }, + // }, + #[tokio::test] + async fn binary_op_column_column() { + let prom_expr = PromExpr::Binary(PromBinaryExpr { + lhs: Box::new(PromExpr::VectorSelector(VectorSelector { + name: Some("some_metric".to_owned()), + offset: None, + start_or_end: None, + label_matchers: Matchers { + matchers: vec![ + Matcher { + op: MatchOp::Equal, + name: "tag_0".to_string(), + value: "foo".to_string(), + }, + Matcher { + op: MatchOp::Equal, + name: METRIC_NAME.to_string(), + value: "some_metric".to_string(), + }, + ], + }, + })), + op: token::T_ADD, + rhs: Box::new(PromExpr::VectorSelector(VectorSelector { + name: Some("some_metric".to_owned()), + offset: None, + start_or_end: None, + label_matchers: Matchers { + matchers: vec![ + Matcher { + op: MatchOp::Equal, + name: "tag_0".to_string(), + value: "bar".to_string(), + }, + Matcher { + op: MatchOp::Equal, + name: METRIC_NAME.to_string(), + value: "some_metric".to_string(), + }, + ], + }, + })), + matching: None, + return_bool: false, + }); + + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + let context_provider = build_test_context_provider("some_metric".to_string(), 1, 1).await; + let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap(); + + let expected = String::from( + "Projection: lhs.field_0 + some_metric.field_0 [lhs.field_0 + some_metric.field_0:Float64;N]\ + \n Inner Join: lhs.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n Filter: tag_0 = Utf8(\"foo\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n Filter: tag_0 = Utf8(\"bar\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + ); + + assert_eq!(plan.display_indent_schema().to_string(), expected); + } + + #[tokio::test] + async fn binary_op_literal_column() { + let prom_expr = PromExpr::Binary(PromBinaryExpr { + lhs: Box::new(PromExpr::NumberLiteral(NumberLiteral { val: 1.0 })), + op: token::T_ADD, + rhs: Box::new(PromExpr::VectorSelector(VectorSelector { + name: Some("some_metric".to_owned()), + offset: None, + start_or_end: None, + label_matchers: Matchers { + matchers: vec![ + Matcher { + op: MatchOp::Equal, + name: "tag_0".to_string(), + value: "bar".to_string(), + }, + Matcher { + op: MatchOp::Equal, + name: METRIC_NAME.to_string(), + value: "some_metric".to_string(), + }, + ], + }, + })), + matching: None, + return_bool: false, + }); + + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + let context_provider = build_test_context_provider("some_metric".to_string(), 1, 1).await; + let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap(); + + let expected = String::from( + "Projection: Float64(1) + some_metric.field_0 [Float64(1) + some_metric.field_0:Float64;N]\ + \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n Filter: tag_0 = Utf8(\"bar\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + ); + + assert_eq!(plan.display_indent_schema().to_string(), expected); + } + + #[tokio::test] + async fn binary_op_literal_literal() { + let prom_expr = PromExpr::Binary(PromBinaryExpr { + lhs: Box::new(PromExpr::NumberLiteral(NumberLiteral { val: 1.0 })), + op: token::T_ADD, + rhs: Box::new(PromExpr::NumberLiteral(NumberLiteral { val: 1.0 })), + matching: None, + return_bool: false, + }); + + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + let context_provider = build_test_context_provider("some_metric".to_string(), 1, 1).await; + let plan_result = PromPlanner::stmt_to_plan(eval_stmt, context_provider); + assert!(plan_result.is_err()); + } }