From 61f0703af8409425fae8d65f38d3dc40c716cfee Mon Sep 17 00:00:00 2001 From: tison Date: Thu, 14 Mar 2024 12:26:26 +0800 Subject: [PATCH] feat: support decode gzip if influxdb write specify it (#3494) * feat: support dedoce gzip if influxdb write specify it Signed-off-by: tison * address comments Signed-off-by: tison * simplify with tower_http DecompressionLayer Signed-off-by: tison * tidy some code Signed-off-by: tison --------- Signed-off-by: tison --- Cargo.lock | 19 +++++++++--------- src/datanode/Cargo.toml | 2 +- src/servers/Cargo.toml | 2 +- src/servers/src/error.rs | 24 ++++++++++++++-------- src/servers/src/http.rs | 6 ++++++ src/servers/src/http/authorize.rs | 33 ++++++++++++------------------- 6 files changed, 47 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fdf835ce31..ef6bc0d316 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -570,7 +570,6 @@ version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a" dependencies = [ - "brotli", "bzip2", "flate2", "futures-core", @@ -589,6 +588,7 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc2d0cfb2a7388d34f590e76686704c494ed7aaceed62ee1ba35cbf363abc2a5" dependencies = [ + "brotli", "bzip2", "flate2", "futures-core", @@ -4526,11 +4526,12 @@ checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" [[package]] name = "iri-string" -version = "0.4.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f0f7638c1e223529f1bfdc48c8b133b9e0b434094d1d28473161ee48b235f78" +checksum = "21859b667d66a4c1dacd9df0863b3efb65785474255face87f5bca39dd8407c0" dependencies = [ - "nom", + "memchr", + "serde", ] [[package]] @@ -10862,13 +10863,13 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.3.5" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858" +checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" dependencies = [ - "async-compression 0.3.15", - "base64 0.13.1", - "bitflags 1.3.2", + "async-compression 0.4.5", + "base64 0.21.5", + "bitflags 2.4.1", "bytes", "futures-core", "futures-util", diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 38fe116131..f03625eb66 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -73,7 +73,7 @@ tokio-stream = { workspace = true, features = ["net"] } toml.workspace = true tonic.workspace = true tower = { version = "0.4", features = ["full"] } -tower-http = { version = "0.3", features = ["full"] } +tower-http = { version = "0.4", features = ["full"] } url = "2.3.1" uuid.workspace = true diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index d1eb805312..95cbea167f 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -103,7 +103,7 @@ tokio-stream = { workspace = true, features = ["net"] } tonic.workspace = true tonic-reflection = "0.10" tower = { version = "0.4", features = ["full"] } -tower-http = { version = "0.3", features = ["full"] } +tower-http = { version = "0.4", features = ["full"] } urlencoding = "2.1" [target.'cfg(not(windows))'.dependencies] diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index e86ee531f3..0546d2a262 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -271,18 +271,25 @@ pub enum Error { #[snafu(display("Not found influx http authorization info"))] NotFoundInfluxAuth {}, + #[snafu(display("Unsupported http auth scheme, name: {}", name))] + UnsupportedAuthScheme { name: String }, + #[snafu(display("Invalid visibility ASCII chars"))] - InvisibleASCII { + InvalidAuthHeaderInvisibleASCII { #[snafu(source)] error: hyper::header::ToStrError, location: Location, }, - #[snafu(display("Unsupported http auth scheme, name: {}", name))] - UnsupportedAuthScheme { name: String }, + #[snafu(display("Invalid utf-8 value"))] + InvalidAuthHeaderInvalidUtf8Value { + #[snafu(source)] + error: FromUtf8Error, + location: Location, + }, #[snafu(display("Invalid http authorization header"))] - InvalidAuthorizationHeader { location: Location }, + InvalidAuthHeader { location: Location }, #[snafu(display("Invalid base64 value"))] InvalidBase64Value { @@ -520,16 +527,17 @@ impl ErrorExt for Error { DescribeStatement { source } => source.status_code(), NotFoundAuthHeader { .. } | NotFoundInfluxAuth { .. } => StatusCode::AuthHeaderNotFound, - InvisibleASCII { .. } + InvalidAuthHeaderInvisibleASCII { .. } | UnsupportedAuthScheme { .. } - | InvalidAuthorizationHeader { .. } + | InvalidAuthHeader { .. } | InvalidBase64Value { .. } - | InvalidUtf8Value { .. } => StatusCode::InvalidAuthHeader, + | InvalidAuthHeaderInvalidUtf8Value { .. } => StatusCode::InvalidAuthHeader, DatabaseNotFound { .. } => StatusCode::DatabaseNotFound, #[cfg(feature = "mem-prof")] DumpProfileData { source, .. } => source.status_code(), - InvalidFlushArgument { .. } => StatusCode::InvalidArguments, + + InvalidUtf8Value { .. } | InvalidFlushArgument { .. } => StatusCode::InvalidArguments, ReplacePreparedStmtParams { source, .. } | GetPreparedStmtParams { source, .. } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 6a141bfa74..7a2abf33cf 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -45,6 +45,7 @@ use tokio::sync::oneshot::{self, Sender}; use tokio::sync::Mutex; use tower::timeout::TimeoutLayer; use tower::ServiceBuilder; +use tower_http::decompression::RequestDecompressionLayer; use tower_http::trace::TraceLayer; use self::authorize::AuthState; @@ -698,6 +699,11 @@ impl HttpServer { Router::new() .route("/write", routing::post(influxdb_write_v1)) .route("/api/v2/write", routing::post(influxdb_write_v2)) + .layer( + ServiceBuilder::new() + .layer(HandleErrorLayer::new(handle_error)) + .layer(RequestDecompressionLayer::new()), + ) .route("/ping", routing::get(influxdb_ping)) .route("/health", routing::get(influxdb_health)) .with_state(influxdb_handler) diff --git a/src/servers/src/http/authorize.rs b/src/servers/src/http/authorize.rs index 135d102730..2cc8901fc7 100644 --- a/src/servers/src/http/authorize.rs +++ b/src/servers/src/http/authorize.rs @@ -35,7 +35,7 @@ use snafu::{ensure, OptionExt, ResultExt}; use super::header::{GreptimeDbName, GREPTIME_TIMEZONE_HEADER_NAME}; use super::{ResponseFormat, PUBLIC_APIS}; use crate::error::{ - self, InvalidAuthorizationHeaderSnafu, InvalidParameterSnafu, InvisibleASCIISnafu, + self, InvalidAuthHeaderInvisibleASCIISnafu, InvalidAuthHeaderSnafu, InvalidParameterSnafu, NotFoundInfluxAuthSnafu, Result, UnsupportedAuthSchemeSnafu, UrlDecodeSnafu, }; use crate::http::error_result::ErrorResponse; @@ -174,15 +174,13 @@ fn get_influxdb_credentials(request: &Request) -> Result { - let (u, p) = credential - .split_once(':') - .context(InvalidAuthorizationHeaderSnafu)?; + let (u, p) = credential.split_once(':').context(InvalidAuthHeaderSnafu)?; (u.to_string(), p.to_string().into()) } "basic" => decode_basic(credential)?, @@ -237,13 +235,10 @@ impl TryFrom<&str> for AuthScheme { type Error = error::Error; fn try_from(value: &str) -> Result { - let (scheme, encoded_credentials) = value - .split_once(' ') - .context(InvalidAuthorizationHeaderSnafu)?; - ensure!( - !encoded_credentials.contains(' '), - InvalidAuthorizationHeaderSnafu - ); + let (scheme, encoded_credentials) = + value.split_once(' ').context(InvalidAuthHeaderSnafu)?; + + ensure!(!encoded_credentials.contains(' '), InvalidAuthHeaderSnafu); match scheme.to_lowercase().as_str() { "basic" => decode_basic(encoded_credentials) @@ -261,7 +256,7 @@ fn auth_header(req: &Request) -> Result { .get(http::header::AUTHORIZATION) .context(error::NotFoundAuthHeaderSnafu)? .to_str() - .context(InvisibleASCIISnafu)?; + .context(InvalidAuthHeaderInvisibleASCIISnafu)?; auth_header.try_into() } @@ -270,13 +265,14 @@ fn decode_basic(credential: Credential) -> Result<(Username, Password)> { let decoded = BASE64_STANDARD .decode(credential) .context(error::InvalidBase64ValueSnafu)?; - let as_utf8 = String::from_utf8(decoded).context(error::InvalidUtf8ValueSnafu)?; + let as_utf8 = + String::from_utf8(decoded).context(error::InvalidAuthHeaderInvalidUtf8ValueSnafu)?; if let Some((user_id, password)) = as_utf8.split_once(':') { return Ok((user_id.to_string(), password.to_string().into())); } - InvalidAuthorizationHeaderSnafu {}.fail() + InvalidAuthHeaderSnafu {}.fail() } fn need_auth(req: &Request) -> bool { @@ -395,10 +391,7 @@ mod tests { let wrong_req = mock_http_request(Some("Basic dXNlcm5hbWU6 cGFzc3dvcmQ="), None).unwrap(); let res = auth_header(&wrong_req); - assert_matches!( - res.err(), - Some(error::Error::InvalidAuthorizationHeader { .. }) - ); + assert_matches!(res.err(), Some(error::Error::InvalidAuthHeader { .. })); let wrong_req = mock_http_request(Some("Digest dXNlcm5hbWU6cGFzc3dvcmQ="), None).unwrap(); let res = auth_header(&wrong_req);