diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 8dc34ae89f..1d835d1814 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -603,6 +603,10 @@ impl PrometheusHandler for Instance { Ok(interceptor.post_execute(output, query_ctx)?) } + + fn catalog_manager(&self) -> CatalogManagerRef { + self.catalog_manager.clone() + } } pub fn check_permission( diff --git a/src/servers/src/prometheus.rs b/src/servers/src/prometheus.rs index c2e42aa048..705325a40e 100644 --- a/src/servers/src/prometheus.rs +++ b/src/servers/src/prometheus.rs @@ -21,6 +21,7 @@ use async_trait::async_trait; use axum::body::BoxBody; use axum::extract::{Path, Query, State}; use axum::{middleware, routing, Form, Json, Router}; +use catalog::CatalogManagerRef; use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; @@ -57,7 +58,7 @@ use crate::error::{ }; use crate::http::authorize::HttpAuth; use crate::http::track_metrics; -use crate::prom_store::{FIELD_COLUMN_NAME, TIMESTAMP_COLUMN_NAME}; +use crate::prom_store::{FIELD_COLUMN_NAME, METRIC_NAME_LABEL, TIMESTAMP_COLUMN_NAME}; use crate::server::Server; pub const PROMETHEUS_API_VERSION: &str = "v1"; @@ -67,6 +68,8 @@ pub type PrometheusHandlerRef = Arc; #[async_trait] pub trait PrometheusHandler { async fn do_query(&self, query: &PromQuery, query_ctx: QueryContextRef) -> Result; + + fn catalog_manager(&self) -> CatalogManagerRef; } /// PromServer represents PrometheusServer which handles the compliance with prometheus HTTP API @@ -546,12 +549,24 @@ pub async fn labels_query( Form(form_params): Form, ) -> Json { let _timer = timer!(crate::metrics::METRIC_HTTP_PROMQL_LABEL_QUERY_ELAPSED); + + let db = ¶ms.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); + let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(db); + let query_ctx = QueryContext::with(catalog, schema); + let mut queries = params.matches.0; if queries.is_empty() { queries = form_params.matches.0; } if queries.is_empty() { - return PrometheusJsonResponse::error("Unsupported", "match[] parameter is required"); + match get_all_column_names(catalog, schema, &handler.catalog_manager()).await { + Ok(labels) => { + return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels)) + } + Err(e) => { + return PrometheusJsonResponse::error(e.status_code().to_string(), e.to_string()) + } + } } let start = params @@ -563,10 +578,6 @@ pub async fn labels_query( .or(form_params.end) .unwrap_or_else(current_time_rfc3339); - let db = ¶ms.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); - let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(db); - let query_ctx = QueryContext::with(catalog, schema); - let mut labels = HashSet::new(); let _ = labels.insert(METRIC_NAME.to_string()); @@ -603,6 +614,27 @@ pub async fn labels_query( PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels)) } +async fn get_all_column_names( + catalog: &str, + schema: &str, + manager: &CatalogManagerRef, +) -> std::result::Result, catalog::error::Error> { + let table_names = manager.table_names(catalog, schema).await?; + + let mut labels = HashSet::new(); + for table_name in table_names { + let Some(table) = manager.table(catalog, schema, &table_name).await? else { continue }; + let schema = table.schema(); + for column in schema.column_schemas() { + labels.insert(column.name.to_string()); + } + } + + let mut labels_vec = labels.into_iter().collect::>(); + labels_vec.sort_unstable(); + Ok(labels_vec) +} + async fn retrieve_series_from_query_result( result: Result, series: &mut Vec>, @@ -781,6 +813,21 @@ pub async fn label_values_query( Query(params): Query, ) -> Json { let _timer = timer!(crate::metrics::METRIC_HTTP_PROMQL_LABEL_VALUE_QUERY_ELAPSED); + + let db = ¶ms.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); + let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(db); + + if label_name == METRIC_NAME_LABEL { + let mut table_names = match handler.catalog_manager().table_names(catalog, schema).await { + Ok(table_names) => table_names, + Err(e) => { + return PrometheusJsonResponse::error(e.status_code().to_string(), e.to_string()); + } + }; + table_names.sort_unstable(); + return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names)); + } + let queries = params.matches.0; if queries.is_empty() { return PrometheusJsonResponse::error("Invalid argument", "match[] parameter is required"); @@ -788,8 +835,6 @@ pub async fn label_values_query( let start = params.start.unwrap_or_else(yesterday_rfc3339); let end = params.end.unwrap_or_else(current_time_rfc3339); - let db = ¶ms.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); - let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(db); let query_ctx = QueryContext::with(catalog, schema); let mut label_values = HashSet::new(); diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index dca65775db..c40e4d793f 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -386,6 +386,10 @@ pub async fn test_prom_http_api(store_type: StorageType) { .unwrap() ); + // labels without match[] param + let res = client.get("/api/v1/labels").send().await; + assert_eq!(res.status(), StatusCode::OK); + // labels query with multiple match[] params let res = client .get("/api/v1/labels?match[]=up&match[]=down") @@ -455,6 +459,14 @@ pub async fn test_prom_http_api(store_type: StorageType) { assert!(prom_resp.error.is_none()); assert!(prom_resp.error_type.is_none()); + // query `__name__` without match[] + let res = client.get("/api/v1/label/__name__/values").send().await; + assert_eq!(res.status(), StatusCode::OK); + let prom_resp = res.json::().await; + assert_eq!(prom_resp.status, "success"); + assert!(prom_resp.error.is_none()); + assert!(prom_resp.error_type.is_none()); + guard.remove_all().await; }