diff --git a/src/servers/src/grpc/prom_query_gateway.rs b/src/servers/src/grpc/prom_query_gateway.rs index 37f779af37..14b20edb22 100644 --- a/src/servers/src/grpc/prom_query_gateway.rs +++ b/src/servers/src/grpc/prom_query_gateway.rs @@ -15,14 +15,18 @@ //! PrometheusGateway provides a gRPC interface to query Prometheus metrics //! by PromQL. The behavior is similar to the Prometheus HTTP API. +use std::sync::Arc; + use api::v1::prometheus_gateway_server::PrometheusGateway; use api::v1::promql_request::Promql; use api::v1::{PromqlRequest, PromqlResponse, ResponseHeader}; use async_trait::async_trait; +use common_error::prelude::ErrorExt; use common_telemetry::timer; use common_time::util::current_time_rfc3339; use promql_parser::parser::ValueType; use query::parser::PromQuery; +use session::context::QueryContext; use snafu::OptionExt; use tonic::{Request, Response}; @@ -68,23 +72,9 @@ impl PrometheusGateway for PrometheusGatewayService { }; let query_context = create_query_context(inner.header.as_ref()); - let _timer = timer!( - crate::metrics::METRIC_SERVER_GRPC_PROM_REQUEST_TIMER, - &[( - crate::metrics::METRIC_DB_LABEL, - query_context.get_db_string() - )] - ); - let result = self.handler.do_query(&prom_query, query_context).await; - let (metric_name, mut result_type) = - retrieve_metric_name_and_result_type(&prom_query.query).unwrap_or_default(); - // range query only returns matrix - if is_range_query { - result_type = Some(ValueType::Matrix) - }; - let json_response = PromJsonResponse::from_query_result(result, metric_name, result_type) - .await - .0; + let json_response = self + .handle_inner(prom_query, query_context, is_range_query) + .await; let json_bytes = serde_json::to_string(&json_response).unwrap().into_bytes(); let response = Response::new(PromqlResponse { @@ -99,4 +89,34 @@ impl PrometheusGatewayService { pub fn new(handler: PromHandlerRef) -> Self { Self { handler } } + + async fn handle_inner( + &self, + query: PromQuery, + ctx: Arc, + is_range_query: bool, + ) -> PromJsonResponse { + let _timer = timer!( + crate::metrics::METRIC_SERVER_GRPC_PROM_REQUEST_TIMER, + &[(crate::metrics::METRIC_DB_LABEL, ctx.get_db_string())] + ); + + let result = self.handler.do_query(&query, ctx).await; + let (metric_name, mut result_type) = + match retrieve_metric_name_and_result_type(&query.query) { + Ok((metric_name, result_type)) => (metric_name.unwrap_or_default(), result_type), + Err(err) => { + return PromJsonResponse::error(err.status_code().to_string(), err.to_string()) + .0 + } + }; + // range query only returns matrix + if is_range_query { + result_type = ValueType::Matrix; + }; + + PromJsonResponse::from_query_result(result, metric_name, result_type) + .await + .0 + } } diff --git a/src/servers/src/prom.rs b/src/servers/src/prom.rs index daae98feaf..3fcf2d2465 100644 --- a/src/servers/src/prom.rs +++ b/src/servers/src/prom.rs @@ -52,7 +52,7 @@ use tower_http::trace::TraceLayer; use crate::auth::UserProviderRef; use crate::error::{ - AlreadyStartedSnafu, CollectRecordbatchSnafu, Error, InternalSnafu, NotSupportedSnafu, Result, + AlreadyStartedSnafu, CollectRecordbatchSnafu, Error, InternalSnafu, InvalidQuerySnafu, Result, StartHttpSnafu, UnexpectedResultSnafu, }; use crate::http::authorize::HttpAuth; @@ -244,7 +244,7 @@ impl PromJsonResponse { pub async fn from_query_result( result: Result, metric_name: String, - result_type: Option, + result_type: ValueType, ) -> Json { let response: Result> = try { let json = match result? { @@ -271,7 +271,7 @@ impl PromJsonResponse { json }; - let result_type_string = result_type.map(|t| t.to_string()).unwrap_or_default(); + let result_type_string = result_type.to_string(); match response { Ok(resp) => resp, @@ -295,7 +295,7 @@ impl PromJsonResponse { fn record_batches_to_data( batches: RecordBatches, metric_name: String, - result_type: Option, + result_type: ValueType, ) -> Result { // infer semantic type of each column from schema. // TODO(ruihang): wish there is a better way to do this. @@ -390,27 +390,21 @@ impl PromJsonResponse { .map(|(tags, mut values)| { let metric = tags.into_iter().collect(); match result_type { - Some(ValueType::Vector) | Some(ValueType::Scalar) | Some(ValueType::String) => { - Ok(PromSeries { - metric, - value: values.pop(), - ..Default::default() - }) - } - Some(ValueType::Matrix) => Ok(PromSeries { + ValueType::Vector | ValueType::Scalar | ValueType::String => Ok(PromSeries { + metric, + value: values.pop(), + ..Default::default() + }), + ValueType::Matrix => Ok(PromSeries { metric, values, ..Default::default() }), - other => NotSupportedSnafu { - feat: format!("PromQL result type {other:?}"), - } - .fail(), } }) .collect::>>()?; - let result_type_string = result_type.map(|t| t.to_string()).unwrap_or_default(); + let result_type_string = result_type.to_string(); let data = PromResponse::PromData(PromData { result_type: result_type_string, result, @@ -452,8 +446,10 @@ pub async fn instant_query( let query_ctx = QueryContext::with(catalog, schema); let result = handler.do_query(&prom_query, Arc::new(query_ctx)).await; - let (metric_name, result_type) = - retrieve_metric_name_and_result_type(&prom_query.query).unwrap_or_default(); + let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) { + Ok((metric_name, result_type)) => (metric_name.unwrap_or_default(), result_type), + Err(err) => return PromJsonResponse::error(err.status_code().to_string(), err.to_string()), + }; PromJsonResponse::from_query_result(result, metric_name, result_type).await } @@ -486,9 +482,11 @@ pub async fn range_query( let query_ctx = QueryContext::with(catalog, schema); let result = handler.do_query(&prom_query, Arc::new(query_ctx)).await; - let (metric_name, _) = - retrieve_metric_name_and_result_type(&prom_query.query).unwrap_or_default(); - PromJsonResponse::from_query_result(result, metric_name, Some(ValueType::Matrix)).await + let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) { + Err(err) => return PromJsonResponse::error(err.status_code().to_string(), err.to_string()), + Ok((metric_name, _)) => metric_name.unwrap_or_default(), + }; + PromJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await } #[derive(Debug, Default, Serialize, JsonSchema)] @@ -723,12 +721,13 @@ fn record_batches_to_labels_name( pub(crate) fn retrieve_metric_name_and_result_type( promql: &str, -) -> Option<(String, Option)> { - let promql_expr = promql_parser::parser::parse(promql).ok()?; - let metric_name = promql_expr_to_metric_name(&promql_expr)?; - let result_type = Some(promql_expr.value_type()); +) -> Result<(Option, ValueType)> { + let promql_expr = promql_parser::parser::parse(promql) + .map_err(|reason| InvalidQuerySnafu { reason }.build())?; + let metric_name = promql_expr_to_metric_name(&promql_expr); + let result_type = promql_expr.value_type(); - Some((metric_name, result_type)) + Ok((metric_name, result_type)) } fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option { diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index f07409e21c..b9d59f289d 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -415,7 +415,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { step: "5s".to_string(), }; let range_query_request: PromqlRequest = PromqlRequest { - header: Some(header), + header: Some(header.clone()), promql: Some(Promql::RangeQuery(range_query)), }; let json_bytes = gateway_client @@ -458,6 +458,36 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { }; assert_eq!(range_query_result, expected); + // query nonexistent data + let range_query = PromRangeQuery { + query: "test".to_string(), + start: "1000000000".to_string(), + end: "1000001000".to_string(), + step: "5s".to_string(), + }; + let range_query_request: PromqlRequest = PromqlRequest { + header: Some(header), + promql: Some(Promql::RangeQuery(range_query)), + }; + let json_bytes = gateway_client + .handle(range_query_request) + .await + .unwrap() + .into_inner() + .body; + let range_query_result = serde_json::from_slice::(&json_bytes).unwrap(); + let expected = PromJsonResponse { + status: "success".to_string(), + data: PromResponse::PromData(PromData { + result_type: "matrix".to_string(), + result: vec![], + }), + error: None, + error_type: None, + warnings: None, + }; + assert_eq!(range_query_result, expected); + // clean up let _ = fe_grpc_server.shutdown().await; guard.remove_all().await;