feat: support filter expression in PromQL (#1066)

feat: support filter expression

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-02-23 11:55:23 +08:00
committed by GitHub
parent 98ef74bff4
commit 51641db39e

View File

@@ -37,7 +37,7 @@ use promql_parser::parser::{
Expr as PromExpr, Function, MatrixSelector, NumberLiteral, Offset, ParenExpr, StringLiteral,
SubqueryExpr, TokenType, UnaryExpr, VectorSelector,
};
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use table::table::adapter::DfTableProviderAdapter;
use crate::error::{
@@ -143,12 +143,17 @@ impl<S: ContextProvider> PromPlanner<S> {
op,
modifier,
}) => {
let should_cast_to_bool = if let Some(modifier) = modifier {
modifier.return_bool && Self::is_token_a_comparison_op(*op)
// If set to true, a comparison operator returns 0/1 (for false/true) instead of
// filtering on the result column.
let should_return_bool = if let Some(m) = modifier {
m.return_bool
} else {
false
};
let is_comparison_op = Self::is_token_a_comparison_op(*op);
// Build a filter plan here if the op is a comparison op that does not need to
// return 0/1. Otherwise, build a projection plan.
match (
Self::try_build_literal_expr(lhs),
Self::try_build_literal_expr(rhs),
@@ -161,38 +166,48 @@ impl<S: ContextProvider> PromPlanner<S> {
// lhs is a literal, rhs is a column
(Some(expr), None) => {
let input = self.prom_expr_to_plan(*rhs.clone())?;
self.projection_for_each_value_column(input, |col| {
let bin_expr_builder = |col: &String| {
let mut binary_expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(expr.clone()),
op: Self::prom_token_to_binary_op(*op)?,
right: Box::new(DfExpr::Column(col.into())),
});
if should_cast_to_bool {
if is_comparison_op && should_return_bool {
binary_expr = DfExpr::Cast(Cast {
expr: Box::new(binary_expr),
data_type: ArrowDataType::Float64,
});
}
Ok(binary_expr)
})?
};
if is_comparison_op && !should_return_bool {
self.filter_on_value_column(input, bin_expr_builder)?
} else {
self.projection_for_each_value_column(input, bin_expr_builder)?
}
}
// lhs is a column, rhs is a literal
(None, Some(expr)) => {
let input = self.prom_expr_to_plan(*lhs.clone())?;
self.projection_for_each_value_column(input, |col| {
let bin_expr_builder = |col: &String| {
let mut binary_expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(DfExpr::Column(col.into())),
op: Self::prom_token_to_binary_op(*op)?,
right: Box::new(expr.clone()),
});
if should_cast_to_bool {
if is_comparison_op && should_return_bool {
binary_expr = DfExpr::Cast(Cast {
expr: Box::new(binary_expr),
data_type: ArrowDataType::Float64,
});
}
Ok(binary_expr)
})?
};
if is_comparison_op && !should_return_bool {
self.filter_on_value_column(input, bin_expr_builder)?
} else {
self.projection_for_each_value_column(input, bin_expr_builder)?
}
}
// both are columns. join them on time index
(None, None) => {
@@ -208,7 +223,7 @@ impl<S: ContextProvider> PromPlanner<S> {
left_value_columns.iter().zip(right_value_columns.iter());
// the new ctx.value_columns for the generated join plan
let join_plan = self.join_on_non_value_columns(left_input, right_input)?;
self.projection_for_each_value_column(join_plan, |_| {
let bin_expr_builder = |_: &String| {
let (left_col_name, right_col_name) = value_columns.next().unwrap();
let left_col = left_schema
.field_with_name(None, left_col_name)
@@ -224,14 +239,19 @@ impl<S: ContextProvider> PromPlanner<S> {
op: Self::prom_token_to_binary_op(*op)?,
right: Box::new(DfExpr::Column(right_col)),
});
if should_cast_to_bool {
if is_comparison_op && should_return_bool {
binary_expr = DfExpr::Cast(Cast {
expr: Box::new(binary_expr),
data_type: ArrowDataType::Float64,
});
}
Ok(binary_expr)
})?
};
if is_comparison_op && !should_return_bool {
self.filter_on_value_column(join_plan, bin_expr_builder)?
} else {
self.projection_for_each_value_column(join_plan, bin_expr_builder)?
}
}
}
}
@@ -865,6 +885,32 @@ impl<S: ContextProvider> PromPlanner<S> {
.build()
.context(DataFusionPlanningSnafu)
}
/// Wraps `input` in a filter node whose predicate is derived from the value column.
///
/// `name_to_expr` is invoked exactly once, with the name of the single value column
/// recorded in the planner context, and must return the predicate expression.
///
/// # Errors
///
/// - Reported through `UnsupportedExprSnafu` when the context does not hold exactly
///   one value column.
/// - Any error returned by `name_to_expr` is propagated unchanged.
/// - Failures while adding the filter or building the plan are wrapped with
///   `DataFusionPlanningSnafu`.
fn filter_on_value_column<F>(
    &self,
    input: LogicalPlan,
    mut name_to_expr: F,
) -> Result<LogicalPlan>
where
    F: FnMut(&String) -> Result<DfExpr>,
{
    let value_columns = &self.ctx.value_columns;
    // A single predicate can only target a single value column.
    ensure!(
        value_columns.len() == 1,
        UnsupportedExprSnafu {
            name: "filter on multi-value input"
        }
    );

    let predicate = name_to_expr(&value_columns[0])?;
    let filtered = LogicalPlanBuilder::from(input)
        .filter(predicate)
        .context(DataFusionPlanningSnafu)?;
    filtered.build().context(DataFusionPlanningSnafu)
}
}
#[derive(Default, Debug)]
@@ -1428,4 +1474,20 @@ mod test {
indie_query_plan_compare(query, expected).await;
}
/// A comparison against a literal without the `bool` modifier should be planned as a
/// `Filter` on the value column, placed on top of the usual selector plan.
#[tokio::test]
async fn less_filter_on_value() {
    // The outermost node must be the value filter; everything below it is the
    // plan produced for the plain `some_metric` selector.
    let expected_plan = String::from(
        "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
    );

    indie_query_plan_compare("some_metric < 1.2345", expected_plan).await;
}
}