refactor!: move prometheus routes to default http server (#2005)

* move prometheus routes to default http server

Signed-off-by: sh2 <shawnhxh@outlook.com>

* fix ci test and remove the server logic of prometheus

* remove unused import and prometheus relevant code

* fix ci: rustfmt and test

* fix ci: silly fmt

* fix ci: silly silly fmt

* change `/prom_store` back to `/prometheus`

* remove unused variable

---------

Signed-off-by: sh2 <shawnhxh@outlook.com>
This commit is contained in:
sh2
2023-08-16 11:21:14 +08:00
committed by GitHub
parent 0967678a51
commit 202540823f
5 changed files with 64 additions and 150 deletions

View File

@@ -28,7 +28,6 @@ use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
use servers::opentsdb::OpentsdbServer;
use servers::postgres::PostgresServer;
use servers::prometheus::PrometheusServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor;
use servers::server::Server;
@@ -191,17 +190,6 @@ impl Services {
result.push((Box::new(http_server), http_addr));
}
if let Some(prometheus_options) = &opts.prometheus_options {
let prom_addr = parse_addr(&prometheus_options.addr)?;
let mut prom_server = PrometheusServer::create_server(instance);
if let Some(user_provider) = user_provider {
prom_server.set_user_provider(user_provider);
}
result.push((prom_server, prom_addr));
};
Ok(result
.into_iter()
.map(|(server, addr)| (server.name().to_string(), (server, addr)))

View File

@@ -74,6 +74,10 @@ use crate::metrics::{
METRIC_METHOD_LABEL, METRIC_PATH_LABEL,
};
use crate::metrics_handler::MetricsHandler;
use crate::prometheus::{
instant_query, label_values_query, labels_query, range_query, series_query,
PrometheusHandlerRef,
};
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
use crate::query_handler::sql::ServerSqlQueryHandlerRef;
use crate::query_handler::{
@@ -128,6 +132,7 @@ pub struct HttpServer {
influxdb_handler: Option<InfluxdbLineProtocolHandlerRef>,
opentsdb_handler: Option<OpentsdbProtocolHandlerRef>,
prom_handler: Option<PromStoreProtocolHandlerRef>,
prometheus_handler: Option<PrometheusHandlerRef>,
otlp_handler: Option<OpenTelemetryProtocolHandlerRef>,
script_handler: Option<ScriptHandlerRef>,
shutdown_tx: Mutex<Option<Sender<()>>>,
@@ -409,6 +414,7 @@ impl HttpServerBuilder {
opentsdb_handler: None,
influxdb_handler: None,
prom_handler: None,
prometheus_handler: None,
otlp_handler: None,
user_provider: None,
script_handler: None,
@@ -450,6 +456,11 @@ impl HttpServerBuilder {
self
}
pub fn with_prometheus_handler(&mut self, handler: PrometheusHandlerRef) -> &mut Self {
let _ = self.inner.prometheus_handler.get_or_insert(handler);
self
}
pub fn with_otlp_handler(&mut self, handler: OpenTelemetryProtocolHandlerRef) -> &mut Self {
let _ = self.inner.otlp_handler.get_or_insert(handler);
self
@@ -537,6 +548,13 @@ impl HttpServer {
);
}
if let Some(prometheus_handler) = self.prometheus_handler.clone() {
router = router.nest(
&format!("/{HTTP_API_VERSION}/prometheus"),
self.route_prometheus(prometheus_handler),
);
}
if let Some(otlp_handler) = self.otlp_handler.clone() {
router = router.nest(
&format!("/{HTTP_API_VERSION}/otlp"),
@@ -648,6 +666,19 @@ impl HttpServer {
.with_state(api_state)
}
fn route_prometheus<S>(&self, prometheus_handler: PrometheusHandlerRef) -> Router<S> {
Router::new()
.route("/query", routing::post(instant_query).get(instant_query))
.route("/query_range", routing::post(range_query).get(range_query))
.route("/labels", routing::post(labels_query).get(labels_query))
.route("/series", routing::post(series_query).get(series_query))
.route(
"/label/:label_name/values",
routing::get(label_values_query),
)
.with_state(prometheus_handler)
}
fn route_prom<S>(&self, prom_handler: PromStoreProtocolHandlerRef) -> Router<S> {
Router::new()
.route("/write", routing::post(prom_store::remote_write))

View File

@@ -14,14 +14,11 @@
//! prom supply the prometheus HTTP API Server compliance
use std::collections::{BTreeMap, HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::Arc;
use ::auth::UserProviderRef;
use async_trait::async_trait;
use axum::body::BoxBody;
use axum::extract::{Path, Query, State};
use axum::{middleware, routing, Form, Json, Router};
use axum::{Form, Json};
use catalog::CatalogManagerRef;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_catalog::parse_catalog_and_schema_from_db_string;
@@ -29,12 +26,11 @@ use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::{info, timer};
use common_telemetry::timer;
use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
use futures::FutureExt;
use promql_parser::label::METRIC_NAME;
use promql_parser::parser::{
AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr,
@@ -45,22 +41,12 @@ use schemars::JsonSchema;
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Serialize};
use session::context::{QueryContext, QueryContextRef};
use snafu::{ensure, Location, OptionExt, ResultExt};
use tokio::sync::oneshot::Sender;
use tokio::sync::{oneshot, Mutex};
use tower::ServiceBuilder;
use tower_http::auth::AsyncRequireAuthorizationLayer;
use tower_http::compression::CompressionLayer;
use tower_http::trace::TraceLayer;
use snafu::{Location, OptionExt, ResultExt};
use crate::error::{
AlreadyStartedSnafu, CollectRecordbatchSnafu, Error, InternalSnafu, InvalidQuerySnafu, Result,
StartHttpSnafu, UnexpectedResultSnafu,
CollectRecordbatchSnafu, Error, InternalSnafu, InvalidQuerySnafu, Result, UnexpectedResultSnafu,
};
use crate::http::authorize::HttpAuth;
use crate::http::track_metrics;
use crate::prom_store::{FIELD_COLUMN_NAME, METRIC_NAME_LABEL, TIMESTAMP_COLUMN_NAME};
use crate::server::Server;
pub const PROMETHEUS_API_VERSION: &str = "v1";
@@ -73,107 +59,6 @@ pub trait PrometheusHandler {
fn catalog_manager(&self) -> CatalogManagerRef;
}
/// PromServer represents PrometheusServer which handles the compliance with prometheus HTTP API
pub struct PrometheusServer {
query_handler: PrometheusHandlerRef,
shutdown_tx: Mutex<Option<Sender<()>>>,
user_provider: Option<UserProviderRef>,
}
impl PrometheusServer {
pub fn create_server(query_handler: PrometheusHandlerRef) -> Box<Self> {
Box::new(PrometheusServer {
query_handler,
shutdown_tx: Mutex::new(None),
user_provider: None,
})
}
pub fn set_user_provider(&mut self, user_provider: UserProviderRef) {
debug_assert!(self.user_provider.is_none());
self.user_provider = Some(user_provider);
}
pub fn make_app(&self) -> Router {
// TODO(ruihang): implement format_query, series, values, query_exemplars and targets methods
let router = Router::new()
.route("/query", routing::post(instant_query).get(instant_query))
.route("/query_range", routing::post(range_query).get(range_query))
.route("/labels", routing::post(labels_query).get(labels_query))
.route("/series", routing::post(series_query).get(series_query))
.route(
"/label/:label_name/values",
routing::get(label_values_query),
)
.with_state(self.query_handler.clone());
Router::new()
.nest(&format!("/api/{PROMETHEUS_API_VERSION}"), router)
// middlewares
.layer(
ServiceBuilder::new()
.layer(TraceLayer::new_for_http())
.layer(CompressionLayer::new())
// custom layer
.layer(AsyncRequireAuthorizationLayer::new(
HttpAuth::<BoxBody>::new(self.user_provider.clone()),
)),
)
// We need to register the metrics layer again since start a new http server
// for the PromServer.
.route_layer(middleware::from_fn(track_metrics))
}
}
pub const PROMETHEUS_SERVER: &str = "PROMETHEUS_SERVER";
#[async_trait]
impl Server for PrometheusServer {
async fn shutdown(&self) -> Result<()> {
let mut shutdown_tx = self.shutdown_tx.lock().await;
if let Some(tx) = shutdown_tx.take() {
if tx.send(()).is_err() {
info!("Receiver dropped, the Prometheus API server has already existed");
}
}
info!("Shutdown Prometheus API server");
Ok(())
}
async fn start(&self, listening: SocketAddr) -> Result<SocketAddr> {
let (tx, rx) = oneshot::channel();
let server = {
let mut shutdown_tx = self.shutdown_tx.lock().await;
ensure!(
shutdown_tx.is_none(),
AlreadyStartedSnafu {
server: "Prometheus"
}
);
let app = self.make_app();
let server = axum::Server::bind(&listening).serve(app.into_make_service());
*shutdown_tx = Some(tx);
server
};
let listening = server.local_addr();
info!("Prometheus API server is bound to {}", listening);
let graceful = server.with_graceful_shutdown(rx.map(drop));
graceful.await.context(StartHttpSnafu)?;
Ok(listening)
}
fn name(&self) -> &str {
PROMETHEUS_SERVER
}
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)]
pub struct PromSeries {
pub metric: HashMap<String, String>,

View File

@@ -54,7 +54,6 @@ use servers::http::{HttpOptions, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
use servers::postgres::PostgresServer;
use servers::prometheus::PrometheusServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor;
use servers::server::Server;
@@ -544,11 +543,10 @@ pub async fn setup_test_prom_app_with_frontend(
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref.clone()))
.with_script_handler(frontend_ref.clone())
.with_prom_handler(frontend_ref.clone())
.with_prometheus_handler(frontend_ref.clone())
.with_greptime_config_options(opts.to_toml_string())
.build();
let prom_server = PrometheusServer::create_server(frontend_ref);
let app = http_server.build(http_server.make_app());
let app = app.merge(prom_server.make_app());
(app, guard)
}

View File

@@ -343,10 +343,13 @@ pub async fn test_prom_http_api(store_type: StorageType) {
let client = TestClient::new(app);
// instant query
let res = client.get("/api/v1/query?query=up&time=1").send().await;
let res = client
.get("/v1/prometheus/query?query=up&time=1")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let res = client
.post("/api/v1/query?query=up&time=1")
.post("/v1/prometheus/query?query=up&time=1")
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await;
@@ -354,28 +357,31 @@ pub async fn test_prom_http_api(store_type: StorageType) {
// range query
let res = client
.get("/api/v1/query_range?query=up&start=1&end=100&step=5")
.get("/v1/prometheus/query_range?query=up&start=1&end=100&step=5")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let res = client
.post("/api/v1/query_range?query=up&start=1&end=100&step=5")
.post("/v1/prometheus/query_range?query=up&start=1&end=100&step=5")
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// labels
let res = client.get("/api/v1/labels?match[]=demo").send().await;
let res = client
.get("/v1/prometheus/labels?match[]=demo")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let res = client
.post("/api/v1/labels?match[]=up")
.post("/v1/prometheus/labels?match[]=up")
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let res = client
.get("/api/v1/labels?match[]=demo&start=0")
.get("/v1/prometheus/labels?match[]=demo&start=0")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
@@ -390,17 +396,17 @@ pub async fn test_prom_http_api(store_type: StorageType) {
);
// labels without match[] param
let res = client.get("/api/v1/labels").send().await;
let res = client.get("/v1/prometheus/labels").send().await;
assert_eq!(res.status(), StatusCode::OK);
// labels query with multiple match[] params
let res = client
.get("/api/v1/labels?match[]=up&match[]=down")
.get("/v1/prometheus/labels?match[]=up&match[]=down")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let res = client
.post("/api/v1/labels?match[]=up&match[]=down")
.post("/v1/prometheus/labels?match[]=up&match[]=down")
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await;
@@ -408,7 +414,7 @@ pub async fn test_prom_http_api(store_type: StorageType) {
// series
let res = client
.get("/api/v1/series?match[]=demo&start=0&end=0")
.get("/v1/prometheus/series?match[]=demo&start=0&end=0")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
@@ -432,7 +438,7 @@ pub async fn test_prom_http_api(store_type: StorageType) {
assert_eq!(actual, expected);
let res = client
.post("/api/v1/series?match[]=up&match[]=down")
.post("/v1/prometheus/series?match[]=up&match[]=down")
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await;
@@ -440,7 +446,10 @@ pub async fn test_prom_http_api(store_type: StorageType) {
// label values
// should return error if there is no match[]
let res = client.get("/api/v1/label/instance/values").send().await;
let res = client
.get("/v1/prometheus/label/instance/values")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let prom_resp = res.json::<PrometheusJsonResponse>().await;
assert_eq!(prom_resp.status, "error");
@@ -449,7 +458,7 @@ pub async fn test_prom_http_api(store_type: StorageType) {
// single match[]
let res = client
.get("/api/v1/label/host/values?match[]=demo&start=0&end=600")
.get("/v1/prometheus/label/host/values?match[]=demo&start=0&end=600")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
@@ -462,7 +471,7 @@ pub async fn test_prom_http_api(store_type: StorageType) {
// multiple match[]
let res = client
.get("/api/v1/label/instance/values?match[]=up&match[]=system_metrics")
.get("/v1/prometheus/label/instance/values?match[]=up&match[]=system_metrics")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
@@ -472,7 +481,10 @@ pub async fn test_prom_http_api(store_type: StorageType) {
assert!(prom_resp.error_type.is_none());
// query `__name__` without match[]
let res = client.get("/api/v1/label/__name__/values").send().await;
let res = client
.get("/v1/prometheus/label/__name__/values")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let prom_resp = res.json::<PrometheusJsonResponse>().await;
assert_eq!(prom_resp.status, "success");