feat: schema/database support for label_values (#6631)

* feat: initial support for __schema__ in label values

* feat: filter database with matches

* refactor: skip unnecessary check

* fix: resolve schema matcher in label values

* test: add a test case for table not exists

* refactor: add matchop check on db label

* chore: merge main

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Ning Sun
2025-08-04 19:56:10 +08:00
committed by Yingwen
parent b62f219810
commit 4d768b2c31
4 changed files with 155 additions and 3 deletions

View File

@@ -21,9 +21,10 @@ use common_catalog::format_full_table_name;
use common_recordbatch::util;
use common_telemetry::tracing;
use datatypes::prelude::Value;
use promql_parser::label::{Matcher, Matchers};
use promql_parser::label::{MatchOp, Matcher, Matchers};
use query::promql;
use query::promql::planner::PromPlanner;
use servers::prom_store::{DATABASE_LABEL, SCHEMA_LABEL};
use servers::prometheus;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
@@ -114,7 +115,17 @@ impl Instance {
end: SystemTime,
ctx: &QueryContextRef,
) -> Result<Vec<String>> {
let table_schema = ctx.current_schema();
let table_schema = matchers
.iter()
.find_map(|m| {
if (m.name == SCHEMA_LABEL || m.name == DATABASE_LABEL) && m.op == MatchOp::Equal {
Some(m.value.clone())
} else {
None
}
})
.unwrap_or_else(|| ctx.current_schema());
let table = self
.catalog_manager
.table(ctx.current_catalog(), &table_schema, &metric, Some(ctx))

View File

@@ -1255,6 +1255,13 @@ impl PromPlanner {
) -> Result<Vec<DfExpr>> {
let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
for matcher in label_matchers.matchers {
if matcher.name == SCHEMA_COLUMN_MATCHER
|| matcher.name == DB_COLUMN_MATCHER
|| matcher.name == FIELD_COLUMN_MATCHER
{
continue;
}
let col = if table_schema
.field_with_unqualified_name(&matcher.name)
.is_err()

View File

@@ -56,7 +56,7 @@ use crate::error::{
TableNotFoundSnafu, UnexpectedResultSnafu,
};
use crate::http::header::collect_plan_metrics;
use crate::prom_store::{FIELD_NAME_LABEL, METRIC_NAME_LABEL};
use crate::prom_store::{DATABASE_LABEL, FIELD_NAME_LABEL, METRIC_NAME_LABEL, SCHEMA_LABEL};
use crate::prometheus_handler::PrometheusHandlerRef;
/// For [ValueType::Vector] result type
@@ -1034,6 +1034,19 @@ pub async fn label_values_query(
let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
field_columns.sort_unstable();
return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
} else if label_name == SCHEMA_LABEL || label_name == DATABASE_LABEL {
let catalog_manager = handler.catalog_manager();
match retrieve_schema_names(&query_ctx, catalog_manager, params.matches.0).await {
Ok(schema_names) => {
return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(
schema_names,
));
}
Err(e) => {
return PrometheusJsonResponse::error(e.status_code(), e.output_msg());
}
}
}
let queries = params.matches.0;
@@ -1151,6 +1164,44 @@ async fn retrieve_field_names(
Ok(field_columns)
}
async fn retrieve_schema_names(
query_ctx: &QueryContext,
catalog_manager: CatalogManagerRef,
matches: Vec<String>,
) -> Result<Vec<String>> {
let mut schemas = Vec::new();
let catalog = query_ctx.current_catalog();
let candidate_schemas = catalog_manager
.schema_names(catalog, Some(query_ctx))
.await
.context(CatalogSnafu)?;
for schema in candidate_schemas {
let mut found = true;
for match_item in &matches {
if let Some(table_name) = retrieve_metric_name_from_promql(match_item) {
let exists = catalog_manager
.table_exists(catalog, &schema, &table_name, Some(query_ctx))
.await
.context(CatalogSnafu)?;
if !exists {
found = false;
break;
}
}
}
if found {
schemas.push(schema);
}
}
schemas.sort_unstable();
Ok(schemas)
}
/// Try to parse and extract the name of referenced metric from the promql query.
///
/// Returns the metric name if exactly one unique metric is referenced, otherwise None.

View File

@@ -785,6 +785,89 @@ pub async fn test_prom_http_api(store_type: StorageType) {
serde_json::from_value::<PrometheusResponse>(json!(["host1", "host2"])).unwrap()
);
// special labels
let res = client
.get("/v1/prometheus/api/v1/label/__schema__/values?start=0&end=600")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!([
"greptime_private",
"information_schema",
"public"
]))
.unwrap()
);
// special labels
let res = client
.get("/v1/prometheus/api/v1/label/__schema__/values?match[]=demo&start=0&end=600")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["public"])).unwrap()
);
// special labels
let res = client
.get("/v1/prometheus/api/v1/label/__database__/values?match[]=demo&start=0&end=600")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["public"])).unwrap()
);
// special labels
let res = client
.get("/v1/prometheus/api/v1/label/__database__/values?match[]=multi_labels{idc=\"idc1\", env=\"dev\"}&start=0&end=600")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["public"])).unwrap()
);
// match special labels.
let res = client
.get("/v1/prometheus/api/v1/label/host/values?match[]=multi_labels{__schema__=\"public\", idc=\"idc1\", env=\"dev\"}&start=0&end=600")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["host1", "host2"])).unwrap()
);
// match special labels.
let res = client
.get("/v1/prometheus/api/v1/label/host/values?match[]=multi_labels{__schema__=\"information_schema\", idc=\"idc1\", env=\"dev\"}&start=0&end=600")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!([])).unwrap()
);
// search field name
let res = client
.get("/v1/prometheus/api/v1/label/__field__/values?match[]=demo")