feat: support /api/v1/label/<label_name>/values from Prometheus (#1604)

* feat: support `/api/v1/label/<label_name>/values` from Prometheus

* chore: apply CR

* chore: apply CR
This commit is contained in:
Chuanle Chen
2023-05-22 15:24:12 +08:00
committed by GitHub
parent e5a215de46
commit 77497ca46a
2 changed files with 159 additions and 12 deletions

View File

@@ -19,7 +19,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use axum::body::BoxBody;
use axum::extract::{Query, State};
use axum::extract::{Path, Query, State};
use axum::{routing, Form, Json, Router};
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_error::prelude::ErrorExt;
@@ -42,7 +42,7 @@ use schemars::JsonSchema;
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Serialize};
use session::context::{QueryContext, QueryContextRef};
use snafu::{ensure, Location, OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::oneshot::Sender;
use tokio::sync::{oneshot, Mutex};
use tower::ServiceBuilder;
@@ -53,7 +53,7 @@ use tower_http::trace::TraceLayer;
use crate::auth::UserProviderRef;
use crate::error::{
AlreadyStartedSnafu, CollectRecordbatchSnafu, Error, InternalSnafu, NotSupportedSnafu, Result,
StartHttpSnafu,
StartHttpSnafu, UnexpectedResultSnafu,
};
use crate::http::authorize::HttpAuth;
use crate::prometheus::{FIELD_COLUMN_NAME, TIMESTAMP_COLUMN_NAME};
@@ -96,6 +96,10 @@ impl PromServer {
.route("/query", routing::post(instant_query).get(instant_query))
.route("/query_range", routing::post(range_query).get(range_query))
.route("/labels", routing::post(labels_query).get(labels_query))
.route(
"/label/:label_name/values",
routing::get(label_values_query),
)
.with_state(self.query_handler.clone());
Router::new()
@@ -183,6 +187,7 @@ pub struct PromData {
pub enum PromResponse {
PromData(PromData),
Labels(Vec<String>),
LabelValues(Vec<String>),
}
impl Default for PromResponse {
@@ -434,7 +439,7 @@ pub async fn instant_query(
};
let db = &params.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string());
let (catalog, schema) = super::parse_catalog_and_schema_from_client_database_name(db);
let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(db);
let query_ctx = QueryContext::with(catalog, schema);
@@ -468,7 +473,7 @@ pub async fn range_query(
};
let db = &params.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string());
let (catalog, schema) = super::parse_catalog_and_schema_from_client_database_name(db);
let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(db);
let query_ctx = QueryContext::with(catalog, schema);
@@ -528,7 +533,7 @@ pub async fn labels_query(
Query(params): Query<LabelsQuery>,
Form(form_params): Form<LabelsQuery>,
) -> Json<PromJsonResponse> {
let mut queries: Vec<String> = params.matches.0;
let mut queries = params.matches.0;
if queries.is_empty() {
queries = form_params.matches.0;
}
@@ -546,10 +551,10 @@ pub async fn labels_query(
.unwrap_or_else(current_time_rfc3339);
let db = &params.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string());
let (catalog, schema) = super::parse_catalog_and_schema_from_client_database_name(db);
let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(db);
let query_ctx = Arc::new(QueryContext::with(catalog, schema));
let mut labels: HashSet<String> = HashSet::new();
let mut labels = HashSet::new();
labels.insert(METRIC_NAME.to_string());
for query in queries {
@@ -557,7 +562,6 @@ pub async fn labels_query(
query,
start: start.clone(),
end: end.clone(),
// TODO: find a better value for step
step: DEFAULT_LOOKBACK_STRING.to_string(),
};
@@ -600,10 +604,10 @@ async fn retrieve_labels_name_from_query_result(
record_batches_to_labels_name(batches, labels)?;
Ok(())
}
Output::AffectedRows(_) => Err(Error::UnexpectedResult {
Output::AffectedRows(_) => UnexpectedResultSnafu {
reason: "expected data result, but got affected rows".to_string(),
location: Location::default(),
}),
}
.fail(),
}
}
@@ -699,3 +703,114 @@ fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
}
}
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct LabelValueQuery {
start: Option<String>,
end: Option<String>,
#[serde(flatten)]
matches: Matches,
db: Option<String>,
}
#[axum_macros::debug_handler]
pub async fn label_values_query(
State(handler): State<PromHandlerRef>,
Path(label_name): Path<String>,
Query(params): Query<LabelValueQuery>,
) -> Json<PromJsonResponse> {
let queries = params.matches.0;
if queries.is_empty() {
return PromJsonResponse::error("Invalid argument", "match[] parameter is required");
}
let start = params.start.unwrap_or_else(yesterday_rfc3339);
let end = params.end.unwrap_or_else(current_time_rfc3339);
let db = &params.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string());
let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(db);
let query_ctx = Arc::new(QueryContext::with(catalog, schema));
let mut label_values = HashSet::new();
for query in queries {
let prom_query = PromQuery {
query,
start: start.clone(),
end: end.clone(),
step: DEFAULT_LOOKBACK_STRING.to_string(),
};
let result = handler.do_query(&prom_query, query_ctx.clone()).await;
let result = retrieve_label_values(result, &label_name, &mut label_values).await;
if let Err(err) = result {
// Prometheus won't report error if querying nonexist label and metric
if err.status_code() != StatusCode::TableNotFound
&& err.status_code() != StatusCode::TableColumnNotFound
{
return PromJsonResponse::error(err.status_code().to_string(), err.to_string());
}
}
}
let mut label_values: Vec<_> = label_values.into_iter().collect();
label_values.sort();
PromJsonResponse::success(PromResponse::LabelValues(label_values))
}
async fn retrieve_label_values(
result: Result<Output>,
label_name: &str,
labels_values: &mut HashSet<String>,
) -> Result<()> {
match result? {
Output::RecordBatches(batches) => {
retrieve_label_values_from_record_batch(batches, label_name, labels_values).await
}
Output::Stream(stream) => {
let batches = RecordBatches::try_collect(stream)
.await
.context(CollectRecordbatchSnafu)?;
retrieve_label_values_from_record_batch(batches, label_name, labels_values).await
}
Output::AffectedRows(_) => UnexpectedResultSnafu {
reason: "expected data result, but got affected rows".to_string(),
}
.fail(),
}
}
async fn retrieve_label_values_from_record_batch(
batches: RecordBatches,
label_name: &str,
labels_values: &mut HashSet<String>,
) -> Result<()> {
let Some(label_col_idx) = batches.schema().column_index_by_name(label_name) else {
return Ok(());
};
// check whether label_name belongs to tag column
match batches
.schema()
.column_schema_by_name(label_name)
.unwrap()
.data_type
{
ConcreteDataType::String(_) => {}
_ => return Ok(()),
}
for batch in batches.iter() {
let label_column = batch
.column(label_col_idx)
.as_any()
.downcast_ref::<StringVector>()
.unwrap();
for row_index in 0..batch.num_rows() {
if let Some(label_value) = label_column.get_data(row_index) {
labels_values.insert(label_value.to_string());
}
}
}
Ok(())
}

