diff --git a/Cargo.lock b/Cargo.lock
index 991a52422b..c313dbd558 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9121,6 +9121,7 @@ dependencies = [
  "tower",
  "tower-http",
  "urlencoding",
+ "zstd 0.13.0",
 ]

 [[package]]
@@ -10165,6 +10166,7 @@ dependencies = [
  "tonic 0.10.2",
  "tower",
  "uuid",
+ "zstd 0.13.0",
 ]

 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 0bf04f17b8..6cfcebdff4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -155,6 +155,7 @@ tokio-util = { version = "0.7", features = ["io-util", "compat"] }
 toml = "0.8.8"
 tonic = { version = "0.10", features = ["tls"] }
 uuid = { version = "1", features = ["serde", "v4", "fast-rng"] }
+zstd = "0.13"

 ## workspaces members
 api = { path = "src/api" }
diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml
index 19f4a9f09d..c86770246d 100644
--- a/src/servers/Cargo.toml
+++ b/src/servers/Cargo.toml
@@ -102,6 +102,7 @@ tonic-reflection = "0.10"
 tower = { version = "0.4", features = ["full"] }
 tower-http = { version = "0.4", features = ["full"] }
 urlencoding = "2.1"
+zstd.workspace = true

 [target.'cfg(not(windows))'.dependencies]
 tikv-jemalloc-ctl = { version = "0.5", features = ["use_std"] }
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index 2d47547e65..1d4baa605e 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -219,13 +219,20 @@ pub enum Error {
         error: prost::DecodeError,
     },

