mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-28 02:40:38 +00:00
feat: add fallback logic for vmagent sending wrong content type (#4009)
* feat: add fallback logic for vmagent sending wrong content type * fix: resolve lint issues * Update src/servers/src/http/prom_store.rs Co-authored-by: Yingwen <realevenyag@gmail.com> --------- Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
@@ -237,6 +237,14 @@ pub async fn remote_read(
|
||||
handler.read(request, query_ctx).await
|
||||
}
|
||||
|
||||
fn try_decompress(is_zstd: bool, body: &[u8]) -> Result<Bytes> {
|
||||
Ok(Bytes::from(if is_zstd {
|
||||
zstd_decompress(body)?
|
||||
} else {
|
||||
snappy_decompress(body)?
|
||||
}))
|
||||
}
|
||||
|
||||
async fn decode_remote_write_request(
|
||||
is_zstd: bool,
|
||||
body: Body,
|
||||
@@ -247,11 +255,18 @@ async fn decode_remote_write_request(
|
||||
.await
|
||||
.context(error::HyperSnafu)?;
|
||||
|
||||
let buf = Bytes::from(if is_zstd {
|
||||
zstd_decompress(&body[..])?
|
||||
// due to vmagent's limitation, there is a chance that vmagent is
|
||||
// sending content type wrong so we have to apply a fallback with decoding
|
||||
// the content in another method.
|
||||
//
|
||||
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301
|
||||
// see https://github.com/GreptimeTeam/greptimedb/issues/3929
|
||||
let buf = if let Ok(buf) = try_decompress(is_zstd, &body[..]) {
|
||||
buf
|
||||
} else {
|
||||
snappy_decompress(&body[..])?
|
||||
});
|
||||
// fallback to the other compression method
|
||||
try_decompress(!is_zstd, &body[..])?
|
||||
};
|
||||
|
||||
let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
|
||||
request
|
||||
|
||||
@@ -963,5 +963,30 @@ pub async fn test_vm_proto_remote_write(store_type: StorageType) {
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// also test fallback logic, vmagent could sent data in wrong content-type
|
||||
// we encode it as zstd but send it as snappy
|
||||
let compressed_request =
|
||||
zstd::stream::encode_all(&serialized_request[..], 1).expect("Failed to encode zstd");
|
||||
|
||||
let res = client
|
||||
.post("/v1/prometheus/write")
|
||||
.header("Content-Encoding", "snappy")
|
||||
.body(compressed_request)
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// reversed
|
||||
let compressed_request =
|
||||
prom_store::snappy_compress(&serialized_request[..]).expect("Failed to encode snappy");
|
||||
|
||||
let res = client
|
||||
.post("/v1/prometheus/write")
|
||||
.header("Content-Encoding", "zstd")
|
||||
.body(compressed_request)
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user