View File

@@ -18,6 +18,7 @@ use common_error::status_code::StatusCode as ErrorCode;
use serde_json::json;
use servers::http::handler::HealthResponse;
use servers::http::{JsonOutput, JsonResponse};
use servers::prom::PromJsonResponse;
use tests_integration::test_util::{
setup_test_http_app, setup_test_http_app_with_frontend, setup_test_prom_app_with_frontend,
StorageType,
@@ -335,6 +336,37 @@ pub async fn test_prom_http_api(store_type: StorageType) {
.await;
assert_eq!(res.status(), StatusCode::OK);
// label values
// should return error if there is no match[]
let res = client.get("/api/v1/label/instance/values").send().await;
assert_eq!(res.status(), StatusCode::OK);
let prom_resp = res.json::<PromJsonResponse>().await;
assert_eq!(prom_resp.status, "error");
assert!(prom_resp.error.is_some_and(|err| !err.is_empty()));
assert!(prom_resp.error_type.is_some_and(|err| !err.is_empty()));
// single match[]
let res = client
.get("/api/v1/label/instance/values?match[]=up")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let prom_resp = res.json::<PromJsonResponse>().await;
assert_eq!(prom_resp.status, "success");
assert!(prom_resp.error.is_none());
assert!(prom_resp.error_type.is_none());
// multiple match[]
let res = client
.get("/api/v1/label/instance/values?match[]=up&match[]=system_metrics")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let prom_resp = res.json::<PromJsonResponse>().await;
assert_eq!(prom_resp.status, "success");
assert!(prom_resp.error.is_none());
assert!(prom_resp.error_type.is_none());
guard.remove_all().await;
}