fix: wrong handler implementation of prometheus remote write (#3826)

* fix: wrong handler implementation of prometheus remote write

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor code

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2024-04-28 18:52:32 +08:00
committed by GitHub
parent dadee99d69
commit c0b909330a
2 changed files with 72 additions and 107 deletions

View File

@@ -684,6 +684,9 @@ impl HttpServer {
.with_state(api_state)
}
/// Route Prometheus [HTTP API].
///
/// [HTTP API]: https://prometheus.io/docs/prometheus/latest/querying/api/
fn route_prometheus<S>(prometheus_handler: PrometheusHandlerRef) -> Router<S> {
Router::new()
.route(
@@ -702,6 +705,11 @@ impl HttpServer {
.with_state(prometheus_handler)
}
/// Route Prometheus remote [read] and [write] API. In other places the related modules are
/// called `prom_store`.
///
/// [read]: https://prometheus.io/docs/prometheus/latest/querying/remote_read_api/
/// [write]: https://prometheus.io/docs/concepts/remote_write_spec/
fn route_prom<S>(
prom_handler: PromStoreProtocolHandlerRef,
prom_store_with_metric_engine: bool,

View File

@@ -34,7 +34,7 @@ use session::context::QueryContextRef;
use snafu::prelude::*;
use super::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS};
use crate::error::{self, Result, UnexpectedPhysicalTableSnafu};
use crate::error::{self, Result};
use crate::prom_store::{snappy_decompress, zstd_decompress};
use crate::proto::PromWriteRequest;
use crate::query_handler::{PromStoreProtocolHandlerRef, PromStoreResponse};
@@ -72,67 +72,44 @@ impl Default for RemoteWriteQuery {
/// Same with [remote_write] but won't store data to metric engine.
#[axum_macros::debug_handler]
pub async fn route_write_without_metric_engine(
State(handler): State<PromStoreProtocolHandlerRef>,
Query(params): Query<RemoteWriteQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
handler: State<PromStoreProtocolHandlerRef>,
query: Query<RemoteWriteQuery>,
extension: Extension<QueryContextRef>,
content_encoding: TypedHeader<headers::ContentEncoding>,
RawBody(body): RawBody,
raw_body: RawBody,
) -> Result<impl IntoResponse> {
// VictoriaMetrics handshake
if let Some(_vm_handshake) = params.get_vm_proto_version {
return Ok(VM_PROTO_VERSION.into_response());
}
let db = params.db.clone().unwrap_or_default();
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();
let is_zstd = content_encoding.contains(VM_ENCODING);
let (request, samples) = decode_remote_write_request(is_zstd, body, true).await?;
// reject if physical table is specified when metric engine is disabled
if params.physical_table.is_some() {
return UnexpectedPhysicalTableSnafu {}.fail();
}
let output = handler.write(request, query_ctx, false).await?;
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok((
StatusCode::NO_CONTENT,
write_cost_header_map(output.meta.cost),
remote_write_impl(
handler,
query,
extension,
content_encoding,
raw_body,
true,
false,
)
.into_response())
.await
}
/// Same with [remote_write] but won't store data to metric engine.
/// And without strict_mode on will not check invalid UTF-8.
#[axum_macros::debug_handler]
pub async fn route_write_without_metric_engine_and_strict_mode(
State(handler): State<PromStoreProtocolHandlerRef>,
Query(params): Query<RemoteWriteQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
handler: State<PromStoreProtocolHandlerRef>,
query: Query<RemoteWriteQuery>,
extension: Extension<QueryContextRef>,
content_encoding: TypedHeader<headers::ContentEncoding>,
RawBody(body): RawBody,
raw_body: RawBody,
) -> Result<impl IntoResponse> {
let db = params.db.clone().unwrap_or_default();
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();
let is_zstd = content_encoding.contains(VM_ENCODING);
let (request, samples) = decode_remote_write_request(is_zstd, body, false).await?;
// reject if physical table is specified when metric engine is disabled
if params.physical_table.is_some() {
return UnexpectedPhysicalTableSnafu {}.fail();
}
let output = handler.write(request, query_ctx, false).await?;
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok((
StatusCode::NO_CONTENT,
write_cost_header_map(output.meta.cost),
remote_write_impl(
handler,
query,
extension,
content_encoding,
raw_body,
false,
false,
)
.into_response())
.await
}
#[axum_macros::debug_handler]
@@ -141,39 +118,22 @@ pub async fn route_write_without_metric_engine_and_strict_mode(
fields(protocol = "prometheus", request_type = "remote_write")
)]
pub async fn remote_write(
State(handler): State<PromStoreProtocolHandlerRef>,
Query(params): Query<RemoteWriteQuery>,
Extension(mut query_ctx): Extension<QueryContextRef>,
handler: State<PromStoreProtocolHandlerRef>,
query: Query<RemoteWriteQuery>,
extension: Extension<QueryContextRef>,
content_encoding: TypedHeader<headers::ContentEncoding>,
RawBody(body): RawBody,
raw_body: RawBody,
) -> Result<impl IntoResponse> {
// VictoriaMetrics handshake
if let Some(_vm_handshake) = params.get_vm_proto_version {
return Ok(VM_PROTO_VERSION.into_response());
}
let db = params.db.clone().unwrap_or_default();
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();
let is_zstd = content_encoding.contains(VM_ENCODING);
let (request, samples) =
decode_remote_write_request_to_row_inserts(is_zstd, body, true).await?;
if let Some(physical_table) = params.physical_table {
let mut new_query_ctx = query_ctx.as_ref().clone();
new_query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table);
query_ctx = Arc::new(new_query_ctx);
}
let output = handler.write(request, query_ctx, true).await?;
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok((
StatusCode::NO_CONTENT,
write_cost_header_map(output.meta.cost),
remote_write_impl(
handler,
query,
extension,
content_encoding,
raw_body,
true,
true,
)
.into_response())
.await
}
#[axum_macros::debug_handler]
@@ -182,11 +142,32 @@ pub async fn remote_write(
fields(protocol = "prometheus", request_type = "remote_write")
)]
pub async fn remote_write_without_strict_mode(
handler: State<PromStoreProtocolHandlerRef>,
query: Query<RemoteWriteQuery>,
extension: Extension<QueryContextRef>,
content_encoding: TypedHeader<headers::ContentEncoding>,
raw_body: RawBody,
) -> Result<impl IntoResponse> {
remote_write_impl(
handler,
query,
extension,
content_encoding,
raw_body,
false,
true,
)
.await
}
async fn remote_write_impl(
State(handler): State<PromStoreProtocolHandlerRef>,
Query(params): Query<RemoteWriteQuery>,
Extension(mut query_ctx): Extension<QueryContextRef>,
content_encoding: TypedHeader<headers::ContentEncoding>,
RawBody(body): RawBody,
is_strict_mode: bool,
is_metric_engine: bool,
) -> Result<impl IntoResponse> {
// VictoriaMetrics handshake
if let Some(_vm_handshake) = params.get_vm_proto_version {
@@ -199,8 +180,7 @@ pub async fn remote_write_without_strict_mode(
.start_timer();
let is_zstd = content_encoding.contains(VM_ENCODING);
let (request, samples) =
decode_remote_write_request_to_row_inserts(is_zstd, body, false).await?;
let (request, samples) = decode_remote_write_request(is_zstd, body, is_strict_mode).await?;
if let Some(physical_table) = params.physical_table {
let mut new_query_ctx = query_ctx.as_ref().clone();
@@ -208,7 +188,7 @@ pub async fn remote_write_without_strict_mode(
query_ctx = Arc::new(new_query_ctx);
}
let output = handler.write(request, query_ctx, false).await?;
let output = handler.write(request, query_ctx, is_metric_engine).await?;
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok((
StatusCode::NO_CONTENT,
@@ -257,7 +237,7 @@ pub async fn remote_read(
handler.read(request, query_ctx).await
}
async fn decode_remote_write_request_to_row_inserts(
async fn decode_remote_write_request(
is_zstd: bool,
body: Body,
is_strict_mode: bool,
@@ -280,29 +260,6 @@ async fn decode_remote_write_request_to_row_inserts(
Ok(request.as_row_insert_requests())
}
async fn decode_remote_write_request(
is_zstd: bool,
body: Body,
is_strict_mode: bool,
) -> Result<(RowInsertRequests, usize)> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
let body = hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)?;
let buf = Bytes::from(if is_zstd {
zstd_decompress(&body[..])?
} else {
snappy_decompress(&body[..])?
});
let mut request = PromWriteRequest::default();
request
.merge(buf, is_strict_mode)
.context(error::DecodePromRemoteRequestSnafu)?;
Ok(request.as_row_insert_requests())
}
async fn decode_remote_read_request(body: Body) -> Result<ReadRequest> {
let body = hyper::body::to_bytes(body)
.await