fix: vector and matrix in Prometheus use different field (#1520)

* fix empty tag

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix result type

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* make it work

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-05-05 15:54:26 +08:00
committed by GitHub
parent b0ab641602
commit 6fe117d7d5
3 changed files with 81 additions and 30 deletions

View File

@@ -21,6 +21,7 @@ use api::v1::{PromqlRequest, PromqlResponse, ResponseHeader};
use async_trait::async_trait;
use common_telemetry::timer;
use common_time::util::current_time_rfc3339;
use promql_parser::parser::ValueType;
use query::parser::PromQuery;
use snafu::OptionExt;
use tonic::{Request, Response};
@@ -37,16 +38,20 @@ pub struct PrometheusGatewayService {
#[async_trait]
impl PrometheusGateway for PrometheusGatewayService {
async fn handle(&self, req: Request<PromqlRequest>) -> TonicResult<Response<PromqlResponse>> {
let mut is_range_query = false;
let inner = req.into_inner();
let prom_query = match inner.promql.context(InvalidQuerySnafu {
reason: "Expecting non-empty PromqlRequest.",
})? {
Promql::RangeQuery(range_query) => PromQuery {
query: range_query.query,
start: range_query.start,
end: range_query.end,
step: range_query.step,
},
Promql::RangeQuery(range_query) => {
is_range_query = true;
PromQuery {
query: range_query.query,
start: range_query.start,
end: range_query.end,
step: range_query.step,
}
}
Promql::InstantQuery(instant_query) => {
let time = if instant_query.time.is_empty() {
current_time_rfc3339()
@@ -71,8 +76,12 @@ impl PrometheusGateway for PrometheusGatewayService {
)]
);
let result = self.handler.do_query(&prom_query, query_context).await;
let (metric_name, result_type) =
let (metric_name, mut result_type) =
retrieve_metric_name_and_result_type(&prom_query.query).unwrap_or_default();
// range query only returns matrix
if is_range_query {
result_type = Some(ValueType::Matrix)
};
let json_response = PromJsonResponse::from_query_result(result, metric_name, result_type)
.await
.0;

View File

@@ -35,7 +35,7 @@ use futures::FutureExt;
use promql_parser::label::METRIC_NAME;
use promql_parser::parser::{
AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr,
UnaryExpr, VectorSelector,
UnaryExpr, ValueType, VectorSelector,
};
use query::parser::PromQuery;
use schemars::JsonSchema;
@@ -51,7 +51,8 @@ use tower_http::trace::TraceLayer;
use crate::auth::UserProviderRef;
use crate::error::{
AlreadyStartedSnafu, CollectRecordbatchSnafu, InternalSnafu, Result, StartHttpSnafu,
AlreadyStartedSnafu, CollectRecordbatchSnafu, InternalSnafu, NotSupportedSnafu, Result,
StartHttpSnafu,
};
use crate::http::authorize::HttpAuth;
use crate::server::Server;
@@ -160,7 +161,11 @@ impl Server for PromServer {
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)]
pub struct PromSeries {
pub metric: HashMap<String, String>,
/// For [ValueType::Matrix] result type
pub values: Vec<(f64, String)>,
/// For [ValueType::Vector] result type
#[serde(skip_serializing_if = "Option::is_none")]
pub value: Option<(f64, String)>,
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)]
@@ -212,18 +217,24 @@ impl PromJsonResponse {
pub async fn from_query_result(
result: Result<Output>,
metric_name: String,
result_type: String,
result_type: Option<ValueType>,
) -> Json<Self> {
let response: Result<Json<Self>> = try {
let json = match result? {
Output::RecordBatches(batches) => {
Self::success(Self::record_batches_to_data(batches, metric_name)?)
}
Output::RecordBatches(batches) => Self::success(Self::record_batches_to_data(
batches,
metric_name,
result_type,
)?),
Output::Stream(stream) => {
let record_batches = RecordBatches::try_collect(stream)
.await
.context(CollectRecordbatchSnafu)?;
Self::success(Self::record_batches_to_data(record_batches, metric_name)?)
Self::success(Self::record_batches_to_data(
record_batches,
metric_name,
result_type,
)?)
}
Output::AffectedRows(_) => Self::error(
"unexpected result",
@@ -234,6 +245,8 @@ impl PromJsonResponse {
json
};
let result_type_string = result_type.map(|t| t.to_string()).unwrap_or_default();
match response {
Ok(resp) => resp,
Err(err) => {
@@ -242,7 +255,7 @@ impl PromJsonResponse {
|| err.status_code() == StatusCode::TableColumnNotFound
{
Self::success(PromData {
result_type,
result_type: result_type_string,
..Default::default()
})
} else {
@@ -252,7 +265,12 @@ impl PromJsonResponse {
}
}
fn record_batches_to_data(batches: RecordBatches, metric_name: String) -> Result<PromData> {
/// Convert [RecordBatches] to [PromData]
fn record_batches_to_data(
batches: RecordBatches,
metric_name: String,
result_type: Option<ValueType>,
) -> Result<PromData> {
// infer semantic type of each column from schema.
// TODO(ruihang): wish there is a better way to do this.
let mut timestamp_column_index = None;
@@ -321,8 +339,10 @@ impl PromJsonResponse {
// TODO(ruihang): push table name `__metric__`
let mut tags = vec![metric_name.clone()];
for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) {
let tag_value = tag_column.get_data(row_index).unwrap().to_string();
tags.push((tag_name.to_string(), tag_value));
// TODO(ruihang): add test for NULL tag
if let Some(tag_value) = tag_column.get_data(row_index) {
tags.push((tag_name.to_string(), tag_value.to_string()));
}
}
// retrieve timestamp
@@ -339,14 +359,30 @@ impl PromJsonResponse {
let result = buffer
.into_iter()
.map(|(tags, values)| PromSeries {
metric: tags.into_iter().collect(),
values,
.map(|(tags, mut values)| {
let metric = tags.into_iter().collect();
match result_type {
Some(ValueType::Vector) => Ok(PromSeries {
metric,
value: values.pop(),
..Default::default()
}),
Some(ValueType::Matrix) => Ok(PromSeries {
metric,
values,
..Default::default()
}),
other => NotSupportedSnafu {
feat: format!("PromQL result type {other:?}"),
}
.fail(),
}
})
.collect::<Vec<_>>();
.collect::<Result<Vec<_>>>()?;
let result_type_string = result_type.map(|t| t.to_string()).unwrap_or_default();
let data = PromData {
result_type: "matrix".to_string(),
result_type: result_type_string,
result,
};
@@ -420,15 +456,17 @@ pub async fn range_query(
let query_ctx = QueryContext::with(catalog, schema);
let result = handler.do_query(&prom_query, Arc::new(query_ctx)).await;
let (metric_name, result_type) =
let (metric_name, _) =
retrieve_metric_name_and_result_type(&prom_query.query).unwrap_or_default();
PromJsonResponse::from_query_result(result, metric_name, result_type).await
PromJsonResponse::from_query_result(result, metric_name, Some(ValueType::Matrix)).await
}
pub(crate) fn retrieve_metric_name_and_result_type(promql: &str) -> Option<(String, String)> {
pub(crate) fn retrieve_metric_name_and_result_type(
promql: &str,
) -> Option<(String, Option<ValueType>)> {
let promql_expr = promql_parser::parser::parse(promql).ok()?;
let metric_name = promql_expr_to_metric_name(&promql_expr)?;
let result_type = promql_expr.value_type().to_string();
let result_type = Some(promql_expr.value_type());
Some((metric_name, result_type))
}

View File

@@ -367,7 +367,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
let expected = PromJsonResponse {
status: "success".to_string(),
data: PromData {
result_type: "matrix".to_string(),
result_type: "vector".to_string(),
result: vec![
PromSeries {
metric: [
@@ -376,7 +376,8 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
]
.into_iter()
.collect(),
values: vec![(5.0, "2".to_string())],
value: Some((5.0, "2".to_string())),
..Default::default()
},
PromSeries {
metric: [
@@ -385,7 +386,8 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
]
.into_iter()
.collect(),
values: vec![(5.0, "1".to_string())],
value: Some((5.0, "1".to_string())),
..Default::default()
},
],
},
@@ -426,6 +428,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
.into_iter()
.collect(),
values: vec![(5.0, "2".to_string()), (10.0, "2".to_string())],
..Default::default()
},
PromSeries {
metric: [
@@ -435,6 +438,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
.into_iter()
.collect(),
values: vec![(5.0, "1".to_string()), (10.0, "1".to_string())],
..Default::default()
},
],
},