diff --git a/src/servers/src/grpc/prom_query_gateway.rs b/src/servers/src/grpc/prom_query_gateway.rs index 95a1279f04..90bd8f31fa 100644 --- a/src/servers/src/grpc/prom_query_gateway.rs +++ b/src/servers/src/grpc/prom_query_gateway.rs @@ -21,6 +21,7 @@ use api::v1::{PromqlRequest, PromqlResponse, ResponseHeader}; use async_trait::async_trait; use common_telemetry::timer; use common_time::util::current_time_rfc3339; +use promql_parser::parser::ValueType; use query::parser::PromQuery; use snafu::OptionExt; use tonic::{Request, Response}; @@ -37,16 +38,20 @@ pub struct PrometheusGatewayService { #[async_trait] impl PrometheusGateway for PrometheusGatewayService { async fn handle(&self, req: Request) -> TonicResult> { + let mut is_range_query = false; let inner = req.into_inner(); let prom_query = match inner.promql.context(InvalidQuerySnafu { reason: "Expecting non-empty PromqlRequest.", })? { - Promql::RangeQuery(range_query) => PromQuery { - query: range_query.query, - start: range_query.start, - end: range_query.end, - step: range_query.step, - }, + Promql::RangeQuery(range_query) => { + is_range_query = true; + PromQuery { + query: range_query.query, + start: range_query.start, + end: range_query.end, + step: range_query.step, + } + } Promql::InstantQuery(instant_query) => { let time = if instant_query.time.is_empty() { current_time_rfc3339() @@ -71,8 +76,12 @@ impl PrometheusGateway for PrometheusGatewayService { )] ); let result = self.handler.do_query(&prom_query, query_context).await; - let (metric_name, result_type) = + let (metric_name, mut result_type) = retrieve_metric_name_and_result_type(&prom_query.query).unwrap_or_default(); + // range query only returns matrix + if is_range_query { + result_type = Some(ValueType::Matrix) + }; let json_response = PromJsonResponse::from_query_result(result, metric_name, result_type) .await .0; diff --git a/src/servers/src/prom.rs b/src/servers/src/prom.rs index d48d9a98d6..7dabf45f4e 100644 --- a/src/servers/src/prom.rs +++ b/src/servers/src/prom.rs @@ -35,7 +35,7 @@ use futures::FutureExt; use promql_parser::label::METRIC_NAME; use promql_parser::parser::{ AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr, - UnaryExpr, VectorSelector, + UnaryExpr, ValueType, VectorSelector, }; use query::parser::PromQuery; use schemars::JsonSchema; @@ -51,7 +51,8 @@ use tower_http::trace::TraceLayer; use crate::auth::UserProviderRef; use crate::error::{ - AlreadyStartedSnafu, CollectRecordbatchSnafu, InternalSnafu, Result, StartHttpSnafu, + AlreadyStartedSnafu, CollectRecordbatchSnafu, InternalSnafu, NotSupportedSnafu, Result, + StartHttpSnafu, }; use crate::http::authorize::HttpAuth; use crate::server::Server; @@ -160,7 +161,11 @@ impl Server for PromServer { #[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)] pub struct PromSeries { pub metric: HashMap, + /// For [ValueType::Matrix] result type pub values: Vec<(f64, String)>, + /// For [ValueType::Vector] result type + #[serde(skip_serializing_if = "Option::is_none")] + pub value: Option<(f64, String)>, } #[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)] @@ -212,18 +217,24 @@ impl PromJsonResponse { pub async fn from_query_result( result: Result, metric_name: String, - result_type: String, + result_type: Option, ) -> Json { let response: Result> = try { let json = match result? { - Output::RecordBatches(batches) => { - Self::success(Self::record_batches_to_data(batches, metric_name)?) - } + Output::RecordBatches(batches) => Self::success(Self::record_batches_to_data( + batches, + metric_name, + result_type, + )?), Output::Stream(stream) => { let record_batches = RecordBatches::try_collect(stream) .await .context(CollectRecordbatchSnafu)?; - Self::success(Self::record_batches_to_data(record_batches, metric_name)?) + Self::success(Self::record_batches_to_data( + record_batches, + metric_name, + result_type, + )?) } Output::AffectedRows(_) => Self::error( "unexpected result", @@ -234,6 +245,8 @@ impl PromJsonResponse { json }; + let result_type_string = result_type.map(|t| t.to_string()).unwrap_or_default(); + match response { Ok(resp) => resp, Err(err) => { @@ -242,7 +255,7 @@ impl PromJsonResponse { || err.status_code() == StatusCode::TableColumnNotFound { Self::success(PromData { - result_type, + result_type: result_type_string, ..Default::default() }) } else { @@ -252,7 +265,12 @@ impl PromJsonResponse { } } - fn record_batches_to_data(batches: RecordBatches, metric_name: String) -> Result { + /// Convert [RecordBatches] to [PromData] + fn record_batches_to_data( + batches: RecordBatches, + metric_name: String, + result_type: Option, + ) -> Result { // infer semantic type of each column from schema. // TODO(ruihang): wish there is a better way to do this. let mut timestamp_column_index = None; @@ -321,8 +339,10 @@ impl PromJsonResponse { // TODO(ruihang): push table name `__metric__` let mut tags = vec![metric_name.clone()]; for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) { - let tag_value = tag_column.get_data(row_index).unwrap().to_string(); - tags.push((tag_name.to_string(), tag_value)); + // TODO(ruihang): add test for NULL tag + if let Some(tag_value) = tag_column.get_data(row_index) { + tags.push((tag_name.to_string(), tag_value.to_string())); + } } // retrieve timestamp @@ -339,14 +359,30 @@ impl PromJsonResponse { let result = buffer .into_iter() - .map(|(tags, values)| PromSeries { - metric: tags.into_iter().collect(), - values, + .map(|(tags, mut values)| { + let metric = tags.into_iter().collect(); + match result_type { + Some(ValueType::Vector) => Ok(PromSeries { + metric, + value: values.pop(), + ..Default::default() + }), + Some(ValueType::Matrix) => Ok(PromSeries { + metric, + values, + ..Default::default() + }), + other => NotSupportedSnafu { + feat: format!("PromQL result type {other:?}"), + } + .fail(), + } }) - .collect::>(); + .collect::>>()?; + let result_type_string = result_type.map(|t| t.to_string()).unwrap_or_default(); let data = PromData { - result_type: "matrix".to_string(), + result_type: result_type_string, result, }; @@ -420,15 +456,17 @@ pub async fn range_query( let query_ctx = QueryContext::with(catalog, schema); let result = handler.do_query(&prom_query, Arc::new(query_ctx)).await; - let (metric_name, result_type) = + let (metric_name, _) = retrieve_metric_name_and_result_type(&prom_query.query).unwrap_or_default(); - PromJsonResponse::from_query_result(result, metric_name, result_type).await + PromJsonResponse::from_query_result(result, metric_name, Some(ValueType::Matrix)).await } -pub(crate) fn retrieve_metric_name_and_result_type(promql: &str) -> Option<(String, String)> { +pub(crate) fn retrieve_metric_name_and_result_type( + promql: &str, +) -> Option<(String, Option)> { let promql_expr = promql_parser::parser::parse(promql).ok()?; let metric_name = promql_expr_to_metric_name(&promql_expr)?; - let result_type = promql_expr.value_type().to_string(); + let result_type = Some(promql_expr.value_type()); Some((metric_name, result_type)) } diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 71b975260c..9b620b241f 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -367,7 +367,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { let expected = PromJsonResponse { status: "success".to_string(), data: PromData { - result_type: "matrix".to_string(), + result_type: "vector".to_string(), result: vec![ PromSeries { metric: [ @@ -376,7 +376,8 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { ] .into_iter() .collect(), - values: vec![(5.0, "2".to_string())], + value: Some((5.0, "2".to_string())), + ..Default::default() }, PromSeries { metric: [ @@ -385,7 +386,8 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { ] .into_iter() .collect(), - values: vec![(5.0, "1".to_string())], + value: Some((5.0, "1".to_string())), + ..Default::default() }, ], }, @@ -426,6 +428,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { .into_iter() .collect(), values: vec![(5.0, "2".to_string()), (10.0, "2".to_string())], + ..Default::default() }, PromSeries { metric: [ @@ -435,6 +438,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { .into_iter() .collect(), values: vec![(5.0, "1".to_string()), (10.0, "1".to_string())], + ..Default::default() }, ], },