diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index ad4ba5cae0..7751f8590e 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -28,7 +28,6 @@ use servers::metrics_handler::MetricsHandler; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; use servers::opentsdb::OpentsdbServer; use servers::postgres::PostgresServer; -use servers::prometheus::PrometheusServer; use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor; use servers::server::Server; @@ -191,17 +190,6 @@ impl Services { result.push((Box::new(http_server), http_addr)); } - if let Some(prometheus_options) = &opts.prometheus_options { - let prom_addr = parse_addr(&prometheus_options.addr)?; - - let mut prom_server = PrometheusServer::create_server(instance); - if let Some(user_provider) = user_provider { - prom_server.set_user_provider(user_provider); - } - - result.push((prom_server, prom_addr)); - }; - Ok(result .into_iter() .map(|(server, addr)| (server.name().to_string(), (server, addr))) diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index c2457c492c..a2773b77bc 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -74,6 +74,10 @@ use crate::metrics::{ METRIC_METHOD_LABEL, METRIC_PATH_LABEL, }; use crate::metrics_handler::MetricsHandler; +use crate::prometheus::{ + instant_query, label_values_query, labels_query, range_query, series_query, + PrometheusHandlerRef, +}; use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; use crate::query_handler::sql::ServerSqlQueryHandlerRef; use crate::query_handler::{ @@ -128,6 +132,7 @@ pub struct HttpServer { influxdb_handler: Option, opentsdb_handler: Option, prom_handler: Option, + prometheus_handler: Option, otlp_handler: Option, script_handler: Option, shutdown_tx: Mutex>>, @@ -409,6 +414,7 @@ impl HttpServerBuilder { opentsdb_handler: None, influxdb_handler: None, prom_handler: None, + prometheus_handler: None, otlp_handler: None, user_provider: None, script_handler: None, @@ -450,6 +456,11 @@ impl HttpServerBuilder { self } + pub fn with_prometheus_handler(&mut self, handler: PrometheusHandlerRef) -> &mut Self { + let _ = self.inner.prometheus_handler.get_or_insert(handler); + self + } + pub fn with_otlp_handler(&mut self, handler: OpenTelemetryProtocolHandlerRef) -> &mut Self { let _ = self.inner.otlp_handler.get_or_insert(handler); self @@ -537,6 +548,13 @@ impl HttpServer { ); } + if let Some(prometheus_handler) = self.prometheus_handler.clone() { + router = router.nest( + &format!("/{HTTP_API_VERSION}/prometheus"), + self.route_prometheus(prometheus_handler), + ); + } + if let Some(otlp_handler) = self.otlp_handler.clone() { router = router.nest( &format!("/{HTTP_API_VERSION}/otlp"), @@ -648,6 +666,19 @@ impl HttpServer { .with_state(api_state) } + fn route_prometheus(&self, prometheus_handler: PrometheusHandlerRef) -> Router { + Router::new() + .route("/query", routing::post(instant_query).get(instant_query)) + .route("/query_range", routing::post(range_query).get(range_query)) + .route("/labels", routing::post(labels_query).get(labels_query)) + .route("/series", routing::post(series_query).get(series_query)) + .route( + "/label/:label_name/values", + routing::get(label_values_query), + ) + .with_state(prometheus_handler) + } + fn route_prom(&self, prom_handler: PromStoreProtocolHandlerRef) -> Router { Router::new() .route("/write", routing::post(prom_store::remote_write)) diff --git a/src/servers/src/prometheus.rs b/src/servers/src/prometheus.rs index 7482301107..e7261a5615 100644 --- a/src/servers/src/prometheus.rs +++ b/src/servers/src/prometheus.rs @@ -14,14 +14,11 @@ //! prom supply the prometheus HTTP API Server compliance use std::collections::{BTreeMap, HashMap, HashSet}; -use std::net::SocketAddr; use std::sync::Arc; -use ::auth::UserProviderRef; use async_trait::async_trait; -use axum::body::BoxBody; use axum::extract::{Path, Query, State}; -use axum::{middleware, routing, Form, Json, Router}; +use axum::{Form, Json}; use catalog::CatalogManagerRef; use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_catalog::parse_catalog_and_schema_from_db_string; @@ -29,12 +26,11 @@ use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_query::Output; use common_recordbatch::RecordBatches; -use common_telemetry::{info, timer}; +use common_telemetry::timer; use common_time::util::{current_time_rfc3339, yesterday_rfc3339}; use datatypes::prelude::ConcreteDataType; use datatypes::scalars::ScalarVector; use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector}; -use futures::FutureExt; use promql_parser::label::METRIC_NAME; use promql_parser::parser::{ AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr, @@ -45,22 +41,12 @@ use schemars::JsonSchema; use serde::de::{self, MapAccess, Visitor}; use serde::{Deserialize, Serialize}; use session::context::{QueryContext, QueryContextRef}; -use snafu::{ensure, Location, OptionExt, ResultExt}; -use tokio::sync::oneshot::Sender; -use tokio::sync::{oneshot, Mutex}; -use tower::ServiceBuilder; -use tower_http::auth::AsyncRequireAuthorizationLayer; -use tower_http::compression::CompressionLayer; -use tower_http::trace::TraceLayer; +use snafu::{Location, OptionExt, ResultExt}; use crate::error::{ - AlreadyStartedSnafu, CollectRecordbatchSnafu, Error, InternalSnafu, InvalidQuerySnafu, Result, - StartHttpSnafu, UnexpectedResultSnafu, + CollectRecordbatchSnafu, Error, InternalSnafu, InvalidQuerySnafu, Result, UnexpectedResultSnafu, }; -use crate::http::authorize::HttpAuth; -use crate::http::track_metrics; use crate::prom_store::{FIELD_COLUMN_NAME, METRIC_NAME_LABEL, TIMESTAMP_COLUMN_NAME}; -use crate::server::Server; pub const PROMETHEUS_API_VERSION: &str = "v1"; @@ -73,107 +59,6 @@ pub trait PrometheusHandler { fn catalog_manager(&self) -> CatalogManagerRef; } -/// PromServer represents PrometheusServer which handles the compliance with prometheus HTTP API -pub struct PrometheusServer { - query_handler: PrometheusHandlerRef, - shutdown_tx: Mutex>>, - user_provider: Option, -} - -impl PrometheusServer { - pub fn create_server(query_handler: PrometheusHandlerRef) -> Box { - Box::new(PrometheusServer { - query_handler, - shutdown_tx: Mutex::new(None), - user_provider: None, - }) - } - - pub fn set_user_provider(&mut self, user_provider: UserProviderRef) { - debug_assert!(self.user_provider.is_none()); - self.user_provider = Some(user_provider); - } - - pub fn make_app(&self) -> Router { - // TODO(ruihang): implement format_query, series, values, query_exemplars and targets methods - - let router = Router::new() - .route("/query", routing::post(instant_query).get(instant_query)) - .route("/query_range", routing::post(range_query).get(range_query)) - .route("/labels", routing::post(labels_query).get(labels_query)) - .route("/series", routing::post(series_query).get(series_query)) - .route( - "/label/:label_name/values", - routing::get(label_values_query), - ) - .with_state(self.query_handler.clone()); - - Router::new() - .nest(&format!("/api/{PROMETHEUS_API_VERSION}"), router) - // middlewares - .layer( - ServiceBuilder::new() - .layer(TraceLayer::new_for_http()) - .layer(CompressionLayer::new()) - // custom layer - .layer(AsyncRequireAuthorizationLayer::new( - HttpAuth::::new(self.user_provider.clone()), - )), - ) - // We need to register the metrics layer again since start a new http server - // for the PromServer. - .route_layer(middleware::from_fn(track_metrics)) - } -} - -pub const PROMETHEUS_SERVER: &str = "PROMETHEUS_SERVER"; - -#[async_trait] -impl Server for PrometheusServer { - async fn shutdown(&self) -> Result<()> { - let mut shutdown_tx = self.shutdown_tx.lock().await; - if let Some(tx) = shutdown_tx.take() { - if tx.send(()).is_err() { - info!("Receiver dropped, the Prometheus API server has already existed"); - } - } - info!("Shutdown Prometheus API server"); - - Ok(()) - } - - async fn start(&self, listening: SocketAddr) -> Result { - let (tx, rx) = oneshot::channel(); - let server = { - let mut shutdown_tx = self.shutdown_tx.lock().await; - ensure!( - shutdown_tx.is_none(), - AlreadyStartedSnafu { - server: "Prometheus" - } - ); - - let app = self.make_app(); - let server = axum::Server::bind(&listening).serve(app.into_make_service()); - - *shutdown_tx = Some(tx); - - server - }; - let listening = server.local_addr(); - info!("Prometheus API server is bound to {}", listening); - - let graceful = server.with_graceful_shutdown(rx.map(drop)); - graceful.await.context(StartHttpSnafu)?; - - Ok(listening) - } - - fn name(&self) -> &str { - PROMETHEUS_SERVER - } -} - #[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)] pub struct PromSeries { pub metric: HashMap, diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index ddd98b9d4a..fc802873d7 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -54,7 +54,6 @@ use servers::http::{HttpOptions, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; use servers::postgres::PostgresServer; -use servers::prometheus::PrometheusServer; use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor; use servers::server::Server; @@ -544,11 +543,10 @@ pub async fn setup_test_prom_app_with_frontend( .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref.clone())) .with_script_handler(frontend_ref.clone()) .with_prom_handler(frontend_ref.clone()) + .with_prometheus_handler(frontend_ref.clone()) .with_greptime_config_options(opts.to_toml_string()) .build(); - let prom_server = PrometheusServer::create_server(frontend_ref); let app = http_server.build(http_server.make_app()); - let app = app.merge(prom_server.make_app()); (app, guard) } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index d8b63bf8a5..207b1d2994 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -343,10 +343,13 @@ pub async fn test_prom_http_api(store_type: StorageType) { let client = TestClient::new(app); // instant query - let res = client.get("/api/v1/query?query=up&time=1").send().await; + let res = client + .get("/v1/prometheus/query?query=up&time=1") + .send() + .await; assert_eq!(res.status(), StatusCode::OK); let res = client - .post("/api/v1/query?query=up&time=1") + .post("/v1/prometheus/query?query=up&time=1") .header("Content-Type", "application/x-www-form-urlencoded") .send() .await; @@ -354,28 +357,31 @@ pub async fn test_prom_http_api(store_type: StorageType) { // range query let res = client - .get("/api/v1/query_range?query=up&start=1&end=100&step=5") + .get("/v1/prometheus/query_range?query=up&start=1&end=100&step=5") .send() .await; assert_eq!(res.status(), StatusCode::OK); let res = client - .post("/api/v1/query_range?query=up&start=1&end=100&step=5") + .post("/v1/prometheus/query_range?query=up&start=1&end=100&step=5") .header("Content-Type", "application/x-www-form-urlencoded") .send() .await; assert_eq!(res.status(), StatusCode::OK); // labels - let res = client.get("/api/v1/labels?match[]=demo").send().await; + let res = client + .get("/v1/prometheus/labels?match[]=demo") + .send() + .await; assert_eq!(res.status(), StatusCode::OK); let res = client - .post("/api/v1/labels?match[]=up") + .post("/v1/prometheus/labels?match[]=up") .header("Content-Type", "application/x-www-form-urlencoded") .send() .await; assert_eq!(res.status(), StatusCode::OK); let res = client - .get("/api/v1/labels?match[]=demo&start=0") + .get("/v1/prometheus/labels?match[]=demo&start=0") .send() .await; assert_eq!(res.status(), StatusCode::OK); @@ -390,17 +396,17 @@ pub async fn test_prom_http_api(store_type: StorageType) { ); // labels without match[] param - let res = client.get("/api/v1/labels").send().await; + let res = client.get("/v1/prometheus/labels").send().await; assert_eq!(res.status(), StatusCode::OK); // labels query with multiple match[] params let res = client - .get("/api/v1/labels?match[]=up&match[]=down") + .get("/v1/prometheus/labels?match[]=up&match[]=down") .send() .await; assert_eq!(res.status(), StatusCode::OK); let res = client - .post("/api/v1/labels?match[]=up&match[]=down") + .post("/v1/prometheus/labels?match[]=up&match[]=down") .header("Content-Type", "application/x-www-form-urlencoded") .send() .await; @@ -408,7 +414,7 @@ pub async fn test_prom_http_api(store_type: StorageType) { // series let res = client - .get("/api/v1/series?match[]=demo&start=0&end=0") + .get("/v1/prometheus/series?match[]=demo&start=0&end=0") .send() .await; assert_eq!(res.status(), StatusCode::OK); @@ -432,7 +438,7 @@ pub async fn test_prom_http_api(store_type: StorageType) { assert_eq!(actual, expected); let res = client - .post("/api/v1/series?match[]=up&match[]=down") + .post("/v1/prometheus/series?match[]=up&match[]=down") .header("Content-Type", "application/x-www-form-urlencoded") .send() .await; @@ -440,7 +446,10 @@ pub async fn test_prom_http_api(store_type: StorageType) { // label values // should return error if there is no match[] - let res = client.get("/api/v1/label/instance/values").send().await; + let res = client + .get("/v1/prometheus/label/instance/values") + .send() + .await; assert_eq!(res.status(), StatusCode::OK); let prom_resp = res.json::().await; assert_eq!(prom_resp.status, "error"); @@ -449,7 +458,7 @@ pub async fn test_prom_http_api(store_type: StorageType) { // single match[] let res = client - .get("/api/v1/label/host/values?match[]=demo&start=0&end=600") + .get("/v1/prometheus/label/host/values?match[]=demo&start=0&end=600") .send() .await; assert_eq!(res.status(), StatusCode::OK); @@ -462,7 +471,7 @@ pub async fn test_prom_http_api(store_type: StorageType) { // multiple match[] let res = client - .get("/api/v1/label/instance/values?match[]=up&match[]=system_metrics") + .get("/v1/prometheus/label/instance/values?match[]=up&match[]=system_metrics") .send() .await; assert_eq!(res.status(), StatusCode::OK); @@ -472,7 +481,10 @@ pub async fn test_prom_http_api(store_type: StorageType) { assert!(prom_resp.error_type.is_none()); // query `__name__` without match[] - let res = client.get("/api/v1/label/__name__/values").send().await; + let res = client + .get("/v1/prometheus/label/__name__/values") + .send() + .await; assert_eq!(res.status(), StatusCode::OK); let prom_resp = res.json::().await; assert_eq!(prom_resp.status, "success");