diff --git a/src/frontend/src/instance/promql.rs b/src/frontend/src/instance/promql.rs index f3f276bdd9..0d754167c7 100644 --- a/src/frontend/src/instance/promql.rs +++ b/src/frontend/src/instance/promql.rs @@ -21,9 +21,10 @@ use common_catalog::format_full_table_name; use common_recordbatch::util; use common_telemetry::tracing; use datatypes::prelude::Value; -use promql_parser::label::{Matcher, Matchers}; +use promql_parser::label::{MatchOp, Matcher, Matchers}; use query::promql; use query::promql::planner::PromPlanner; +use servers::prom_store::{DATABASE_LABEL, SCHEMA_LABEL}; use servers::prometheus; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; @@ -114,7 +115,17 @@ impl Instance { end: SystemTime, ctx: &QueryContextRef, ) -> Result> { - let table_schema = ctx.current_schema(); + let table_schema = matchers + .iter() + .find_map(|m| { + if (m.name == SCHEMA_LABEL || m.name == DATABASE_LABEL) && m.op == MatchOp::Equal { + Some(m.value.clone()) + } else { + None + } + }) + .unwrap_or_else(|| ctx.current_schema()); + let table = self .catalog_manager .table(ctx.current_catalog(), &table_schema, &metric, Some(ctx)) diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 05ec55e38e..b7d91c600b 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -1280,6 +1280,13 @@ impl PromPlanner { ) -> Result> { let mut exprs = Vec::with_capacity(label_matchers.matchers.len()); for matcher in label_matchers.matchers { + if matcher.name == SCHEMA_COLUMN_MATCHER + || matcher.name == DB_COLUMN_MATCHER + || matcher.name == FIELD_COLUMN_MATCHER + { + continue; + } + let col = if table_schema .field_with_unqualified_name(&matcher.name) .is_err() diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index b55f48a000..8509fa053f 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -56,7 +56,7 @@ use crate::error::{ TableNotFoundSnafu, UnexpectedResultSnafu, }; use crate::http::header::collect_plan_metrics; -use crate::prom_store::{FIELD_NAME_LABEL, METRIC_NAME_LABEL}; +use crate::prom_store::{DATABASE_LABEL, FIELD_NAME_LABEL, METRIC_NAME_LABEL, SCHEMA_LABEL}; use crate::prometheus_handler::PrometheusHandlerRef; /// For [ValueType::Vector] result type @@ -1034,6 +1034,19 @@ pub async fn label_values_query( let mut field_columns = field_columns.into_iter().collect::>(); field_columns.sort_unstable(); return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns)); + } else if label_name == SCHEMA_LABEL || label_name == DATABASE_LABEL { + let catalog_manager = handler.catalog_manager(); + + match retrieve_schema_names(&query_ctx, catalog_manager, params.matches.0).await { + Ok(schema_names) => { + return PrometheusJsonResponse::success(PrometheusResponse::LabelValues( + schema_names, + )); + } + Err(e) => { + return PrometheusJsonResponse::error(e.status_code(), e.output_msg()); + } + } } let queries = params.matches.0; @@ -1151,6 +1164,44 @@ async fn retrieve_field_names( Ok(field_columns) } +async fn retrieve_schema_names( + query_ctx: &QueryContext, + catalog_manager: CatalogManagerRef, + matches: Vec, +) -> Result> { + let mut schemas = Vec::new(); + let catalog = query_ctx.current_catalog(); + + let candidate_schemas = catalog_manager + .schema_names(catalog, Some(query_ctx)) + .await + .context(CatalogSnafu)?; + + for schema in candidate_schemas { + let mut found = true; + for match_item in &matches { + if let Some(table_name) = retrieve_metric_name_from_promql(match_item) { + let exists = catalog_manager + .table_exists(catalog, &schema, &table_name, Some(query_ctx)) + .await + .context(CatalogSnafu)?; + if !exists { + found = false; + break; + } + } + } + + if found { + schemas.push(schema); + } + } + + schemas.sort_unstable(); + + Ok(schemas) +} + /// Try to parse and extract the name of referenced metric from the promql query. /// /// Returns the metric name if exactly one unique metric is referenced, otherwise None. diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 0eff0cfd02..8b0fb92f43 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -805,6 +805,89 @@ pub async fn test_prom_http_api(store_type: StorageType) { serde_json::from_value::(json!(["host1", "host2"])).unwrap() ); + // special labels + let res = client + .get("/v1/prometheus/api/v1/label/__schema__/values?start=0&end=600") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let body = serde_json::from_str::(&res.text().await).unwrap(); + assert_eq!(body.status, "success"); + assert_eq!( + body.data, + serde_json::from_value::(json!([ + "greptime_private", + "information_schema", + "public" + ])) + .unwrap() + ); + + // special labels + let res = client + .get("/v1/prometheus/api/v1/label/__schema__/values?match[]=demo&start=0&end=600") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let body = serde_json::from_str::(&res.text().await).unwrap(); + assert_eq!(body.status, "success"); + assert_eq!( + body.data, + serde_json::from_value::(json!(["public"])).unwrap() + ); + + // special labels + let res = client + .get("/v1/prometheus/api/v1/label/__database__/values?match[]=demo&start=0&end=600") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let body = serde_json::from_str::(&res.text().await).unwrap(); + assert_eq!(body.status, "success"); + assert_eq!( + body.data, + serde_json::from_value::(json!(["public"])).unwrap() + ); + + // special labels + let res = client + .get("/v1/prometheus/api/v1/label/__database__/values?match[]=multi_labels{idc=\"idc1\", env=\"dev\"}&start=0&end=600") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let body = serde_json::from_str::(&res.text().await).unwrap(); + assert_eq!(body.status, "success"); + assert_eq!( + body.data, + serde_json::from_value::(json!(["public"])).unwrap() + ); + + // match special labels. + let res = client + .get("/v1/prometheus/api/v1/label/host/values?match[]=multi_labels{__schema__=\"public\", idc=\"idc1\", env=\"dev\"}&start=0&end=600") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let body = serde_json::from_str::(&res.text().await).unwrap(); + assert_eq!(body.status, "success"); + assert_eq!( + body.data, + serde_json::from_value::(json!(["host1", "host2"])).unwrap() + ); + + // match special labels. + let res = client + .get("/v1/prometheus/api/v1/label/host/values?match[]=multi_labels{__schema__=\"information_schema\", idc=\"idc1\", env=\"dev\"}&start=0&end=600") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let body = serde_json::from_str::(&res.text().await).unwrap(); + assert_eq!(body.status, "success"); + assert_eq!( + body.data, + serde_json::from_value::(json!([])).unwrap() + ); + // search field name let res = client .get("/v1/prometheus/api/v1/label/__field__/values?match[]=demo")