refactor: remove prom store write dispatch (#5812)

* refactor: remove prom store remote write dispatch pattern

* chore: ref XIX-22
This commit is contained in:
shuiyisong
2025-04-02 12:35:28 +08:00
committed by GitHub
parent aa486db8b7
commit 054056fcbb
2 changed files with 27 additions and 105 deletions

View File

@@ -57,6 +57,7 @@ use crate::error::{
ToJsonSnafu,
};
use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
use crate::http::prom_store::PromStoreState;
use crate::http::prometheus::{
build_info_query, format_query, instant_query, label_values_query, labels_query, parse_query,
range_query, series_query,
@@ -555,10 +556,16 @@ impl HttpServerBuilder {
prom_store_with_metric_engine: bool,
is_strict_mode: bool,
) -> Self {
let state = PromStoreState {
prom_store_handler: handler,
prom_store_with_metric_engine,
is_strict_mode,
};
Self {
router: self.router.nest(
&format!("/{HTTP_API_VERSION}/prometheus"),
HttpServer::route_prom(handler, prom_store_with_metric_engine, is_strict_mode),
HttpServer::route_prom(state),
),
..self
}
@@ -1004,36 +1011,11 @@ impl HttpServer {
///
/// [read]: https://prometheus.io/docs/prometheus/latest/querying/remote_read_api/
/// [write]: https://prometheus.io/docs/concepts/remote_write_spec/
fn route_prom<S>(
prom_handler: PromStoreProtocolHandlerRef,
prom_store_with_metric_engine: bool,
is_strict_mode: bool,
) -> Router<S> {
let mut router = Router::new().route("/read", routing::post(prom_store::remote_read));
match (prom_store_with_metric_engine, is_strict_mode) {
(true, true) => {
router = router.route("/write", routing::post(prom_store::remote_write))
}
(true, false) => {
router = router.route(
"/write",
routing::post(prom_store::remote_write_without_strict_mode),
)
}
(false, true) => {
router = router.route(
"/write",
routing::post(prom_store::route_write_without_metric_engine),
)
}
(false, false) => {
router = router.route(
"/write",
routing::post(prom_store::route_write_without_metric_engine_and_strict_mode),
)
}
}
router.with_state(prom_handler)
fn route_prom<S>(state: PromStoreState) -> Router<S> {
Router::new()
.route("/read", routing::post(prom_store::remote_read))
.route("/write", routing::post(prom_store::remote_write))
.with_state(state)
}
fn route_influxdb<S>(influxdb_handler: InfluxdbLineProtocolHandlerRef) -> Router<S> {

View File

@@ -49,6 +49,13 @@ pub const DEFAULT_ENCODING: &str = "snappy";
pub const VM_ENCODING: &str = "zstd";
pub const VM_PROTO_VERSION: &str = "1";
#[derive(Clone)]
pub struct PromStoreState {
pub prom_store_handler: PromStoreProtocolHandlerRef,
pub prom_store_with_metric_engine: bool,
pub is_strict_mode: bool,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RemoteWriteQuery {
pub db: Option<String>,
@@ -69,99 +76,32 @@ impl Default for RemoteWriteQuery {
}
}
/// Same with [remote_write] but won't store data to metric engine.
#[axum_macros::debug_handler]
pub async fn route_write_without_metric_engine(
handler: State<PromStoreProtocolHandlerRef>,
query: Query<RemoteWriteQuery>,
extension: Extension<QueryContext>,
content_encoding: TypedHeader<headers::ContentEncoding>,
raw_body: Bytes,
) -> Result<impl IntoResponse> {
remote_write_impl(
handler,
query,
extension,
content_encoding,
raw_body,
true,
false,
)
.await
}
/// Same with [remote_write] but won't store data to metric engine.
/// And without strict_mode on will not check invalid UTF-8.
#[axum_macros::debug_handler]
pub async fn route_write_without_metric_engine_and_strict_mode(
handler: State<PromStoreProtocolHandlerRef>,
query: Query<RemoteWriteQuery>,
extension: Extension<QueryContext>,
content_encoding: TypedHeader<headers::ContentEncoding>,
raw_body: Bytes,
) -> Result<impl IntoResponse> {
remote_write_impl(
handler,
query,
extension,
content_encoding,
raw_body,
false,
false,
)
.await
}
#[axum_macros::debug_handler]
#[tracing::instrument(
skip_all,
fields(protocol = "prometheus", request_type = "remote_write")
)]
pub async fn remote_write(
handler: State<PromStoreProtocolHandlerRef>,
State(state): State<PromStoreState>,
query: Query<RemoteWriteQuery>,
extension: Extension<QueryContext>,
content_encoding: TypedHeader<headers::ContentEncoding>,
raw_body: Bytes,
) -> Result<impl IntoResponse> {
remote_write_impl(
handler,
state.prom_store_handler,
query,
extension,
content_encoding,
raw_body,
true,
true,
)
.await
}
#[axum_macros::debug_handler]
#[tracing::instrument(
skip_all,
fields(protocol = "prometheus", request_type = "remote_write")
)]
pub async fn remote_write_without_strict_mode(
handler: State<PromStoreProtocolHandlerRef>,
query: Query<RemoteWriteQuery>,
extension: Extension<QueryContext>,
content_encoding: TypedHeader<headers::ContentEncoding>,
raw_body: Bytes,
) -> Result<impl IntoResponse> {
remote_write_impl(
handler,
query,
extension,
content_encoding,
raw_body,
false,
true,
state.is_strict_mode,
state.prom_store_with_metric_engine,
)
.await
}
async fn remote_write_impl(
State(handler): State<PromStoreProtocolHandlerRef>,
handler: PromStoreProtocolHandlerRef,
Query(params): Query<RemoteWriteQuery>,
Extension(mut query_ctx): Extension<QueryContext>,
content_encoding: TypedHeader<headers::ContentEncoding>,
@@ -222,7 +162,7 @@ impl IntoResponse for PromStoreResponse {
fields(protocol = "prometheus", request_type = "remote_read")
)]
pub async fn remote_read(
State(handler): State<PromStoreProtocolHandlerRef>,
State(state): State<PromStoreState>,
Query(params): Query<RemoteWriteQuery>,
Extension(mut query_ctx): Extension<QueryContext>,
body: Bytes,
@@ -236,7 +176,7 @@ pub async fn remote_read(
let request = decode_remote_read_request(body).await?;
handler.read(request, query_ctx).await
state.prom_store_handler.read(request, query_ctx).await
}
fn try_decompress(is_zstd: bool, body: &[u8]) -> Result<Bytes> {