From 51641db39e4b648438faf5fde4002ff2b7fbd485 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 23 Feb 2023 11:55:23 +0800 Subject: [PATCH] feat: support filter expression in PromQL (#1066) feat: support filter expression Signed-off-by: Ruihang Xia --- src/promql/src/planner.rs | 86 +++++++++++++++++++++++++++++++++------ 1 file changed, 74 insertions(+), 12 deletions(-) diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 2bed426094..33ded08911 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -37,7 +37,7 @@ use promql_parser::parser::{ Expr as PromExpr, Function, MatrixSelector, NumberLiteral, Offset, ParenExpr, StringLiteral, SubqueryExpr, TokenType, UnaryExpr, VectorSelector, }; -use snafu::{OptionExt, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use table::table::adapter::DfTableProviderAdapter; use crate::error::{ @@ -143,12 +143,17 @@ impl PromPlanner { op, modifier, }) => { - let should_cast_to_bool = if let Some(modifier) = modifier { - modifier.return_bool && Self::is_token_a_comparison_op(*op) + // if set to true, comparison operator will return 0/1 (for true/false) instead of + // filter on the result column + let should_return_bool = if let Some(m) = modifier { + m.return_bool } else { false }; + let is_comparison_op = Self::is_token_a_comparison_op(*op); + // we should build a filter plan here if the op is comparison op and need not + // to return 0/1. Otherwise, we should build a projection plan match ( Self::try_build_literal_expr(lhs), Self::try_build_literal_expr(rhs), @@ -161,38 +166,48 @@ impl PromPlanner { // lhs is a literal, rhs is a column (Some(expr), None) => { let input = self.prom_expr_to_plan(*rhs.clone())?; - self.projection_for_each_value_column(input, |col| { + let bin_expr_builder = |col: &String| { let mut binary_expr = DfExpr::BinaryExpr(BinaryExpr { left: Box::new(expr.clone()), op: Self::prom_token_to_binary_op(*op)?, right: Box::new(DfExpr::Column(col.into())), }); - if should_cast_to_bool { + if is_comparison_op && should_return_bool { binary_expr = DfExpr::Cast(Cast { expr: Box::new(binary_expr), data_type: ArrowDataType::Float64, }); } Ok(binary_expr) - })? + }; + if is_comparison_op && !should_return_bool { + self.filter_on_value_column(input, bin_expr_builder)? + } else { + self.projection_for_each_value_column(input, bin_expr_builder)? + } } // lhs is a column, rhs is a literal (None, Some(expr)) => { let input = self.prom_expr_to_plan(*lhs.clone())?; - self.projection_for_each_value_column(input, |col| { + let bin_expr_builder = |col: &String| { let mut binary_expr = DfExpr::BinaryExpr(BinaryExpr { left: Box::new(DfExpr::Column(col.into())), op: Self::prom_token_to_binary_op(*op)?, right: Box::new(expr.clone()), }); - if should_cast_to_bool { + if is_comparison_op && should_return_bool { binary_expr = DfExpr::Cast(Cast { expr: Box::new(binary_expr), data_type: ArrowDataType::Float64, }); } Ok(binary_expr) - })? + }; + if is_comparison_op && !should_return_bool { + self.filter_on_value_column(input, bin_expr_builder)? + } else { + self.projection_for_each_value_column(input, bin_expr_builder)? + } } // both are columns. join them on time index (None, None) => { @@ -208,7 +223,7 @@ impl PromPlanner { left_value_columns.iter().zip(right_value_columns.iter()); // the new ctx.value_columns for the generated join plan let join_plan = self.join_on_non_value_columns(left_input, right_input)?; - self.projection_for_each_value_column(join_plan, |_| { + let bin_expr_builder = |_: &String| { let (left_col_name, right_col_name) = value_columns.next().unwrap(); let left_col = left_schema .field_with_name(None, left_col_name) @@ -224,14 +239,19 @@ impl PromPlanner { op: Self::prom_token_to_binary_op(*op)?, right: Box::new(DfExpr::Column(right_col)), }); - if should_cast_to_bool { + if is_comparison_op && should_return_bool { binary_expr = DfExpr::Cast(Cast { expr: Box::new(binary_expr), data_type: ArrowDataType::Float64, }); } Ok(binary_expr) - })? + }; + if is_comparison_op && !should_return_bool { + self.filter_on_value_column(join_plan, bin_expr_builder)? + } else { + self.projection_for_each_value_column(join_plan, bin_expr_builder)? + } } } } @@ -865,6 +885,32 @@ impl PromPlanner { .build() .context(DataFusionPlanningSnafu) } + + /// Build a filter plan that filter on value column. Notice that only one value column + /// is expected. + fn filter_on_value_column( + &self, + input: LogicalPlan, + mut name_to_expr: F, + ) -> Result + where + F: FnMut(&String) -> Result, + { + ensure!( + self.ctx.value_columns.len() == 1, + UnsupportedExprSnafu { + name: "filter on multi-value input" + } + ); + + let value_column_filter = name_to_expr(&self.ctx.value_columns[0])?; + + LogicalPlanBuilder::from(input) + .filter(value_column_filter) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu) + } } #[derive(Default, Debug)] @@ -1428,4 +1474,20 @@ mod test { indie_query_plan_compare(query, expected).await; } + + #[tokio::test] + async fn less_filter_on_value() { + let query = "some_metric < 1.2345"; + let expected = String::from( + "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + ); + + indie_query_plan_compare(query, expected).await; + } }