diff --git a/src/servers/src/prom.rs b/src/servers/src/prom.rs index 14256374ab..4669a36fe2 100644 --- a/src/servers/src/prom.rs +++ b/src/servers/src/prom.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use async_trait::async_trait; use axum::body::BoxBody; -use axum::extract::{Query, State}; +use axum::extract::{Path, Query, State}; use axum::{routing, Form, Json, Router}; use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_error::prelude::ErrorExt; @@ -42,7 +42,7 @@ use schemars::JsonSchema; use serde::de::{self, MapAccess, Visitor}; use serde::{Deserialize, Serialize}; use session::context::{QueryContext, QueryContextRef}; -use snafu::{ensure, Location, OptionExt, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::oneshot::Sender; use tokio::sync::{oneshot, Mutex}; use tower::ServiceBuilder; @@ -53,7 +53,7 @@ use tower_http::trace::TraceLayer; use crate::auth::UserProviderRef; use crate::error::{ AlreadyStartedSnafu, CollectRecordbatchSnafu, Error, InternalSnafu, NotSupportedSnafu, Result, - StartHttpSnafu, + StartHttpSnafu, UnexpectedResultSnafu, }; use crate::http::authorize::HttpAuth; use crate::prometheus::{FIELD_COLUMN_NAME, TIMESTAMP_COLUMN_NAME}; @@ -96,6 +96,10 @@ impl PromServer { .route("/query", routing::post(instant_query).get(instant_query)) .route("/query_range", routing::post(range_query).get(range_query)) .route("/labels", routing::post(labels_query).get(labels_query)) + .route( + "/label/:label_name/values", + routing::get(label_values_query), + ) .with_state(self.query_handler.clone()); Router::new() @@ -183,6 +187,7 @@ pub struct PromData { pub enum PromResponse { PromData(PromData), Labels(Vec), + LabelValues(Vec), } impl Default for PromResponse { @@ -434,7 +439,7 @@ pub async fn instant_query( }; let db = ¶ms.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); - let (catalog, schema) = super::parse_catalog_and_schema_from_client_database_name(db); + let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(db); let query_ctx = QueryContext::with(catalog, schema); @@ -468,7 +473,7 @@ pub async fn range_query( }; let db = ¶ms.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); - let (catalog, schema) = super::parse_catalog_and_schema_from_client_database_name(db); + let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(db); let query_ctx = QueryContext::with(catalog, schema); @@ -528,7 +533,7 @@ pub async fn labels_query( Query(params): Query, Form(form_params): Form, ) -> Json { - let mut queries: Vec = params.matches.0; + let mut queries = params.matches.0; if queries.is_empty() { queries = form_params.matches.0; } @@ -546,10 +551,10 @@ pub async fn labels_query( .unwrap_or_else(current_time_rfc3339); let db = ¶ms.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); - let (catalog, schema) = super::parse_catalog_and_schema_from_client_database_name(db); + let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(db); let query_ctx = Arc::new(QueryContext::with(catalog, schema)); - let mut labels: HashSet = HashSet::new(); + let mut labels = HashSet::new(); labels.insert(METRIC_NAME.to_string()); for query in queries { @@ -557,7 +562,6 @@ pub async fn labels_query( query, start: start.clone(), end: end.clone(), - // TODO: find a better value for step step: DEFAULT_LOOKBACK_STRING.to_string(), }; @@ -600,10 +604,10 @@ async fn retrieve_labels_name_from_query_result( record_batches_to_labels_name(batches, labels)?; Ok(()) } - Output::AffectedRows(_) => Err(Error::UnexpectedResult { + Output::AffectedRows(_) => UnexpectedResultSnafu { reason: "expected data result, but got affected rows".to_string(), - location: Location::default(), - }), + } + .fail(), } } @@ -699,3 +703,114 @@ fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option { } } } + +#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] +pub struct LabelValueQuery { + start: Option, + end: Option, + #[serde(flatten)] + matches: Matches, + db: Option, +} + +#[axum_macros::debug_handler] +pub async fn label_values_query( + State(handler): State, + Path(label_name): Path, + Query(params): Query, +) -> Json { + let queries = params.matches.0; + if queries.is_empty() { + return PromJsonResponse::error("Invalid argument", "match[] parameter is required"); + } + + let start = params.start.unwrap_or_else(yesterday_rfc3339); + let end = params.end.unwrap_or_else(current_time_rfc3339); + let db = ¶ms.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); + let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(db); + let query_ctx = Arc::new(QueryContext::with(catalog, schema)); + + let mut label_values = HashSet::new(); + + for query in queries { + let prom_query = PromQuery { + query, + start: start.clone(), + end: end.clone(), + step: DEFAULT_LOOKBACK_STRING.to_string(), + }; + let result = handler.do_query(&prom_query, query_ctx.clone()).await; + let result = retrieve_label_values(result, &label_name, &mut label_values).await; + if let Err(err) = result { + // Prometheus won't report error if querying nonexist label and metric + if err.status_code() != StatusCode::TableNotFound + && err.status_code() != StatusCode::TableColumnNotFound + { + return PromJsonResponse::error(err.status_code().to_string(), err.to_string()); + } + } + } + + let mut label_values: Vec<_> = label_values.into_iter().collect(); + label_values.sort(); + PromJsonResponse::success(PromResponse::LabelValues(label_values)) +} + +async fn retrieve_label_values( + result: Result, + label_name: &str, + labels_values: &mut HashSet, +) -> Result<()> { + match result? { + Output::RecordBatches(batches) => { + retrieve_label_values_from_record_batch(batches, label_name, labels_values).await + } + Output::Stream(stream) => { + let batches = RecordBatches::try_collect(stream) + .await + .context(CollectRecordbatchSnafu)?; + retrieve_label_values_from_record_batch(batches, label_name, labels_values).await + } + Output::AffectedRows(_) => UnexpectedResultSnafu { + reason: "expected data result, but got affected rows".to_string(), + } + .fail(), + } +} + +async fn retrieve_label_values_from_record_batch( + batches: RecordBatches, + label_name: &str, + labels_values: &mut HashSet, +) -> Result<()> { + let Some(label_col_idx) = batches.schema().column_index_by_name(label_name) else { + return Ok(()); + }; + + // check whether label_name belongs to tag column + match batches + .schema() + .column_schema_by_name(label_name) + .unwrap() + .data_type + { + ConcreteDataType::String(_) => {} + _ => return Ok(()), + } + + for batch in batches.iter() { + let label_column = batch + .column(label_col_idx) + .as_any() + .downcast_ref::() + .unwrap(); + + for row_index in 0..batch.num_rows() { + if let Some(label_value) = label_column.get_data(row_index) { + labels_values.insert(label_value.to_string()); + } + } + } + + Ok(()) +} diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index f7b76544d4..2337ee3d59 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -18,6 +18,7 @@ use common_error::status_code::StatusCode as ErrorCode; use serde_json::json; use servers::http::handler::HealthResponse; use servers::http::{JsonOutput, JsonResponse}; +use servers::prom::PromJsonResponse; use tests_integration::test_util::{ setup_test_http_app, setup_test_http_app_with_frontend, setup_test_prom_app_with_frontend, StorageType, @@ -335,6 +336,37 @@ pub async fn test_prom_http_api(store_type: StorageType) { .await; assert_eq!(res.status(), StatusCode::OK); + // label values + // should return error if there is no match[] + let res = client.get("/api/v1/label/instance/values").send().await; + assert_eq!(res.status(), StatusCode::OK); + let prom_resp = res.json::().await; + assert_eq!(prom_resp.status, "error"); + assert!(prom_resp.error.is_some_and(|err| !err.is_empty())); + assert!(prom_resp.error_type.is_some_and(|err| !err.is_empty())); + + // single match[] + let res = client + .get("/api/v1/label/instance/values?match[]=up") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let prom_resp = res.json::().await; + assert_eq!(prom_resp.status, "success"); + assert!(prom_resp.error.is_none()); + assert!(prom_resp.error_type.is_none()); + + // multiple match[] + let res = client + .get("/api/v1/label/instance/values?match[]=up&match[]=system_metrics") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let prom_resp = res.json::().await; + assert_eq!(prom_resp.status, "success"); + assert!(prom_resp.error.is_none()); + assert!(prom_resp.error_type.is_none()); + guard.remove_all().await; }