From f9ea6b63bf647f1af1e63bc17a9f22080db30cfd Mon Sep 17 00:00:00 2001
From: Ruihang Xia
Date: Tue, 25 Apr 2023 11:08:14 +0800
Subject: [PATCH] feat: impl instant query and add tests (#1452)

* feat: impl instant query and add tests

Signed-off-by: Ruihang Xia

* clean up

Signed-off-by: Ruihang Xia

---------

Signed-off-by: Ruihang Xia
---
 Cargo.lock                                 |   2 +-
 src/api/Cargo.toml                         |   2 +-
 src/client/src/client.rs                   |   6 +
 src/servers/src/grpc/prom_query_gateway.rs |  15 ++-
 src/servers/src/prom.rs                    |  30 ++---
 tests-integration/tests/grpc.rs            | 142 ++++++++++++++++++++-
 6 files changed, 170 insertions(+), 27 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 6211ae3442..2c96765f10 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3823,7 +3823,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=57e6a032db70264ec394cbb8a2f327ad33705d76#57e6a032db70264ec394cbb8a2f327ad33705d76"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a26c40c004f998180b8acd853b22f083773f36b9#a26c40c004f998180b8acd853b22f083773f36b9"
 dependencies = [
  "prost",
  "tonic 0.9.1",
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index 5e068c05fe..1f6163b99f 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -10,7 +10,7 @@ common-base = { path = "../common/base" }
 common-error = { path = "../common/error" }
 common-time = { path = "../common/time" }
 datatypes = { path = "../datatypes" }
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "57e6a032db70264ec394cbb8a2f327ad33705d76" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a26c40c004f998180b8acd853b22f083773f36b9" }
 prost.workspace = true
 snafu = { version = "0.7", features = ["backtraces"] }
 tonic.workspace = true
diff --git a/src/client/src/client.rs b/src/client/src/client.rs
index d85931aa55..30862a1f53 100644
--- a/src/client/src/client.rs
+++ b/src/client/src/client.rs
@@ -16,6 +16,7 @@ use std::sync::Arc;
 
 use api::v1::greptime_database_client::GreptimeDatabaseClient;
 use api::v1::health_check_client::HealthCheckClient;
+use api::v1::prometheus_gateway_client::PrometheusGatewayClient;
 use api::v1::HealthCheckRequest;
 use arrow_flight::flight_service_client::FlightServiceClient;
 use common_grpc::channel_manager::ChannelManager;
@@ -156,6 +157,11 @@
         })
     }
 