-    #[snafu(display("Failed to decompress prometheus remote request"))]
-    DecompressPromRemoteRequest {
+    #[snafu(display("Failed to decompress snappy prometheus remote request"))]
+    DecompressSnappyPromRemoteRequest {
         location: Location,
         #[snafu(source)]
         error: snap::Error,
     },

+    #[snafu(display("Failed to decompress zstd prometheus remote request"))]
+    DecompressZstdPromRemoteRequest {
+        location: Location,
+        #[snafu(source)]
+        error: std::io::Error,
+    },
+
     #[snafu(display("Failed to send prometheus remote request"))]
     SendPromRemoteRequest {
         location: Location,
@@ -504,7 +511,8 @@ impl ErrorExt for Error {
             | DecodePromRemoteRequest { .. }
             | DecodeOtlpRequest { .. }
             | CompressPromRemoteRequest { .. }
-            | DecompressPromRemoteRequest { .. }
+            | DecompressSnappyPromRemoteRequest { .. }
+            | DecompressZstdPromRemoteRequest { .. }
             | InvalidPromRemoteRequest { .. }
             | InvalidExportMetricsConfig { .. }
             | InvalidFlightTicket { .. }
@@ -657,7 +665,8 @@ impl IntoResponse for Error {
             | Error::InvalidOpentsdbJsonRequest { .. }
             | Error::DecodePromRemoteRequest { .. }
             | Error::DecodeOtlpRequest { .. }
-            | Error::DecompressPromRemoteRequest { .. }
+            | Error::DecompressSnappyPromRemoteRequest { .. }
+            | Error::DecompressZstdPromRemoteRequest { .. }
             | Error::InvalidPromRemoteRequest { .. }
             | Error::InvalidQuery { .. }
             | Error::TimePrecision { .. } => HttpStatusCode::BAD_REQUEST,
diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs
index bf3f248eb0..1d4bd7a37d 100644
--- a/src/servers/src/http/prom_store.rs
+++ b/src/servers/src/http/prom_store.rs
@@ -19,7 +19,7 @@ use api::v1::RowInsertRequests;
 use axum::extract::{Query, RawBody, State};
 use axum::http::{header, HeaderValue, StatusCode};
 use axum::response::IntoResponse;
-use axum::Extension;
+use axum::{Extension, TypedHeader};
 use bytes::Bytes;
 use common_catalog::consts::DEFAULT_SCHEMA_NAME;
 use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
@@ -35,7 +35,7 @@ use snafu::prelude::*;

 use super::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS};
 use crate::error::{self, Result, UnexpectedPhysicalTableSnafu};
-use crate::prom_store::snappy_decompress;
+use crate::prom_store::{snappy_decompress, zstd_decompress};
 use crate::proto::PromWriteRequest;
 use crate::query_handler::{PromStoreProtocolHandlerRef, PromStoreResponse};

@@ -45,19 +45,26 @@ lazy_static! {
         Pool::new(256, PromWriteRequest::default);
 }

+pub const DEFAULT_ENCODING: &str = "snappy";
+pub const VM_ENCODING: &str = "zstd";
+pub const VM_PROTO_VERSION: &str = "1";
+
 #[derive(Debug, Serialize, Deserialize, JsonSchema)]
-pub struct DatabaseQuery {
+pub struct RemoteWriteQuery {
     pub db: Option<String>,
     /// Specify which physical table to use for storing metrics.
     /// This only works on remote write requests.
     pub physical_table: Option<String>,
+    /// For VictoriaMetrics modified remote write protocol
+    pub get_vm_proto_version: Option<String>,
 }

-impl Default for DatabaseQuery {
-    fn default() -> DatabaseQuery {
+impl Default for RemoteWriteQuery {
+    fn default() -> RemoteWriteQuery {
         Self {
             db: Some(DEFAULT_SCHEMA_NAME.to_string()),
             physical_table: Some(GREPTIME_PHYSICAL_TABLE.to_string()),
+            get_vm_proto_version: None,
         }
     }
 }
@@ -66,16 +73,23 @@ impl Default for DatabaseQuery {
 #[axum_macros::debug_handler]
 pub async fn route_write_without_metric_engine(
     State(handler): State<PromStoreProtocolHandlerRef>,
-    Query(params): Query<DatabaseQuery>,
+    Query(params): Query<RemoteWriteQuery>,
     Extension(query_ctx): Extension<QueryContextRef>,
+    content_encoding: TypedHeader<headers::ContentEncoding>,
     RawBody(body): RawBody,
 ) -> Result<impl IntoResponse> {
+    // VictoriaMetrics handshake
+    if let Some(_vm_handshake) = params.get_vm_proto_version {
+        return Ok(VM_PROTO_VERSION.into_response());
+    }
+
     let db = params.db.clone().unwrap_or_default();
     let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
         .with_label_values(&[db.as_str()])
         .start_timer();
-    let (request, samples) = decode_remote_write_request(body).await?;
+    let is_zstd = content_encoding.contains(VM_ENCODING);
+    let (request, samples) = decode_remote_write_request(is_zstd, body).await?;
     // reject if physical table is specified when metric engine is disabled
     if params.physical_table.is_some() {
         return UnexpectedPhysicalTableSnafu {}.fail();
     }
@@ -86,7 +100,8 @@ pub async fn route_write_without_metric_engine(
     Ok((
         StatusCode::NO_CONTENT,
         write_cost_header_map(output.meta.cost),
-    ))
+    )
+        .into_response())
 }

 #[axum_macros::debug_handler]
@@ -96,16 +111,23 @@
 )]
 pub async fn remote_write(
     State(handler): State<PromStoreProtocolHandlerRef>,
-    Query(params): Query<DatabaseQuery>,
+    Query(params): Query<RemoteWriteQuery>,
     Extension(mut query_ctx): Extension<QueryContextRef>,
+    content_encoding: TypedHeader<headers::ContentEncoding>,
     RawBody(body): RawBody,
 ) -> Result<impl IntoResponse> {
+    // VictoriaMetrics handshake
+    if let Some(_vm_handshake) = params.get_vm_proto_version {
+        return Ok(VM_PROTO_VERSION.into_response());
+    }
+
     let db = params.db.clone().unwrap_or_default();
     let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
         .with_label_values(&[db.as_str()])
         .start_timer();

-    let (request, samples) = decode_remote_write_request_to_row_inserts(body).await?;
+    let is_zstd = content_encoding.contains(VM_ENCODING);
+    let (request, samples) = decode_remote_write_request_to_row_inserts(is_zstd, body).await?;

     if let Some(physical_table) = params.physical_table {
         let mut new_query_ctx = query_ctx.as_ref().clone();
@@ -118,7 +140,8 @@ pub async fn remote_write(
     Ok((
         StatusCode::NO_CONTENT,
         write_cost_header_map(output.meta.cost),
-    ))
+    )
+        .into_response())
 }

 impl IntoResponse for PromStoreResponse {
@@ -147,7 +170,7 @@
 )]
 pub async fn remote_read(
     State(handler): State<PromStoreProtocolHandlerRef>,
-    Query(params): Query<DatabaseQuery>,
+    Query(params): Query<RemoteWriteQuery>,
     Extension(query_ctx): Extension<QueryContextRef>,
     RawBody(body): RawBody,
 ) -> Result<impl IntoResponse> {
@@ -162,6 +185,7 @@ pub async fn remote_read(
 }

 async fn decode_remote_write_request_to_row_inserts(
+    is_zstd: bool,
     body: Body,
 ) -> Result<(RowInsertRequests, usize)> {
     let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
@@ -169,7 +193,11 @@ async fn decode_remote_write_request_to_row_inserts(
         .await
         .context(error::HyperSnafu)?;

-    let buf = Bytes::from(snappy_decompress(&body[..])?);
+    let buf = Bytes::from(if is_zstd {
+        zstd_decompress(&body[..])?
+    } else {
+        snappy_decompress(&body[..])?
+    });

     let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
     request
@@ -178,13 +206,20 @@ async fn decode_remote_write_request_to_row_inserts(
     Ok(request.as_row_insert_requests())
 }

-async fn decode_remote_write_request(body: Body) -> Result<(RowInsertRequests, usize)> {
+async fn decode_remote_write_request(
+    is_zstd: bool,
+    body: Body,
+) -> Result<(RowInsertRequests, usize)> {
     let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
     let body = hyper::body::to_bytes(body)
         .await
         .context(error::HyperSnafu)?;

-    let buf = Bytes::from(snappy_decompress(&body[..])?);
+    let buf = Bytes::from(if is_zstd {
+        zstd_decompress(&body[..])?
+    } else {
+        snappy_decompress(&body[..])?
+    });

     let mut request = PromWriteRequest::default();
     request
diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs
index 7553d97912..23c7e45c38 100644
--- a/src/servers/src/prom_store.rs
+++ b/src/servers/src/prom_store.rs
@@ -389,7 +389,7 @@ pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
     let mut decoder = Decoder::new();
     decoder
         .decompress_vec(buf)
-        .context(error::DecompressPromRemoteRequestSnafu)
+        .context(error::DecompressSnappyPromRemoteRequestSnafu)
 }

 #[inline]
@@ -400,6 +400,11 @@ pub fn snappy_compress(buf: &[u8]) -> Result<Vec<u8>> {
         .context(error::CompressPromRemoteRequestSnafu)
 }

+#[inline]
+pub fn zstd_decompress(buf: &[u8]) -> Result<Vec<u8>> {
+    zstd::stream::decode_all(buf).context(error::DecompressZstdPromRemoteRequestSnafu)
+}
+
 /// Mock timeseries for test, it is both used in servers and frontend crate
 /// So we present it here
 pub fn mock_timeseries() -> Vec<TimeSeries> {
diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml
index aa40cae92a..99438540e2 100644
--- a/tests-integration/Cargo.toml
+++ b/tests-integration/Cargo.toml
@@ -46,6 +46,7 @@ mysql_async = { version = "0.33", default-features = false, features = [
 ] }
 object-store.workspace = true
 operator.workspace = true
+prost.workspace = true
 query.workspace = true
 rstest = "0.17"
 rstest_reuse = "0.5"
@@ -68,6 +69,7 @@ tokio.workspace = true
 tonic.workspace = true
 tower = "0.4"
 uuid.workspace = true
+zstd.workspace = true

 [dev-dependencies]
 datafusion.workspace = true
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index b2b3230ca4..bc8b458ee5 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -14,10 +14,12 @@

 use std::collections::BTreeMap;

+use api::prom_store::remote::WriteRequest;
 use auth::user_provider_from_option;
 use axum::http::{HeaderName, StatusCode};
 use axum_test_helper::TestClient;
 use common_error::status_code::StatusCode as ErrorCode;
+use prost::Message;
 use serde_json::json;
 use servers::http::error_result::ErrorResponse;
 use servers::http::greptime_result_v1::GreptimedbV1Response;
@@ -26,6 +28,7 @@ use servers::http::header::GREPTIME_TIMEZONE_HEADER_NAME;
 use servers::http::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response};
 use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse};
 use servers::http::GreptimeQueryOutput;
+use servers::prom_store;
 use tests_integration::test_util::{
     setup_test_http_app, setup_test_http_app_with_frontend,
     setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend,
@@ -71,6 +74,8 @@ macro_rules! http_tests {
                 test_status_api,
                 test_config_api,
                 test_dashboard_path,
+                test_prometheus_remote_write,
+                test_vm_proto_remote_write,
             );
         )*
     };
@@ -896,3 +901,63 @@ pub async fn test_dashboard_path(store_type: StorageType) {

 #[cfg(not(feature = "dashboard"))]
 pub async fn test_dashboard_path(_: StorageType) {}
+
+pub async fn test_prometheus_remote_write(store_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) =
+        setup_test_prom_app_with_frontend(store_type, "prometheus_remote_write").await;
+    let client = TestClient::new(app);
+
+    // write snappy encoded data
+    let write_request = WriteRequest {
+        timeseries: prom_store::mock_timeseries(),
+        ..Default::default()
+    };
+    let serialized_request = write_request.encode_to_vec();
+    let compressed_request =
+        prom_store::snappy_compress(&serialized_request).expect("failed to encode snappy");
+
+    let res = client
+        .post("/v1/prometheus/write")
+        .header("Content-Encoding", "snappy")
+        .body(compressed_request)
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::NO_CONTENT);
+
+    guard.remove_all().await;
+}
+
+pub async fn test_vm_proto_remote_write(store_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) =
+        setup_test_prom_app_with_frontend(store_type, "vm_proto_remote_write").await;
+
+    // handshake
+    let client = TestClient::new(app);
+    let res = client
+        .post("/v1/prometheus/write?get_vm_proto_version=1")
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+    assert_eq!(res.text().await, "1");
+
+    // write zstd encoded data
+    let write_request = WriteRequest {
+        timeseries: prom_store::mock_timeseries(),
+        ..Default::default()
+    };
+    let serialized_request = write_request.encode_to_vec();
+    let compressed_request =
+        zstd::stream::encode_all(&serialized_request[..], 1).expect("Failed to encode zstd");
+
+    let res = client
+        .post("/v1/prometheus/write")
+        .header("Content-Encoding", "zstd")
+        .body(compressed_request)
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::NO_CONTENT);
+
+    guard.remove_all().await;
+}