mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: use server time if it's not specified (#1480)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -17,6 +17,11 @@ pub fn current_time_millis() -> i64 {
|
||||
chrono::Utc::now().timestamp_millis()
|
||||
}
|
||||
|
||||
/// Returns the current time in rfc3339 format.
|
||||
pub fn current_time_rfc3339() -> String {
|
||||
chrono::Utc::now().to_rfc3339()
|
||||
}
|
||||
|
||||
/// Port of rust unstable features `int_roundings`.
|
||||
pub(crate) fn div_ceil(this: i64, rhs: i64) -> i64 {
|
||||
let d = this / rhs;
|
||||
|
||||
@@ -19,6 +19,7 @@ use api::v1::prometheus_gateway_server::PrometheusGateway;
|
||||
use api::v1::promql_request::Promql;
|
||||
use api::v1::{PromqlRequest, PromqlResponse, ResponseHeader};
|
||||
use async_trait::async_trait;
|
||||
use common_time::util::current_time_rfc3339;
|
||||
use query::parser::PromQuery;
|
||||
use snafu::OptionExt;
|
||||
use tonic::{Request, Response};
|
||||
@@ -26,7 +27,7 @@ use tonic::{Request, Response};
|
||||
use crate::error::InvalidQuerySnafu;
|
||||
use crate::grpc::handler::create_query_context;
|
||||
use crate::grpc::TonicResult;
|
||||
use crate::prom::{retrieve_metric_name, PromHandlerRef, PromJsonResponse};
|
||||
use crate::prom::{retrieve_metric_name_and_result_type, PromHandlerRef, PromJsonResponse};
|
||||
|
||||
pub struct PrometheusGatewayService {
|
||||
handler: PromHandlerRef,
|
||||
@@ -45,18 +46,26 @@ impl PrometheusGateway for PrometheusGatewayService {
|
||||
end: range_query.end,
|
||||
step: range_query.step,
|
||||
},
|
||||
Promql::InstantQuery(instant_query) => PromQuery {
|
||||
query: instant_query.query,
|
||||
start: instant_query.time.clone(),
|
||||
end: instant_query.time,
|
||||
step: String::from("1s"),
|
||||
},
|
||||
Promql::InstantQuery(instant_query) => {
|
||||
let time = if instant_query.time.is_empty() {
|
||||
current_time_rfc3339()
|
||||
} else {
|
||||
instant_query.time
|
||||
};
|
||||
PromQuery {
|
||||
query: instant_query.query,
|
||||
start: time.clone(),
|
||||
end: time,
|
||||
step: String::from("1s"),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let query_context = create_query_context(inner.header.as_ref());
|
||||
let result = self.handler.do_query(&prom_query, query_context).await;
|
||||
let metric_name = retrieve_metric_name(&prom_query.query).unwrap_or_default();
|
||||
let json_response = PromJsonResponse::from_query_result(result, metric_name)
|
||||
let (metric_name, result_type) =
|
||||
retrieve_metric_name_and_result_type(&prom_query.query).unwrap_or_default();
|
||||
let json_response = PromJsonResponse::from_query_result(result, metric_name, result_type)
|
||||
.await
|
||||
.0;
|
||||
let json_bytes = serde_json::to_string(&json_response).unwrap().into_bytes();
|
||||
|
||||
@@ -27,6 +27,7 @@ use common_error::status_code::StatusCode;
|
||||
use common_query::Output;
|
||||
use common_recordbatch::RecordBatches;
|
||||
use common_telemetry::info;
|
||||
use common_time::util::current_time_rfc3339;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::scalars::ScalarVector;
|
||||
use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
|
||||
@@ -208,7 +209,11 @@ impl PromJsonResponse {
|
||||
}
|
||||
|
||||
/// Convert from `Result<Output>`
|
||||
pub async fn from_query_result(result: Result<Output>, metric_name: String) -> Json<Self> {
|
||||
pub async fn from_query_result(
|
||||
result: Result<Output>,
|
||||
metric_name: String,
|
||||
result_type: String,
|
||||
) -> Json<Self> {
|
||||
let response: Result<Json<Self>> = try {
|
||||
let json = match result? {
|
||||
Output::RecordBatches(batches) => {
|
||||
@@ -237,7 +242,7 @@ impl PromJsonResponse {
|
||||
|| err.status_code() == StatusCode::TableColumnNotFound
|
||||
{
|
||||
Self::success(PromData {
|
||||
result_type: "matrix".to_string(),
|
||||
result_type,
|
||||
..Default::default()
|
||||
})
|
||||
} else {
|
||||
@@ -363,7 +368,11 @@ pub async fn instant_query(
|
||||
Query(params): Query<InstantQuery>,
|
||||
Form(form_params): Form<InstantQuery>,
|
||||
) -> Json<PromJsonResponse> {
|
||||
let time = params.time.or(form_params.time).unwrap_or_default();
|
||||
// Extract time from query string, or use current server time if not specified.
|
||||
let time = params
|
||||
.time
|
||||
.or(form_params.time)
|
||||
.unwrap_or_else(current_time_rfc3339);
|
||||
let prom_query = PromQuery {
|
||||
query: params.query.or(form_params.query).unwrap_or_default(),
|
||||
start: time.clone(),
|
||||
@@ -377,8 +386,9 @@ pub async fn instant_query(
|
||||
let query_ctx = QueryContext::with(catalog, schema);
|
||||
|
||||
let result = handler.do_query(&prom_query, Arc::new(query_ctx)).await;
|
||||
let metric_name = retrieve_metric_name(&prom_query.query).unwrap_or_default();
|
||||
PromJsonResponse::from_query_result(result, metric_name).await
|
||||
let (metric_name, result_type) =
|
||||
retrieve_metric_name_and_result_type(&prom_query.query).unwrap_or_default();
|
||||
PromJsonResponse::from_query_result(result, metric_name, result_type).await
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
|
||||
@@ -410,24 +420,28 @@ pub async fn range_query(
|
||||
let query_ctx = QueryContext::with(catalog, schema);
|
||||
|
||||
let result = handler.do_query(&prom_query, Arc::new(query_ctx)).await;
|
||||
let metric_name = retrieve_metric_name(&prom_query.query).unwrap_or_default();
|
||||
PromJsonResponse::from_query_result(result, metric_name).await
|
||||
let (metric_name, result_type) =
|
||||
retrieve_metric_name_and_result_type(&prom_query.query).unwrap_or_default();
|
||||
PromJsonResponse::from_query_result(result, metric_name, result_type).await
|
||||
}
|
||||
|
||||
pub(crate) fn retrieve_metric_name(promql: &str) -> Option<String> {
|
||||
pub(crate) fn retrieve_metric_name_and_result_type(promql: &str) -> Option<(String, String)> {
|
||||
let promql_expr = promql_parser::parser::parse(promql).ok()?;
|
||||
promql_expr_to_metric_name(promql_expr)
|
||||
let metric_name = promql_expr_to_metric_name(&promql_expr)?;
|
||||
let result_type = promql_expr.value_type().to_string();
|
||||
|
||||
Some((metric_name, result_type))
|
||||
}
|
||||
|
||||
fn promql_expr_to_metric_name(expr: PromqlExpr) -> Option<String> {
|
||||
fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
|
||||
match expr {
|
||||
PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => promql_expr_to_metric_name(*expr),
|
||||
PromqlExpr::Unary(UnaryExpr { expr }) => promql_expr_to_metric_name(*expr),
|
||||
PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => promql_expr_to_metric_name(expr),
|
||||
PromqlExpr::Unary(UnaryExpr { expr }) => promql_expr_to_metric_name(expr),
|
||||
PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
|
||||
promql_expr_to_metric_name(*lhs).or(promql_expr_to_metric_name(*rhs))
|
||||
promql_expr_to_metric_name(lhs).or(promql_expr_to_metric_name(rhs))
|
||||
}
|
||||
PromqlExpr::Paren(ParenExpr { expr }) => promql_expr_to_metric_name(*expr),
|
||||
PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => promql_expr_to_metric_name(*expr),
|
||||
PromqlExpr::Paren(ParenExpr { expr }) => promql_expr_to_metric_name(expr),
|
||||
PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => promql_expr_to_metric_name(expr),
|
||||
PromqlExpr::NumberLiteral(_) => None,
|
||||
PromqlExpr::StringLiteral(_) => None,
|
||||
PromqlExpr::VectorSelector(VectorSelector { matchers, .. }) => {
|
||||
@@ -439,9 +453,8 @@ fn promql_expr_to_metric_name(expr: PromqlExpr) -> Option<String> {
|
||||
let VectorSelector { matchers, .. } = vector_selector;
|
||||
matchers.find_matchers(METRIC_NAME).pop().cloned()
|
||||
}
|
||||
PromqlExpr::Call(Call { args, .. }) => args
|
||||
.args
|
||||
.into_iter()
|
||||
.find_map(|e| promql_expr_to_metric_name(*e)),
|
||||
PromqlExpr::Call(Call { args, .. }) => {
|
||||
args.args.iter().find_map(|e| promql_expr_to_metric_name(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user