diff --git a/Cargo.lock b/Cargo.lock index b132c6d3fd..a98a7b047a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5259,7 +5259,7 @@ dependencies = [ [[package]] name = "promql-parser" version = "0.0.1" -source = "git+https://github.com/GreptimeTeam/promql-parser.git?rev=d027ce428a6a2df5a652b8558608c77d33c31644#d027ce428a6a2df5a652b8558608c77d33c31644" +source = "git+https://github.com/GreptimeTeam/promql-parser.git?rev=fa186978a1234baf5a3e372da03aa663d859cdd2#fa186978a1234baf5a3e372da03aa663d859cdd2" dependencies = [ "cfgrammar", "lazy_static", diff --git a/src/promql/Cargo.toml b/src/promql/Cargo.toml index 45d68ca08f..e3c1d0e1c6 100644 --- a/src/promql/Cargo.toml +++ b/src/promql/Cargo.toml @@ -13,7 +13,7 @@ common-catalog = { path = "../common/catalog" } datafusion.workspace = true datatypes = { path = "../datatypes" } futures = "0.3" -promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", rev = "d027ce428a6a2df5a652b8558608c77d33c31644" } +promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", rev = "fa186978a1234baf5a3e372da03aa663d859cdd2" } session = { path = "../session" } snafu = { version = "0.7", features = ["backtraces"] } table = { path = "../table" } diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 72c76402e9..d9aea8419b 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -97,12 +97,12 @@ impl PromPlanner { expr, // TODO(ruihang): support param param: _param, - grouping, + modifier, }) => { let input = self.prom_expr_to_plan(*expr.clone())?; // calculate columns to group by - let group_exprs = self.agg_modifier_to_col(input.schema(), grouping)?; + let group_exprs = self.agg_modifier_to_col(input.schema(), modifier)?; // convert op and value columns to aggregate exprs let aggr_exprs = self.create_aggregate_exprs(*op)?; @@ -189,10 +189,10 @@ impl PromPlanner { PromExpr::VectorSelector(VectorSelector { name: _, offset, - label_matchers, + matchers, at: _, }) => { - let matchers = self.preprocess_label_matchers(label_matchers)?; + let matchers = self.preprocess_label_matchers(matchers)?; self.setup_context()?; let normalize = self.selector_to_series_normalize_plan(offset, matchers)?; let manipulate = InstantManipulate::new( @@ -215,11 +215,9 @@ impl PromPlanner { range, }) => { let VectorSelector { - offset, - label_matchers, - .. + offset, matchers, .. } = vector_selector; - let matchers = self.preprocess_label_matchers(label_matchers)?; + let matchers = self.preprocess_label_matchers(matchers)?; self.setup_context()?; let normalize = self.selector_to_series_normalize_plan(offset, matchers)?; let manipulate = RangeManipulate::new( @@ -581,7 +579,7 @@ impl PromPlanner { } fn create_aggregate_exprs(&self, op: TokenType) -> Result> { - let aggr = match op { + let aggr = match op.id() { token::T_SUM => AggregateFunctionEnum::Sum, token::T_AVG => AggregateFunctionEnum::Avg, token::T_COUNT => AggregateFunctionEnum::Count, @@ -649,7 +647,7 @@ impl PromPlanner { } fn prom_token_to_binary_op(token: TokenType) -> Result { - match token { + match token.id() { token::T_ADD => Ok(Operator::Plus), token::T_SUB => Ok(Operator::Minus), token::T_MUL => Ok(Operator::Multiply), @@ -730,11 +728,7 @@ mod test { use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema}; - use promql_parser::label::Matcher; - use promql_parser::parser::{ - BinModifier, FunctionArgs as PromFunctionArgs, ValueType, VectorMatchCardinality, - VectorMatchModifier, - }; + use promql_parser::parser; use query::query_engine::QueryEngineState; use query::DfContextProviderAdapter; use session::context::QueryContext; @@ -818,37 +812,8 @@ mod test { // }, // }, async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) { - let prom_expr = PromExpr::Call(Call { - func: Function { - name: fn_name, - arg_types: vec![ValueType::Vector], - variadic: false, - return_type: ValueType::Vector, - }, - args: PromFunctionArgs { - args: vec![Box::new(PromExpr::VectorSelector(VectorSelector { - name: Some("some_metric".to_owned()), - offset: None, - at: None, - label_matchers: Matchers { - matchers: vec![ - Matcher { - op: MatchOp::NotEqual, - name: "tag_0".to_string(), - value: "bar".to_string(), - }, - Matcher { - op: MatchOp::Equal, - name: METRIC_NAME.to_string(), - value: "some_metric".to_string(), - }, - ] - .into_iter() - .collect(), - }, - }))], - }, - }); + let prom_expr = + parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap(); let eval_stmt = EvalStmt { expr: prom_expr, start: UNIX_EPOCH, @@ -1046,33 +1011,9 @@ mod test { // }, // }, // }, - async fn do_aggregate_expr_plan(op: TokenType, name: &str) { - let prom_expr = PromExpr::Aggregate(AggregateExpr { - op, - expr: Box::new(PromExpr::VectorSelector(VectorSelector { - name: Some("some_metric".to_owned()), - offset: None, - at: None, - label_matchers: Matchers { - matchers: vec![ - Matcher { - op: MatchOp::NotEqual, - name: "tag_0".to_string(), - value: "bar".to_string(), - }, - Matcher { - op: MatchOp::Equal, - name: METRIC_NAME.to_string(), - value: "some_metric".to_string(), - }, - ] - .into_iter() - .collect(), - }, - })), - param: None, - grouping: AggModifier::By(vec![String::from("tag_1")].into_iter().collect()), - }); + async fn do_aggregate_expr_plan(name: &str) { + let prom_expr = + parser::parse(&format!("{name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",)).unwrap(); let mut eval_stmt = EvalStmt { expr: prom_expr, start: UNIX_EPOCH, @@ -1101,8 +1042,8 @@ mod test { ); // test group without - if let PromExpr::Aggregate(AggregateExpr { grouping, .. }) = &mut eval_stmt.expr { - *grouping = AggModifier::Without(vec![String::from("tag_1")].into_iter().collect()); + if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr { + *modifier = AggModifier::Without(vec![String::from("tag_1")].into_iter().collect()); } let context_provider = build_test_context_provider("some_metric".to_string(), 2, 2).await; let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap(); @@ -1120,69 +1061,69 @@ mod test { #[tokio::test] async fn aggregate_sum() { - do_aggregate_expr_plan(token::T_SUM, "SUM").await; + do_aggregate_expr_plan("SUM").await; } #[tokio::test] async fn aggregate_avg() { - do_aggregate_expr_plan(token::T_AVG, "AVG").await; + do_aggregate_expr_plan("AVG").await; } #[tokio::test] #[should_panic] // output type doesn't match async fn aggregate_count() { - do_aggregate_expr_plan(token::T_COUNT, "COUNT").await; + do_aggregate_expr_plan("COUNT").await; } #[tokio::test] async fn aggregate_min() { - do_aggregate_expr_plan(token::T_MIN, "MIN").await; + do_aggregate_expr_plan("MIN").await; } #[tokio::test] async fn aggregate_max() { - do_aggregate_expr_plan(token::T_MAX, "MAX").await; + do_aggregate_expr_plan("MAX").await; } #[tokio::test] #[should_panic] // output type doesn't match async fn aggregate_group() { - do_aggregate_expr_plan(token::T_GROUP, "GROUPING").await; + do_aggregate_expr_plan("GROUPING").await; } #[tokio::test] async fn aggregate_stddev() { - do_aggregate_expr_plan(token::T_STDDEV, "STDDEV").await; + do_aggregate_expr_plan("STDDEV").await; } #[tokio::test] #[should_panic] // schema doesn't match async fn aggregate_stdvar() { - do_aggregate_expr_plan(token::T_STDVAR, "STDVAR").await; + do_aggregate_expr_plan("STDVAR").await; } #[tokio::test] #[should_panic] async fn aggregate_top_k() { - do_aggregate_expr_plan(token::T_TOPK, "").await; + do_aggregate_expr_plan("").await; } #[tokio::test] #[should_panic] async fn aggregate_bottom_k() { - do_aggregate_expr_plan(token::T_BOTTOMK, "").await; + do_aggregate_expr_plan("").await; } #[tokio::test] #[should_panic] async fn aggregate_count_values() { - do_aggregate_expr_plan(token::T_COUNT_VALUES, "").await; + do_aggregate_expr_plan("").await; } #[tokio::test] #[should_panic] async fn aggregate_quantile() { - do_aggregate_expr_plan(token::T_QUANTILE, "").await; + do_aggregate_expr_plan("").await; } // TODO(ruihang): add range fn tests once exprs are ready. @@ -1210,57 +1151,8 @@ mod test { // }, #[tokio::test] async fn binary_op_column_column() { - let prom_expr = PromExpr::Binary(PromBinaryExpr { - lhs: Box::new(PromExpr::VectorSelector(VectorSelector { - name: Some("some_metric".to_owned()), - offset: None, - at: None, - label_matchers: Matchers { - matchers: vec![ - Matcher { - op: MatchOp::Equal, - name: "tag_0".to_string(), - value: "foo".to_string(), - }, - Matcher { - op: MatchOp::Equal, - name: METRIC_NAME.to_string(), - value: "some_metric".to_string(), - }, - ] - .into_iter() - .collect(), - }, - })), - op: token::T_ADD, - rhs: Box::new(PromExpr::VectorSelector(VectorSelector { - name: Some("some_metric".to_owned()), - offset: None, - at: None, - label_matchers: Matchers { - matchers: vec![ - Matcher { - op: MatchOp::Equal, - name: "tag_0".to_string(), - value: "bar".to_string(), - }, - Matcher { - op: MatchOp::Equal, - name: METRIC_NAME.to_string(), - value: "some_metric".to_string(), - }, - ] - .into_iter() - .collect(), - }, - })), - matching: BinModifier { - card: VectorMatchCardinality::OneToOne, - matching: VectorMatchModifier::Ignoring(HashSet::new()), - return_bool: false, - }, - }); - + let prom_expr = + parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap(); let eval_stmt = EvalStmt { expr: prom_expr, start: UNIX_EPOCH, @@ -1297,37 +1189,7 @@ mod test { #[tokio::test] async fn binary_op_literal_column() { - let prom_expr = PromExpr::Binary(PromBinaryExpr { - lhs: Box::new(PromExpr::NumberLiteral(NumberLiteral { val: 1.0 })), - op: token::T_ADD, - rhs: Box::new(PromExpr::VectorSelector(VectorSelector { - name: Some("some_metric".to_owned()), - offset: None, - at: None, - label_matchers: Matchers { - matchers: vec![ - Matcher { - op: MatchOp::Equal, - name: "tag_0".to_string(), - value: "bar".to_string(), - }, - Matcher { - op: MatchOp::Equal, - name: METRIC_NAME.to_string(), - value: "some_metric".to_string(), - }, - ] - .into_iter() - .collect(), - }, - })), - matching: BinModifier { - card: VectorMatchCardinality::OneToOne, - matching: VectorMatchModifier::Ignoring(HashSet::new()), - return_bool: false, - }, - }); - + let prom_expr = parser::parse(r#"1 + some_metric{tag_0="bar"}"#).unwrap(); let eval_stmt = EvalStmt { expr: prom_expr, start: UNIX_EPOCH, @@ -1354,19 +1216,10 @@ mod test { assert_eq!(plan.display_indent_schema().to_string(), expected); } + // TODO(ruihang): pure literal arithmetic is not supported yet. #[tokio::test] async fn binary_op_literal_literal() { - let prom_expr = PromExpr::Binary(PromBinaryExpr { - lhs: Box::new(PromExpr::NumberLiteral(NumberLiteral { val: 1.0 })), - op: token::T_ADD, - rhs: Box::new(PromExpr::NumberLiteral(NumberLiteral { val: 1.0 })), - matching: BinModifier { - card: VectorMatchCardinality::OneToOne, - matching: VectorMatchModifier::Ignoring(HashSet::new()), - return_bool: false, - }, - }); - + let prom_expr = parser::parse(r#"1 + 1"#).unwrap(); let eval_stmt = EvalStmt { expr: prom_expr, start: UNIX_EPOCH, diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 031622c6d1..c04a4ab94b 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -28,7 +28,7 @@ futures-util.workspace = true metrics = "0.20" once_cell = "1.10" promql = { path = "../promql" } -promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", rev = "d027ce428a6a2df5a652b8558608c77d33c31644" } +promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", rev = "fa186978a1234baf5a3e372da03aa663d859cdd2" } serde.workspace = true serde_json = "1.0" session = { path = "../session" }