diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs
index 84574e6b3f..b02b9b8bd7 100644
--- a/src/servers/src/http/prometheus.rs
+++ b/src/servers/src/http/prometheus.rs
@@ -18,7 +18,6 @@ use std::collections::{HashMap, HashSet};
 use axum::extract::{Path, Query, State};
 use axum::{Extension, Form};
 use catalog::CatalogManagerRef;
-use common_catalog::consts::DEFAULT_SCHEMA_NAME;
 use common_catalog::parse_catalog_and_schema_from_db_string;
 use common_error::ext::ErrorExt;
 use common_error::status_code::StatusCode;
@@ -41,7 +40,7 @@ use schemars::JsonSchema;
 use serde::de::{self, MapAccess, Visitor};
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
-use session::context::QueryContextRef;
+use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
 use snafu::{Location, ResultExt};
 
 pub use super::prometheus_resp::PrometheusJsonResponse;
@@ -166,7 +165,7 @@ pub struct InstantQuery {
 pub async fn instant_query(
     State(handler): State<PrometheusHandlerRef>,
     Query(params): Query<InstantQuery>,
-    Extension(query_ctx): Extension<QueryContextRef>,
+    Extension(mut query_ctx): Extension<QueryContextRef>,
     Form(form_params): Form<InstantQuery>,
 ) -> PrometheusJsonResponse {
     // Extract time from query string, or use current server time if not specified.
@@ -185,6 +184,12 @@ pub async fn instant_query(
             .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
     };
 
+    // update catalog and schema in query context if necessary
+    if let Some(db) = &params.db {
+        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
+        query_ctx = try_update_catalog_schema(query_ctx, &catalog, &schema);
+    }
+
     let result = handler.do_query(&prom_query, query_ctx).await;
     let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
         Ok((metric_name, result_type)) => (metric_name.unwrap_or_default(), result_type),
@@ -214,7 +219,7 @@ pub struct RangeQuery {
 pub async fn range_query(
     State(handler): State<PrometheusHandlerRef>,
     Query(params): Query<RangeQuery>,
-    Extension(query_ctx): Extension<QueryContextRef>,
+    Extension(mut query_ctx): Extension<QueryContextRef>,
     Form(form_params): Form<RangeQuery>,
 ) -> PrometheusJsonResponse {
     let prom_query = PromQuery {
@@ -228,6 +233,12 @@ pub async fn range_query(
             .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
     };
 
+    // update catalog and schema in query context if necessary
+    if let Some(db) = &params.db {
+        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
+        query_ctx = try_update_catalog_schema(query_ctx, &catalog, &schema);
+    }
+
     let result = handler.do_query(&prom_query, query_ctx).await;
     let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
         Err(err) => {
@@ -294,8 +305,8 @@ pub async fn labels_query(
     Extension(query_ctx): Extension<QueryContextRef>,
     Form(form_params): Form<LabelsQuery>,
 ) -> PrometheusJsonResponse {
-    let db = &params.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string());
-    let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
+    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
+    let query_ctx = try_update_catalog_schema(query_ctx, &catalog, &schema);
 
     let mut queries = params.matches.0;
     if queries.is_empty() {
@@ -534,6 +545,35 @@ pub(crate) fn retrieve_metric_name_and_result_type(
     Ok((metric_name, result_type))
 }
 
+/// Tries to get catalog and schema from the optional `db` param, falling back
+/// to the values in [QueryContext] when it is not present.
+pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
+    if let Some(db) = db {
+        parse_catalog_and_schema_from_db_string(db)
+    } else {
+        (
+            ctx.current_catalog().to_string(),
+            ctx.current_schema().to_string(),
+        )
+    }
+}
+
+/// Update catalog and schema in [QueryContext] if necessary.
+pub(crate) fn try_update_catalog_schema(
+    ctx: QueryContextRef,
+    catalog: &str,
+    schema: &str,
+) -> QueryContextRef {
+    if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
+        QueryContextBuilder::from_existing(&ctx)
+            .current_catalog(catalog.to_string())
+            .current_schema(schema.to_string())
+            .build()
+    } else {
+        ctx
+    }
+}
+
 fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
     match expr {
         PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => promql_expr_to_metric_name(expr),
@@ -580,8 +620,8 @@ pub async fn label_values_query(
     Extension(query_ctx): Extension<QueryContextRef>,
     Query(params): Query<LabelValueQuery>,
 ) -> PrometheusJsonResponse {
-    let db = &params.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string());
-    let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
+    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
+    let query_ctx = try_update_catalog_schema(query_ctx, &catalog, &schema);
 
     if label_name == METRIC_NAME_LABEL {
         let mut table_names = match handler
@@ -731,7 +771,7 @@ pub struct SeriesQuery {
 pub async fn series_query(
     State(handler): State<PrometheusHandlerRef>,
     Query(params): Query<SeriesQuery>,
-    Extension(query_ctx): Extension<QueryContextRef>,
+    Extension(mut query_ctx): Extension<QueryContextRef>,
     Form(form_params): Form<SeriesQuery>,
 ) -> PrometheusJsonResponse {
     let mut queries: Vec<String> = params.matches.0;
@@ -754,6 +794,12 @@ pub async fn series_query(
         .or(form_params.lookback)
         .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
 
+    // update catalog and schema in query context if necessary
+    if let Some(db) = &params.db {
+        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
+        query_ctx = try_update_catalog_schema(query_ctx, &catalog, &schema);
+    }
+
     let mut series = Vec::new();
     let mut merge_map = HashMap::new();
     for query in queries {
diff --git a/src/session/src/context.rs b/src/session/src/context.rs
index d2c060919e..948b6144a6 100644
--- a/src/session/src/context.rs
+++ b/src/session/src/context.rs
@@ -223,6 +223,18 @@ impl QueryContextBuilder {
             .insert(key, value);
         self
     }
+
+    pub fn from_existing(context: &QueryContext) -> QueryContextBuilder {
+        QueryContextBuilder {
+            current_catalog: Some(context.current_catalog.clone()),
+            current_schema: Some(context.current_schema.clone()),
+            current_user: Some(context.current_user.load().clone().into()),
+            timezone: Some(context.timezone.load().clone().into()),
+            sql_dialect: Some(context.sql_dialect.clone()),
+            extension: Some(context.extension.clone()),
+            configuration_parameter: Some(context.configuration_parameter.clone()),
+        }
+    }
 }
 
 #[derive(Debug)]
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index bbeafc18f7..7a3a1a43c5 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -23,7 +23,7 @@ use serde_json::json;
 use servers::http::error_result::ErrorResponse;
 use servers::http::greptime_result_v1::GreptimedbV1Response;
 use servers::http::handler::HealthResponse;
-use servers::http::header::GREPTIME_TIMEZONE_HEADER_NAME;
+use servers::http::header::{GREPTIME_DB_HEADER_NAME, GREPTIME_TIMEZONE_HEADER_NAME};
 use servers::http::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response};
 use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse};
 use servers::http::test_helpers::TestClient;
@@ -543,6 +543,34 @@ pub async fn test_prom_http_api(store_type: StorageType) {
         serde_json::from_value::<PrometheusResponse>(json!(["host1", "host2"])).unwrap()
     );
 
+    // querying a nonexistent database should return nothing
+    let res = client
+        .get("/v1/prometheus/api/v1/label/host/values?match[]=demo&start=0&end=600")
+        .header(GREPTIME_DB_HEADER_NAME.clone(), "nonexistent")
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+    let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
+    assert_eq!(body.status, "success");
+    assert_eq!(
+        body.data,
+        serde_json::from_value::<PrometheusResponse>(json!([])).unwrap()
+    );
+
+    // the db header is overridden by the `db` query param
+    let res = client
+        .get("/v1/prometheus/api/v1/label/host/values?match[]=demo&start=0&end=600&db=public")
+        .header(GREPTIME_DB_HEADER_NAME.clone(), "nonexistent")
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+    let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
+    assert_eq!(body.status, "success");
+    assert_eq!(
+        body.data,
+        serde_json::from_value::<PrometheusResponse>(json!(["host1", "host2"])).unwrap()
+    );
+
     // multiple match[]
     let res = client
         .get("/v1/prometheus/api/v1/label/instance/values?match[]=up&match[]=system_metrics")
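
For review, a minimal sketch of how the two new helpers are expected to behave, written as an in-module unit test rather than as part of this diff. It assumes QueryContextBuilder implements Default, that build() returns a QueryContextRef (an Arc<QueryContext>) as the diff implies, and that a `db` value without a `-` separator resolves to the default `greptime` catalog; the schema names used here are placeholders.

// Sketch only; assumes QueryContextBuilder: Default and build() -> QueryContextRef.
#[cfg(test)]
mod catalog_schema_tests {
    use std::sync::Arc;

    use session::context::QueryContextBuilder;

    use super::{get_catalog_schema, try_update_catalog_schema};

    #[test]
    fn reuses_context_when_catalog_and_schema_match() {
        let ctx = QueryContextBuilder::default()
            .current_catalog("greptime".to_string())
            .current_schema("public".to_string())
            .build();

        // No change requested: the same Arc comes back untouched.
        let same = try_update_catalog_schema(ctx.clone(), "greptime", "public");
        assert!(Arc::ptr_eq(&ctx, &same));

        // A different schema forces a rebuilt context carrying the new values.
        let other = try_update_catalog_schema(ctx, "greptime", "mydb");
        assert_eq!(other.current_catalog(), "greptime");
        assert_eq!(other.current_schema(), "mydb");
    }

    #[test]
    fn db_param_takes_precedence_over_context() {
        let ctx = QueryContextBuilder::default()
            .current_catalog("greptime".to_string())
            .current_schema("from_header".to_string())
            .build();

        // `db=public` in the query string wins over whatever the context holds.
        let (catalog, schema) = get_catalog_schema(&Some("public".to_string()), &ctx);
        assert_eq!(catalog, "greptime");
        assert_eq!(schema, "public");

        // Without the param, the context values are used as-is.
        let (catalog, schema) = get_catalog_schema(&None, &ctx);
        assert_eq!(catalog, "greptime");
        assert_eq!(schema, "from_header");
    }
}

The Arc::ptr_eq check captures the fast path the diff aims for: when the requested catalog and schema already match the context, no new QueryContext is allocated.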