diff --git a/src/promql/src/error.rs b/src/promql/src/error.rs
index 185764e26d..9de9a6490c 100644
--- a/src/promql/src/error.rs
+++ b/src/promql/src/error.rs
@@ -100,6 +100,9 @@ pub enum Error {
 
     #[snafu(display("Zero range in range selector"))]
     ZeroRangeSelector { backtrace: Backtrace },
+
+    #[snafu(display("Cannot find column {col}"))]
+    ColumnNotFound { col: String, backtrace: Backtrace },
 }
 
 impl ErrorExt for Error {
@@ -113,7 +116,8 @@ impl ErrorExt for Error {
             | MultipleVector { .. }
             | ExpectExpr { .. }
             | ExpectRangeSelector { .. }
-            | ZeroRangeSelector { .. } => StatusCode::InvalidArguments,
+            | ZeroRangeSelector { .. }
+            | ColumnNotFound { .. } => StatusCode::InvalidArguments,
 
             UnknownTable { .. }
             | DataFusionPlanning { .. }
diff --git a/src/promql/src/functions.rs b/src/promql/src/functions.rs
index 24ef35f1b5..426e6dc8d7 100644
--- a/src/promql/src/functions.rs
+++ b/src/promql/src/functions.rs
@@ -26,12 +26,14 @@ pub use aggr_over_time::{
     AbsentOverTime, AvgOverTime, CountOverTime, LastOverTime, MaxOverTime, MinOverTime,
     PresentOverTime, StddevOverTime, StdvarOverTime, SumOverTime,
 };
+pub use changes::Changes;
 use datafusion::arrow::array::ArrayRef;
 use datafusion::error::DataFusionError;
 use datafusion::physical_plan::ColumnarValue;
 pub use extrapolate_rate::{Delta, Increase, Rate};
 pub use idelta::IDelta;
 pub use quantile::QuantileOverTime;
+pub use resets::Resets;
 
 pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result<ArrayRef, DataFusionError> {
     if let ColumnarValue::Array(array) = columnar_value {
diff --git a/src/promql/src/functions/aggr_over_time.rs b/src/promql/src/functions/aggr_over_time.rs
index 0fa872c469..a8cc99dce0 100644
--- a/src/promql/src/functions/aggr_over_time.rs
+++ b/src/promql/src/functions/aggr_over_time.rs
@@ -183,8 +183,6 @@ pub fn stddev_over_time(_: &TimestampMillisecondArray, values: &Float64Array) ->
     }
 }
 
-// TODO(ruihang): support quantile_over_time
-
 #[cfg(test)]
 mod test {
     use super::*;
diff --git a/src/promql/src/lib.rs b/src/promql/src/lib.rs
index 7436f820ec..6cbfaa7400 100644
--- a/src/promql/src/lib.rs
+++ b/src/promql/src/lib.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#![feature(option_get_or_insert_default)]
+
 pub mod error;
 pub mod extension_plan;
 pub mod functions;
diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs
index e316964823..837840af95 100644
--- a/src/promql/src/planner.rs
+++ b/src/promql/src/planner.rs
@@ -32,7 +32,7 @@ use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
 use datafusion::scalar::ScalarValue;
 use datafusion::sql::TableReference;
 use datatypes::arrow::datatypes::DataType as ArrowDataType;
-use promql_parser::label::{MatchOp, Matchers, METRIC_NAME};
+use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
 use promql_parser::parser::{
     token, AggModifier, AggregateExpr, BinaryExpr as PromBinaryExpr, Call, EvalStmt,
     Expr as PromExpr, Function, MatrixSelector, NumberLiteral, Offset, ParenExpr, StringLiteral,
@@ -42,17 +42,18 @@ use snafu::{ensure, OptionExt, ResultExt};
 use table::table::adapter::DfTableProviderAdapter;
 
 use crate::error::{
-    CatalogSnafu, DataFusionPlanningSnafu, ExpectExprSnafu, ExpectRangeSelectorSnafu,
-    MultipleVectorSnafu, Result, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
-    UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
-    ValueNotFoundSnafu, ZeroRangeSelectorSnafu,
+    CatalogSnafu, ColumnNotFoundSnafu, DataFusionPlanningSnafu, ExpectExprSnafu,
+    ExpectRangeSelectorSnafu, MultipleVectorSnafu, Result, TableNameNotFoundSnafu,
+    TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu,
+    UnsupportedExprSnafu, ValueNotFoundSnafu, ZeroRangeSelectorSnafu,
 };
 use crate::extension_plan::{
     EmptyMetric, InstantManipulate, Millisecond, RangeManipulate, SeriesDivide, SeriesNormalize,
 };
 use crate::functions::{
-    AbsentOverTime, AvgOverTime, CountOverTime, Delta, IDelta, Increase, LastOverTime, MaxOverTime,
-    MinOverTime, PresentOverTime, QuantileOverTime, Rate, SumOverTime,
+    AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, IDelta, Increase, LastOverTime,
+    MaxOverTime, MinOverTime, PresentOverTime, QuantileOverTime, Rate, Resets, StddevOverTime,
+    StdvarOverTime, SumOverTime,
 };
 
 const LEFT_PLAN_JOIN_ALIAS: &str = "lhs";
@@ -63,6 +64,9 @@ const SPECIAL_TIME_FUNCTION: &str = "time";
 /// default value column name for empty metric
 const DEFAULT_VALUE_COLUMN: &str = "value";
 
+/// Special modifier to project field columns under multi-field mode
+const FIELD_COLUMN_MATCHER: &str = "__field__";
+
 #[derive(Default, Debug, Clone)]
 struct PromPlannerContext {
     // query parameters
@@ -76,6 +80,7 @@ struct PromPlannerContext {
     time_index_column: Option<String>,
     value_columns: Vec<String>,
     tag_columns: Vec<String>,
+    field_column_matcher: Option<Vec<Matcher>>,
     /// The range in millisecond of range selector. None if there is no range selector.
     range: Option<Millisecond>,
 }
@@ -399,6 +404,11 @@ impl PromPlanner {
             // TODO(ruihang): support other metric match ops
             if matcher.name == METRIC_NAME && matches!(matcher.op, MatchOp::Equal) {
                 self.ctx.table_name = Some(matcher.value.clone());
+            } else if matcher.name == FIELD_COLUMN_MATCHER {
+                self.ctx
+                    .field_column_matcher
+                    .get_or_insert_default()
+                    .push(matcher.clone());
             } else {
                 matchers.insert(matcher.clone());
             }
@@ -431,10 +441,78 @@ impl PromPlanner {
         )));
 
         // make table scan with filter exprs
-        let table_scan = self
+        let mut table_scan = self
             .create_table_scan_plan(&table_name, filters.clone())
             .await?;
 
+        // make a projection plan if there is any `__field__` matcher
+        if let Some(field_matchers) = &self.ctx.field_column_matcher {
+            let col_set = self.ctx.value_columns.iter().collect::<HashSet<_>>();
+            // opt-in set: value columns selected by `=` / `=~` matchers
+            let mut result_set = HashSet::new();
+            // opt-out set: value columns excluded by `!=` / `!~` matchers
+            let mut reverse_set = HashSet::new();
+            for matcher in field_matchers {
+                match &matcher.op {
+                    MatchOp::Equal => {
+                        if col_set.contains(&matcher.value) {
+                            result_set.insert(matcher.value.clone());
+                        } else {
+                            return Err(ColumnNotFoundSnafu {
+                                col: matcher.value.clone(),
+                            }
+                            .build());
+                        }
+                    }
+                    MatchOp::NotEqual => {
+                        if col_set.contains(&matcher.value) {
+                            reverse_set.insert(matcher.value.clone());
+                        } else {
+                            return Err(ColumnNotFoundSnafu {
+                                col: matcher.value.clone(),
+                            }
+                            .build());
+                        }
+                    }
+                    MatchOp::Re(regex) => {
+                        for col in &self.ctx.value_columns {
+                            if regex.is_match(col) {
+                                result_set.insert(col.clone());
+                            }
+                        }
+                    }
+                    MatchOp::NotRe(regex) => {
+                        for col in &self.ctx.value_columns {
+                            if regex.is_match(col) {
+                                reverse_set.insert(col.clone());
+                            }
+                        }
+                    }
+                }
+            }
+            // merge the two sets: an empty opt-in set falls back to all value columns
+            if result_set.is_empty() {
+                result_set = col_set.into_iter().cloned().collect();
+            }
+            for col in reverse_set {
+                result_set.remove(&col);
+            }
+
+            self.ctx.value_columns = result_set.iter().cloned().collect();
+            let exprs = result_set
+                .into_iter()
+                .map(|col| DfExpr::Column(col.into()))
+                .chain(self.create_tag_column_exprs()?.into_iter())
+                .chain(Some(self.create_time_index_column_expr()?))
+                .collect::<Vec<_>>();
+            // reuse this variable for simplicity
+            table_scan = LogicalPlanBuilder::from(table_scan)
+                .project(exprs)
+                .context(DataFusionPlanningSnafu)?
+                .build()
+                .context(DataFusionPlanningSnafu)?;
+        }
+
         // make filter and sort plan
         let sort_plan = LogicalPlanBuilder::from(table_scan)
             .filter(utils::conjunction(filters.into_iter()).unwrap())
@@ -684,6 +762,8 @@ impl PromPlanner {
             )),
             "idelta" => ScalarFunc::Udf(IDelta::<false>::scalar_udf()),
             "irate" => ScalarFunc::Udf(IDelta::<true>::scalar_udf()),
+            "resets" => ScalarFunc::Udf(Resets::scalar_udf()),
+            "changes" => ScalarFunc::Udf(Changes::scalar_udf()),
             "avg_over_time" => ScalarFunc::Udf(AvgOverTime::scalar_udf()),
             "min_over_time" => ScalarFunc::Udf(MinOverTime::scalar_udf()),
             "max_over_time" => ScalarFunc::Udf(MaxOverTime::scalar_udf()),
@@ -692,6 +772,8 @@ impl PromPlanner {
             "last_over_time" => ScalarFunc::Udf(LastOverTime::scalar_udf()),
             "absent_over_time" => ScalarFunc::Udf(AbsentOverTime::scalar_udf()),
             "present_over_time" => ScalarFunc::Udf(PresentOverTime::scalar_udf()),
+            "stddev_over_time" => ScalarFunc::Udf(StddevOverTime::scalar_udf()),
+            "stdvar_over_time" => ScalarFunc::Udf(StdvarOverTime::scalar_udf()),
             "quantile_over_time" => {
                 let quantile_expr = match other_input_exprs.get(0) {
                     Some(DfExpr::Literal(ScalarValue::Float64(Some(quantile)))) => *quantile,
@@ -1690,4 +1772,138 @@ mod test {
 
         indie_query_plan_compare(query, expected).await;
     }
+
+    #[tokio::test]
+    async fn value_matcher() {
+        // template
+        let mut eval_stmt = EvalStmt {
+            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
+            start: UNIX_EPOCH,
+            end: UNIX_EPOCH
+                .checked_add(Duration::from_secs(100_000))
+                .unwrap(),
+            interval: Duration::from_secs(5),
+            lookback_delta: Duration::from_secs(1),
+        };
+
+        let cases = [
+            // single equal matcher
+            (
+                r#"some_metric{__field__="field_1"}"#,
+                vec![
+                    "some_metric.field_1",
+                    "some_metric.tag_0",
+                    "some_metric.tag_1",
+                    "some_metric.tag_2",
+                    "some_metric.timestamp",
+                ],
+            ),
+            // two equal matchers
+            (
+                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
+                vec![
+                    "some_metric.field_0",
+                    "some_metric.field_1",
+                    "some_metric.tag_0",
+                    "some_metric.tag_1",
+                    "some_metric.tag_2",
+                    "some_metric.timestamp",
+                ],
+            ),
+            // single not_eq matcher
+            (
+                r#"some_metric{__field__!="field_1"}"#,
+                vec![
+                    "some_metric.field_0",
+                    "some_metric.field_2",
+                    "some_metric.tag_0",
+                    "some_metric.tag_1",
+                    "some_metric.tag_2",
+                    "some_metric.timestamp",
+                ],
+            ),
+            // two not_eq matchers
+            (
+                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
+                vec![
+                    "some_metric.field_0",
+                    "some_metric.tag_0",
+                    "some_metric.tag_1",
+                    "some_metric.tag_2",
+                    "some_metric.timestamp",
+                ],
+            ),
+            // equal and not_eq matchers (no conflict)
+            (
+                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
+                vec![
+                    "some_metric.field_1",
+                    "some_metric.tag_0",
+                    "some_metric.tag_1",
+                    "some_metric.tag_2",
+                    "some_metric.timestamp",
+                ],
+            ),
+            // equal and not_eq matchers (conflict)
+            (
+                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
+                vec![
+                    "some_metric.tag_0",
+                    "some_metric.tag_1",
+                    "some_metric.tag_2",
+                    "some_metric.timestamp",
+                ],
+            ),
+            // single regex eq matcher
+            (
+                r#"some_metric{__field__=~"field_1|field_2"}"#,
+                vec![
+                    "some_metric.field_1",
+                    "some_metric.field_2",
+                    "some_metric.tag_0",
+                    "some_metric.tag_1",
+                    "some_metric.tag_2",
+                    "some_metric.timestamp",
+                ],
+            ),
+            // single regex not_eq matcher
+            (
+                r#"some_metric{__field__!~"field_1|field_2"}"#,
+                vec![
+                    "some_metric.field_0",
+                    "some_metric.tag_0",
+                    "some_metric.tag_1",
+                    "some_metric.tag_2",
+                    "some_metric.timestamp",
+                ],
+            ),
+        ];
+
+        for case in cases {
+            let prom_expr = parser::parse(case.0).unwrap();
+            eval_stmt.expr = prom_expr;
+            let table_provider = build_test_table_provider("some_metric".to_string(), 3, 3).await;
+            let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt.clone())
+                .await
+                .unwrap();
+            let mut fields = plan.schema().field_names();
+            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
+            fields.sort();
+            expected.sort();
+            assert_eq!(fields, expected, "case: {:?}", case.0);
+        }
+
+        let bad_cases = [
+            r#"some_metric{__field__="nonexistent"}"#,
+            r#"some_metric{__field__!="nonexistent"}"#,
+        ];
+
+        for case in bad_cases {
+            let prom_expr = parser::parse(case).unwrap();
+            eval_stmt.expr = prom_expr;
+            let table_provider = build_test_table_provider("some_metric".to_string(), 3, 3).await;
+            let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt.clone()).await;
+            assert!(plan.is_err(), "case: {:?}", case);
+        }
+    }
 }