From 85dd7e4f2470d28de74fba36e1f5e785c70c76eb Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Mon, 13 Mar 2023 15:24:34 +0800 Subject: [PATCH] feat: implement promql query on grpc (#1134) * feat: implement promql query on grpc * test: resolve test errors * test: add tests for promql grpc api * refactor: align prom object name with proto * chore: switch proto revision to main --- Cargo.lock | 2 +- src/api/Cargo.toml | 2 +- src/client/src/database.rs | 20 +++++- src/datanode/src/instance/grpc.rs | 11 ++- src/frontend/src/instance/grpc.rs | 114 ++++++++++++++++++++++++++++++ src/servers/tests/mod.rs | 17 +++++ 6 files changed, 162 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b668be39da..dbd4ab135f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3062,7 +3062,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ad0187295035e83f76272da553453e649b7570de#ad0187295035e83f76272da553453e649b7570de" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=3a715150563b89d5dfc81a5838eac1f66a5658a1#3a715150563b89d5dfc81a5838eac1f66a5658a1" dependencies = [ "prost", "tonic", diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index bc4f957659..dbb7b8ee9d 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -10,7 +10,7 @@ common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ad0187295035e83f76272da553453e649b7570de" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3a715150563b89d5dfc81a5838eac1f66a5658a1" } prost.workspace = true snafu = { version = "0.7", features = ["backtraces"] } tonic.workspace = true diff --git a/src/client/src/database.rs b/src/client/src/database.rs index f76365d3a4..ab86c12de8 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -20,7 +20,7 @@ use api::v1::greptime_request::Request; use api::v1::query_request::Query; use api::v1::{ AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, DropTableExpr, GreptimeRequest, - InsertRequest, QueryRequest, RequestHeader, + InsertRequest, PromRangeQuery, QueryRequest, RequestHeader, }; use arrow_flight::{FlightData, Ticket}; use common_error::prelude::*; @@ -96,6 +96,24 @@ impl Database { .await } + pub async fn prom_range_query( + &self, + promql: &str, + start: &str, + end: &str, + step: &str, + ) -> Result { + self.do_get(Request::Query(QueryRequest { + query: Some(Query::PromRangeQuery(PromRangeQuery { + query: promql.to_string(), + start: start.to_string(), + end: end.to_string(), + step: step.to_string(), + })), + })) + .await + } + pub async fn create(&self, expr: CreateTableExpr) -> Result { self.do_get(Request::Ddl(DdlRequest { expr: Some(DdlExpr::CreateTable(expr)), diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index 8c229520fb..8b010f2ddf 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -18,7 +18,7 @@ use api::v1::query_request::Query; use api::v1::{CreateDatabaseExpr, DdlRequest, InsertRequest}; use async_trait::async_trait; use common_query::Output; -use query::parser::QueryLanguageParser; +use query::parser::{PromQuery, QueryLanguageParser}; use query::plan::LogicalPlan; use servers::query_handler::grpc::GrpcQueryHandler; use session::context::QueryContextRef; @@ -61,6 +61,15 @@ impl Instance { self.execute_stmt(stmt, ctx).await? } Query::LogicalPlan(plan) => self.execute_logical(plan).await?, + Query::PromRangeQuery(promql) => { + let prom_query = PromQuery { + query: promql.query, + start: promql.start, + end: promql.end, + step: promql.step, + }; + self.execute_promql(&prom_query, ctx).await? + } }) } diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 1e857b43c1..e4c6eee442 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -16,6 +16,7 @@ use api::v1::greptime_request::Request; use api::v1::query_request::Query; use async_trait::async_trait; use common_query::Output; +use query::parser::PromQuery; use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; use session::context::QueryContextRef; @@ -54,6 +55,23 @@ impl GrpcQueryHandler for Instance { } .fail(); } + Query::PromRangeQuery(promql) => { + let prom_query = PromQuery { + query: promql.query, + start: promql.start, + end: promql.end, + step: promql.step, + }; + let mut result = + SqlQueryHandler::do_promql_query(self, &prom_query, ctx).await; + ensure!( + result.len() == 1, + error::NotSupportedSnafu { + feat: "execute multiple statements in PromQL query string through GRPC interface" + } + ); + result.remove(0)? + } } } Request::Ddl(request) => { @@ -542,4 +560,100 @@ CREATE TABLE {table_name} ( +---------------------+---+---+"; assert_eq!(recordbatches.pretty_print().unwrap(), expected); } + + #[tokio::test(flavor = "multi_thread")] + async fn test_promql_query() { + common_telemetry::init_default_ut_logging(); + + let standalone = tests::create_standalone_instance("test_standalone_promql_query").await; + let instance = &standalone.instance; + + let table_name = "my_table"; + let sql = format!("CREATE TABLE {table_name} (h string, a double, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY(h))"); + create_table(instance, sql).await; + + let insert = InsertRequest { + table_name: table_name.to_string(), + columns: vec![ + Column { + column_name: "h".to_string(), + values: Some(Values { + string_values: vec![ + "t".to_string(), + "t".to_string(), + "t".to_string(), + "t".to_string(), + "t".to_string(), + "t".to_string(), + "t".to_string(), + "t".to_string(), + ], + ..Default::default() + }), + semantic_type: SemanticType::Tag as i32, + datatype: ColumnDataType::String as i32, + ..Default::default() + }, + Column { + column_name: "a".to_string(), + values: Some(Values { + f64_values: vec![1f64, 11f64, 20f64, 22f64, 50f64, 55f64, 99f64], + ..Default::default() + }), + null_mask: vec![4], + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::Float64 as i32, + }, + Column { + column_name: "ts".to_string(), + values: Some(Values { + ts_millisecond_values: vec![ + 1672557972000, + 1672557973000, + 1672557974000, + 1672557975000, + 1672557976000, + 1672557977000, + 1672557978000, + 1672557979000, + ], + ..Default::default() + }), + semantic_type: SemanticType::Timestamp as i32, + datatype: ColumnDataType::TimestampMillisecond as i32, + ..Default::default() + }, + ], + row_count: 8, + ..Default::default() + }; + + let request = Request::Insert(insert); + let output = query(instance, request).await; + assert!(matches!(output, Output::AffectedRows(8))); + + let request = Request::Query(QueryRequest { + query: Some(Query::PromRangeQuery(api::v1::PromRangeQuery { + query: "my_table".to_owned(), + start: "1672557973".to_owned(), + end: "1672557978".to_owned(), + step: "1s".to_owned(), + })), + }); + let output = query(instance, request).await; + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++---+------+---------------------+ +| h | a | ts | ++---+------+---------------------+ +| t | 11.0 | 2023-01-01T07:26:13 | +| t | | 2023-01-01T07:26:14 | +| t | 20.0 | 2023-01-01T07:26:15 | +| t | 22.0 | 2023-01-01T07:26:16 | +| t | 50.0 | 2023-01-01T07:26:17 | +| t | 55.0 | 2023-01-01T07:26:18 | ++---+------+---------------------+"; + assert_eq!(recordbatches.pretty_print().unwrap(), expected); + } } diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 4b3a472085..6d5d41996f 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -175,6 +175,23 @@ impl GrpcQueryHandler for DummyInstance { result.remove(0)? } Query::LogicalPlan(_) => unimplemented!(), + Query::PromRangeQuery(promql) => { + let prom_query = PromQuery { + query: promql.query, + start: promql.start, + end: promql.end, + step: promql.step, + }; + let mut result = + SqlQueryHandler::do_promql_query(self, &prom_query, ctx).await; + ensure!( + result.len() == 1, + NotSupportedSnafu { + feat: "execute multiple statements in PromQL query string through GRPC interface" + } + ); + result.remove(0)? + } } } Request::Ddl(_) => unimplemented!(),