+    pub fn make_prometheus_gateway_client(&self) -> Result<PrometheusGatewayClient<Channel>> {
+        let (_, channel) = self.find_channel()?;
+        Ok(PrometheusGatewayClient::new(channel))
+    }
+
     pub async fn health_check(&self) -> Result<()> {
         let (_, channel) = self.find_channel()?;
         let mut client = HealthCheckClient::new(channel);
diff --git a/src/servers/src/grpc/prom_query_gateway.rs b/src/servers/src/grpc/prom_query_gateway.rs
index fd33026d9d..71189e42b3 100644
--- a/src/servers/src/grpc/prom_query_gateway.rs
+++ b/src/servers/src/grpc/prom_query_gateway.rs
@@ -20,8 +20,10 @@ use api::v1::promql_request::Promql;
 use api::v1::{PromqlRequest, PromqlResponse, ResponseHeader};
 use async_trait::async_trait;
 use query::parser::PromQuery;
+use snafu::OptionExt;
 use tonic::{Request, Response};
 
+use crate::error::InvalidQuerySnafu;
 use crate::grpc::handler::create_query_context;
 use crate::grpc::TonicResult;
 use crate::prom::{retrieve_metric_name, PromHandlerRef, PromJsonResponse};
@@ -34,14 +36,21 @@ pub struct PrometheusGatewayService {
 impl PrometheusGateway for PrometheusGatewayService {
     async fn handle(&self, req: Request<PromqlRequest>) -> TonicResult<Response<PromqlResponse>> {
         let inner = req.into_inner();
-        let prom_query = match inner.promql {
-            Some(Promql::RangeQuery(range_query)) => PromQuery {
+        let prom_query = match inner.promql.context(InvalidQuerySnafu {
+            reason: "Expecting non-empty PromqlRequest.",
+        })? {
+            Promql::RangeQuery(range_query) => PromQuery {
                 query: range_query.query,
                 start: range_query.start,
                 end: range_query.end,
                 step: range_query.step,
             },
-            _ => unimplemented!(),
+            Promql::InstantQuery(instant_query) => PromQuery {
+                query: instant_query.query,
+                start: instant_query.time.clone(),
+                end: instant_query.time,
+                step: String::from("1s"),
+            },
         };
 
         let query_context = create_query_context(inner.header.as_ref());
diff --git a/src/servers/src/prom.rs b/src/servers/src/prom.rs
index 641c3f660b..de43841b5b 100644
--- a/src/servers/src/prom.rs
+++ b/src/servers/src/prom.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 //! prom supply the prometheus HTTP API Server compliance
-use std::collections::HashMap;
+use std::collections::{BTreeMap, HashMap};
 use std::net::SocketAddr;
 use std::sync::Arc;
 
@@ -156,30 +156,30 @@ impl Server for PromServer {
     }
 }
 
-#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
+#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)]
 pub struct PromSeries {
-    metric: HashMap<String, String>,
-    values: Vec<(f64, String)>,
+    pub metric: HashMap<String, String>,
+    pub values: Vec<(f64, String)>,
 }
 
-#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
+#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)]
 pub struct PromData {
     #[serde(rename = "resultType")]
-    result_type: String,
-    result: Vec<PromSeries>,
+    pub result_type: String,
+    pub result: Vec<PromSeries>,
 }
 
-#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
+#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)]
 pub struct PromJsonResponse {
-    status: String,
-    data: PromData,
+    pub status: String,
+    pub data: PromData,
     #[serde(skip_serializing_if = "Option::is_none")]
-    error: Option<String>,
+    pub error: Option<String>,
     #[serde(skip_serializing_if = "Option::is_none")]
     #[serde(rename = "errorType")]
-    error_type: Option<String>,
+    pub error_type: Option<String>,
     #[serde(skip_serializing_if = "Option::is_none")]
-    warnings: Option<Vec<String>>,
+    pub warnings: Option<Vec<String>>,
 }
 
 impl PromJsonResponse {
@@ -281,7 +281,7 @@
         })?;
         let metric_name = (METRIC_NAME.to_string(), metric_name);
 
-        let mut buffer = HashMap::<Vec<(String, String)>, Vec<(f64, String)>>::new();
+        let mut buffer = BTreeMap::<Vec<(String, String)>, Vec<(f64, String)>>::new();
 
         for batch in batches.iter() {
             // prepare things...
@@ -338,7 +338,7 @@
                 metric: tags.into_iter().collect(),
                 values,
             })
-            .collect();
+            .collect::<Vec<_>>();
 
         let data = PromData {
             result_type: "matrix".to_string(),
diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs
index c6ebb26898..71b975260c 100644
--- a/tests-integration/tests/grpc.rs
+++ b/tests-integration/tests/grpc.rs
@@ -11,15 +11,18 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
+
 use api::v1::alter_expr::Kind;
 use api::v1::column::SemanticType;
+use api::v1::promql_request::Promql;
 use api::v1::{
     column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, CreateTableExpr,
-    InsertRequest, TableId,
+    InsertRequest, PromInstantQuery, PromRangeQuery, PromqlRequest, RequestHeader, TableId,
 };
 use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE};
 use common_query::Output;
+use servers::prom::{PromData, PromJsonResponse, PromSeries};
 use servers::server::Server;
 use tests_integration::test_util::{setup_grpc_server, StorageType};
 
@@ -53,18 +56,20 @@ macro_rules! grpc_tests {
             grpc_test!(
                 $service,
+                test_invalid_dbname,
                 test_auto_create_table,
                 test_insert_and_select,
                 test_dbname,
+                test_health_check,
+                test_prom_gateway_query,
             );
         )*
     };
 }
 
-#[tokio::test]
-pub async fn test_invalid_dbname() {
+pub async fn test_invalid_dbname(store_type: StorageType) {
     let (addr, mut guard, fe_grpc_server) =
-        setup_grpc_server(StorageType::File, "auto_create_table").await;
+        setup_grpc_server(store_type, "auto_create_table").await;
 
     let grpc_client = Client::with_urls(vec![addr]);
     let db = Database::new_with_dbname("tom", grpc_client);
@@ -308,10 +313,9 @@ fn testing_create_expr() -> CreateTableExpr {
     }
 }
 
-#[tokio::test]
-pub async fn test_health_check() {
+pub async fn test_health_check(store_type: StorageType) {
     let (addr, mut guard, fe_grpc_server) =
-        setup_grpc_server(StorageType::File, "auto_create_table").await;
+        setup_grpc_server(store_type, "auto_create_table").await;
 
     let grpc_client = Client::with_urls(vec![addr]);
     let r = grpc_client.health_check().await;
@@ -320,3 +324,127 @@
     let _ = fe_grpc_server.shutdown().await;
     guard.remove_all().await;
 }
+
+pub async fn test_prom_gateway_query(store_type: StorageType) {
+    // prepare connection
+    let (addr, mut guard, fe_grpc_server) = setup_grpc_server(store_type, "prom_gateway").await;
+    let grpc_client = Client::with_urls(vec![addr]);
+    let db = Database::new(
+        DEFAULT_CATALOG_NAME,
+        DEFAULT_SCHEMA_NAME,
+        grpc_client.clone(),
+    );
+    let mut gateway_client = grpc_client.make_prometheus_gateway_client().unwrap();
+
+    // create table and insert data
+    db.sql("CREATE TABLE test(i DOUBLE, j TIMESTAMP TIME INDEX, k STRING PRIMARY KEY);")
+        .await
+        .unwrap();
+    db.sql(r#"INSERT INTO test VALUES (1, 1, "a"), (1, 1, "b"), (2, 2, "a");"#)
+        .await
+        .unwrap();
+
+    // Instant query using prometheus gateway service
+    let header = RequestHeader {
+        dbname: "public".to_string(),
+        ..Default::default()
+    };
+    let instant_query = PromInstantQuery {
+        query: "test".to_string(),
+        time: "5".to_string(),
+    };
+    let instant_query_request = PromqlRequest {
+        header: Some(header.clone()),
+        promql: Some(Promql::InstantQuery(instant_query)),
+    };
+    let json_bytes = gateway_client
+        .handle(instant_query_request)
+        .await
+        .unwrap()
+        .into_inner()
+        .body;
+    let instant_query_result = serde_json::from_slice::<PromJsonResponse>(&json_bytes).unwrap();
+    let expected = PromJsonResponse {
+        status: "success".to_string(),
+        data: PromData {
+            result_type: "matrix".to_string(),
+            result: vec![
+                PromSeries {
+                    metric: [
+                        ("k".to_string(), "a".to_string()),
+                        ("__name__".to_string(), "test".to_string()),
+                    ]
+                    .into_iter()
+                    .collect(),
+                    values: vec![(5.0, "2".to_string())],
+                },
+                PromSeries {
+                    metric: [
+                        ("__name__".to_string(), "test".to_string()),
+                        ("k".to_string(), "b".to_string()),
+                    ]
+                    .into_iter()
+                    .collect(),
+                    values: vec![(5.0, "1".to_string())],
+                },
+            ],
+        },
+        error: None,
+        error_type: None,
+        warnings: None,
+    };
+    assert_eq!(instant_query_result, expected);
+
+    // Range query using prometheus gateway service
+    let range_query = PromRangeQuery {
+        query: "test".to_string(),
+        start: "0".to_string(),
+        end: "10".to_string(),
+        step: "5s".to_string(),
+    };
+    let range_query_request: PromqlRequest = PromqlRequest {
+        header: Some(header),
+        promql: Some(Promql::RangeQuery(range_query)),
+    };
+    let json_bytes = gateway_client
+        .handle(range_query_request)
+        .await
+        .unwrap()
+        .into_inner()
+        .body;
+    let range_query_result = serde_json::from_slice::<PromJsonResponse>(&json_bytes).unwrap();
+    let expected = PromJsonResponse {
+        status: "success".to_string(),
+        data: PromData {
+            result_type: "matrix".to_string(),
+            result: vec![
+                PromSeries {
+                    metric: [
+                        ("__name__".to_string(), "test".to_string()),
+                        ("k".to_string(), "a".to_string()),
+                    ]
+                    .into_iter()
+                    .collect(),
+                    values: vec![(5.0, "2".to_string()), (10.0, "2".to_string())],
+                },
+                PromSeries {
+                    metric: [
+                        ("__name__".to_string(), "test".to_string()),
+                        ("k".to_string(), "b".to_string()),
+                    ]
+                    .into_iter()
+                    .collect(),
+                    values: vec![(5.0, "1".to_string()), (10.0, "1".to_string())],
+                },
+            ],
+        },
+        error: None,
+        error_type: None,
+        warnings: None,
+    };
+    assert_eq!(range_query_result, expected);
+
+    // clean up
+    let _ = fe_grpc_server.shutdown().await;
+    guard.remove_all().await;
+}