From 0ac50632aaa63bbc3bbd71e60e39d61e81ef4333 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 27 Apr 2023 20:54:26 +0800 Subject: [PATCH] feat: use server time if it's not specified (#1480) Signed-off-by: Ruihang Xia --- src/common/time/src/util.rs | 5 +++ src/servers/src/grpc/prom_query_gateway.rs | 27 ++++++++---- src/servers/src/prom.rs | 51 ++++++++++++++-------- 3 files changed, 55 insertions(+), 28 deletions(-) diff --git a/src/common/time/src/util.rs b/src/common/time/src/util.rs index 3f52166c98..f565914182 100644 --- a/src/common/time/src/util.rs +++ b/src/common/time/src/util.rs @@ -17,6 +17,11 @@ pub fn current_time_millis() -> i64 { chrono::Utc::now().timestamp_millis() } +/// Returns the current time in rfc3339 format. +pub fn current_time_rfc3339() -> String { + chrono::Utc::now().to_rfc3339() +} + /// Port of rust unstable features `int_roundings`. pub(crate) fn div_ceil(this: i64, rhs: i64) -> i64 { let d = this / rhs; diff --git a/src/servers/src/grpc/prom_query_gateway.rs b/src/servers/src/grpc/prom_query_gateway.rs index 71189e42b3..0735f5eab2 100644 --- a/src/servers/src/grpc/prom_query_gateway.rs +++ b/src/servers/src/grpc/prom_query_gateway.rs @@ -19,6 +19,7 @@ use api::v1::prometheus_gateway_server::PrometheusGateway; use api::v1::promql_request::Promql; use api::v1::{PromqlRequest, PromqlResponse, ResponseHeader}; use async_trait::async_trait; +use common_time::util::current_time_rfc3339; use query::parser::PromQuery; use snafu::OptionExt; use tonic::{Request, Response}; @@ -26,7 +27,7 @@ use tonic::{Request, Response}; use crate::error::InvalidQuerySnafu; use crate::grpc::handler::create_query_context; use crate::grpc::TonicResult; -use crate::prom::{retrieve_metric_name, PromHandlerRef, PromJsonResponse}; +use crate::prom::{retrieve_metric_name_and_result_type, PromHandlerRef, PromJsonResponse}; pub struct PrometheusGatewayService { handler: PromHandlerRef, @@ -45,18 +46,26 @@ impl PrometheusGateway for PrometheusGatewayService { end: range_query.end, step: range_query.step, }, - Promql::InstantQuery(instant_query) => PromQuery { - query: instant_query.query, - start: instant_query.time.clone(), - end: instant_query.time, - step: String::from("1s"), - }, + Promql::InstantQuery(instant_query) => { + let time = if instant_query.time.is_empty() { + current_time_rfc3339() + } else { + instant_query.time + }; + PromQuery { + query: instant_query.query, + start: time.clone(), + end: time, + step: String::from("1s"), + } + } }; let query_context = create_query_context(inner.header.as_ref()); let result = self.handler.do_query(&prom_query, query_context).await; - let metric_name = retrieve_metric_name(&prom_query.query).unwrap_or_default(); - let json_response = PromJsonResponse::from_query_result(result, metric_name) + let (metric_name, result_type) = + retrieve_metric_name_and_result_type(&prom_query.query).unwrap_or_default(); + let json_response = PromJsonResponse::from_query_result(result, metric_name, result_type) .await .0; let json_bytes = serde_json::to_string(&json_response).unwrap().into_bytes(); diff --git a/src/servers/src/prom.rs b/src/servers/src/prom.rs index de43841b5b..d8827f3107 100644 --- a/src/servers/src/prom.rs +++ b/src/servers/src/prom.rs @@ -27,6 +27,7 @@ use common_error::status_code::StatusCode; use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::info; +use common_time::util::current_time_rfc3339; use datatypes::prelude::ConcreteDataType; use datatypes::scalars::ScalarVector; use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector}; @@ -208,7 +209,11 @@ impl PromJsonResponse { } /// Convert from `Result` - pub async fn from_query_result(result: Result, metric_name: String) -> Json { + pub async fn from_query_result( + result: Result, + metric_name: String, + result_type: String, + ) -> Json { let response: Result> = try { let json = match result? { Output::RecordBatches(batches) => { @@ -237,7 +242,7 @@ impl PromJsonResponse { || err.status_code() == StatusCode::TableColumnNotFound { Self::success(PromData { - result_type: "matrix".to_string(), + result_type, ..Default::default() }) } else { @@ -363,7 +368,11 @@ pub async fn instant_query( Query(params): Query, Form(form_params): Form, ) -> Json { - let time = params.time.or(form_params.time).unwrap_or_default(); + // Extract time from query string, or use current server time if not specified. + let time = params + .time + .or(form_params.time) + .unwrap_or_else(current_time_rfc3339); let prom_query = PromQuery { query: params.query.or(form_params.query).unwrap_or_default(), start: time.clone(), @@ -377,8 +386,9 @@ pub async fn instant_query( let query_ctx = QueryContext::with(catalog, schema); let result = handler.do_query(&prom_query, Arc::new(query_ctx)).await; - let metric_name = retrieve_metric_name(&prom_query.query).unwrap_or_default(); - PromJsonResponse::from_query_result(result, metric_name).await + let (metric_name, result_type) = + retrieve_metric_name_and_result_type(&prom_query.query).unwrap_or_default(); + PromJsonResponse::from_query_result(result, metric_name, result_type).await } #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] @@ -410,24 +420,28 @@ pub async fn range_query( let query_ctx = QueryContext::with(catalog, schema); let result = handler.do_query(&prom_query, Arc::new(query_ctx)).await; - let metric_name = retrieve_metric_name(&prom_query.query).unwrap_or_default(); - PromJsonResponse::from_query_result(result, metric_name).await + let (metric_name, result_type) = + retrieve_metric_name_and_result_type(&prom_query.query).unwrap_or_default(); + PromJsonResponse::from_query_result(result, metric_name, result_type).await } -pub(crate) fn retrieve_metric_name(promql: &str) -> Option { +pub(crate) fn retrieve_metric_name_and_result_type(promql: &str) -> Option<(String, String)> { let promql_expr = promql_parser::parser::parse(promql).ok()?; - promql_expr_to_metric_name(promql_expr) + let metric_name = promql_expr_to_metric_name(&promql_expr)?; + let result_type = promql_expr.value_type().to_string(); + + Some((metric_name, result_type)) } -fn promql_expr_to_metric_name(expr: PromqlExpr) -> Option { +fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option { match expr { - PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => promql_expr_to_metric_name(*expr), - PromqlExpr::Unary(UnaryExpr { expr }) => promql_expr_to_metric_name(*expr), + PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => promql_expr_to_metric_name(expr), + PromqlExpr::Unary(UnaryExpr { expr }) => promql_expr_to_metric_name(expr), PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => { - promql_expr_to_metric_name(*lhs).or(promql_expr_to_metric_name(*rhs)) + promql_expr_to_metric_name(lhs).or(promql_expr_to_metric_name(rhs)) } - PromqlExpr::Paren(ParenExpr { expr }) => promql_expr_to_metric_name(*expr), - PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => promql_expr_to_metric_name(*expr), + PromqlExpr::Paren(ParenExpr { expr }) => promql_expr_to_metric_name(expr), + PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => promql_expr_to_metric_name(expr), PromqlExpr::NumberLiteral(_) => None, PromqlExpr::StringLiteral(_) => None, PromqlExpr::VectorSelector(VectorSelector { matchers, .. }) => { @@ -439,9 +453,8 @@ fn promql_expr_to_metric_name(expr: PromqlExpr) -> Option { let VectorSelector { matchers, .. } = vector_selector; matchers.find_matchers(METRIC_NAME).pop().cloned() } - PromqlExpr::Call(Call { args, .. }) => args - .args - .into_iter() - .find_map(|e| promql_expr_to_metric_name(*e)), + PromqlExpr::Call(Call { args, .. }) => { + args.args.iter().find_map(|e| promql_expr_to_metric_name(e)) + } } }