mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
feat: support decode gzip if influxdb write specify it (#3494)
* feat: support dedoce gzip if influxdb write specify it Signed-off-by: tison <wander4096@gmail.com> * address comments Signed-off-by: tison <wander4096@gmail.com> * simplify with tower_http DecompressionLayer Signed-off-by: tison <wander4096@gmail.com> * tidy some code Signed-off-by: tison <wander4096@gmail.com> --------- Signed-off-by: tison <wander4096@gmail.com>
This commit is contained in:
19
Cargo.lock
generated
19
Cargo.lock
generated
@@ -570,7 +570,6 @@ version = "0.3.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a"
|
||||
dependencies = [
|
||||
"brotli",
|
||||
"bzip2",
|
||||
"flate2",
|
||||
"futures-core",
|
||||
@@ -589,6 +588,7 @@ version = "0.4.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bc2d0cfb2a7388d34f590e76686704c494ed7aaceed62ee1ba35cbf363abc2a5"
|
||||
dependencies = [
|
||||
"brotli",
|
||||
"bzip2",
|
||||
"flate2",
|
||||
"futures-core",
|
||||
@@ -4526,11 +4526,12 @@ checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3"
|
||||
|
||||
[[package]]
|
||||
name = "iri-string"
|
||||
version = "0.4.1"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8f0f7638c1e223529f1bfdc48c8b133b9e0b434094d1d28473161ee48b235f78"
|
||||
checksum = "21859b667d66a4c1dacd9df0863b3efb65785474255face87f5bca39dd8407c0"
|
||||
dependencies = [
|
||||
"nom",
|
||||
"memchr",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -10862,13 +10863,13 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tower-http"
|
||||
version = "0.3.5"
|
||||
version = "0.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858"
|
||||
checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140"
|
||||
dependencies = [
|
||||
"async-compression 0.3.15",
|
||||
"base64 0.13.1",
|
||||
"bitflags 1.3.2",
|
||||
"async-compression 0.4.5",
|
||||
"base64 0.21.5",
|
||||
"bitflags 2.4.1",
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
|
||||
@@ -73,7 +73,7 @@ tokio-stream = { workspace = true, features = ["net"] }
|
||||
toml.workspace = true
|
||||
tonic.workspace = true
|
||||
tower = { version = "0.4", features = ["full"] }
|
||||
tower-http = { version = "0.3", features = ["full"] }
|
||||
tower-http = { version = "0.4", features = ["full"] }
|
||||
url = "2.3.1"
|
||||
uuid.workspace = true
|
||||
|
||||
|
||||
@@ -103,7 +103,7 @@ tokio-stream = { workspace = true, features = ["net"] }
|
||||
tonic.workspace = true
|
||||
tonic-reflection = "0.10"
|
||||
tower = { version = "0.4", features = ["full"] }
|
||||
tower-http = { version = "0.3", features = ["full"] }
|
||||
tower-http = { version = "0.4", features = ["full"] }
|
||||
urlencoding = "2.1"
|
||||
|
||||
[target.'cfg(not(windows))'.dependencies]
|
||||
|
||||
@@ -271,18 +271,25 @@ pub enum Error {
|
||||
#[snafu(display("Not found influx http authorization info"))]
|
||||
NotFoundInfluxAuth {},
|
||||
|
||||
#[snafu(display("Unsupported http auth scheme, name: {}", name))]
|
||||
UnsupportedAuthScheme { name: String },
|
||||
|
||||
#[snafu(display("Invalid visibility ASCII chars"))]
|
||||
InvisibleASCII {
|
||||
InvalidAuthHeaderInvisibleASCII {
|
||||
#[snafu(source)]
|
||||
error: hyper::header::ToStrError,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Unsupported http auth scheme, name: {}", name))]
|
||||
UnsupportedAuthScheme { name: String },
|
||||
#[snafu(display("Invalid utf-8 value"))]
|
||||
InvalidAuthHeaderInvalidUtf8Value {
|
||||
#[snafu(source)]
|
||||
error: FromUtf8Error,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid http authorization header"))]
|
||||
InvalidAuthorizationHeader { location: Location },
|
||||
InvalidAuthHeader { location: Location },
|
||||
|
||||
#[snafu(display("Invalid base64 value"))]
|
||||
InvalidBase64Value {
|
||||
@@ -520,16 +527,17 @@ impl ErrorExt for Error {
|
||||
DescribeStatement { source } => source.status_code(),
|
||||
|
||||
NotFoundAuthHeader { .. } | NotFoundInfluxAuth { .. } => StatusCode::AuthHeaderNotFound,
|
||||
InvisibleASCII { .. }
|
||||
InvalidAuthHeaderInvisibleASCII { .. }
|
||||
| UnsupportedAuthScheme { .. }
|
||||
| InvalidAuthorizationHeader { .. }
|
||||
| InvalidAuthHeader { .. }
|
||||
| InvalidBase64Value { .. }
|
||||
| InvalidUtf8Value { .. } => StatusCode::InvalidAuthHeader,
|
||||
| InvalidAuthHeaderInvalidUtf8Value { .. } => StatusCode::InvalidAuthHeader,
|
||||
|
||||
DatabaseNotFound { .. } => StatusCode::DatabaseNotFound,
|
||||
#[cfg(feature = "mem-prof")]
|
||||
DumpProfileData { source, .. } => source.status_code(),
|
||||
InvalidFlushArgument { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
InvalidUtf8Value { .. } | InvalidFlushArgument { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
ReplacePreparedStmtParams { source, .. }
|
||||
| GetPreparedStmtParams { source, .. }
|
||||
|
||||
@@ -45,6 +45,7 @@ use tokio::sync::oneshot::{self, Sender};
|
||||
use tokio::sync::Mutex;
|
||||
use tower::timeout::TimeoutLayer;
|
||||
use tower::ServiceBuilder;
|
||||
use tower_http::decompression::RequestDecompressionLayer;
|
||||
use tower_http::trace::TraceLayer;
|
||||
|
||||
use self::authorize::AuthState;
|
||||
@@ -698,6 +699,11 @@ impl HttpServer {
|
||||
Router::new()
|
||||
.route("/write", routing::post(influxdb_write_v1))
|
||||
.route("/api/v2/write", routing::post(influxdb_write_v2))
|
||||
.layer(
|
||||
ServiceBuilder::new()
|
||||
.layer(HandleErrorLayer::new(handle_error))
|
||||
.layer(RequestDecompressionLayer::new()),
|
||||
)
|
||||
.route("/ping", routing::get(influxdb_ping))
|
||||
.route("/health", routing::get(influxdb_health))
|
||||
.with_state(influxdb_handler)
|
||||
|
||||
@@ -35,7 +35,7 @@ use snafu::{ensure, OptionExt, ResultExt};
|
||||
use super::header::{GreptimeDbName, GREPTIME_TIMEZONE_HEADER_NAME};
|
||||
use super::{ResponseFormat, PUBLIC_APIS};
|
||||
use crate::error::{
|
||||
self, InvalidAuthorizationHeaderSnafu, InvalidParameterSnafu, InvisibleASCIISnafu,
|
||||
self, InvalidAuthHeaderInvisibleASCIISnafu, InvalidAuthHeaderSnafu, InvalidParameterSnafu,
|
||||
NotFoundInfluxAuthSnafu, Result, UnsupportedAuthSchemeSnafu, UrlDecodeSnafu,
|
||||
};
|
||||
use crate::http::error_result::ErrorResponse;
|
||||
@@ -174,15 +174,13 @@ fn get_influxdb_credentials<B>(request: &Request<B>) -> Result<Option<(Username,
|
||||
// try header
|
||||
let (auth_scheme, credential) = header
|
||||
.to_str()
|
||||
.context(InvisibleASCIISnafu)?
|
||||
.context(InvalidAuthHeaderInvisibleASCIISnafu)?
|
||||
.split_once(' ')
|
||||
.context(InvalidAuthorizationHeaderSnafu)?;
|
||||
.context(InvalidAuthHeaderSnafu)?;
|
||||
|
||||
let (username, password) = match auth_scheme.to_lowercase().as_str() {
|
||||
"token" => {
|
||||
let (u, p) = credential
|
||||
.split_once(':')
|
||||
.context(InvalidAuthorizationHeaderSnafu)?;
|
||||
let (u, p) = credential.split_once(':').context(InvalidAuthHeaderSnafu)?;
|
||||
(u.to_string(), p.to_string().into())
|
||||
}
|
||||
"basic" => decode_basic(credential)?,
|
||||
@@ -237,13 +235,10 @@ impl TryFrom<&str> for AuthScheme {
|
||||
type Error = error::Error;
|
||||
|
||||
fn try_from(value: &str) -> Result<Self> {
|
||||
let (scheme, encoded_credentials) = value
|
||||
.split_once(' ')
|
||||
.context(InvalidAuthorizationHeaderSnafu)?;
|
||||
ensure!(
|
||||
!encoded_credentials.contains(' '),
|
||||
InvalidAuthorizationHeaderSnafu
|
||||
);
|
||||
let (scheme, encoded_credentials) =
|
||||
value.split_once(' ').context(InvalidAuthHeaderSnafu)?;
|
||||
|
||||
ensure!(!encoded_credentials.contains(' '), InvalidAuthHeaderSnafu);
|
||||
|
||||
match scheme.to_lowercase().as_str() {
|
||||
"basic" => decode_basic(encoded_credentials)
|
||||
@@ -261,7 +256,7 @@ fn auth_header<B>(req: &Request<B>) -> Result<AuthScheme> {
|
||||
.get(http::header::AUTHORIZATION)
|
||||
.context(error::NotFoundAuthHeaderSnafu)?
|
||||
.to_str()
|
||||
.context(InvisibleASCIISnafu)?;
|
||||
.context(InvalidAuthHeaderInvisibleASCIISnafu)?;
|
||||
|
||||
auth_header.try_into()
|
||||
}
|
||||
@@ -270,13 +265,14 @@ fn decode_basic(credential: Credential) -> Result<(Username, Password)> {
|
||||
let decoded = BASE64_STANDARD
|
||||
.decode(credential)
|
||||
.context(error::InvalidBase64ValueSnafu)?;
|
||||
let as_utf8 = String::from_utf8(decoded).context(error::InvalidUtf8ValueSnafu)?;
|
||||
let as_utf8 =
|
||||
String::from_utf8(decoded).context(error::InvalidAuthHeaderInvalidUtf8ValueSnafu)?;
|
||||
|
||||
if let Some((user_id, password)) = as_utf8.split_once(':') {
|
||||
return Ok((user_id.to_string(), password.to_string().into()));
|
||||
}
|
||||
|
||||
InvalidAuthorizationHeaderSnafu {}.fail()
|
||||
InvalidAuthHeaderSnafu {}.fail()
|
||||
}
|
||||
|
||||
fn need_auth<B>(req: &Request<B>) -> bool {
|
||||
@@ -395,10 +391,7 @@ mod tests {
|
||||
|
||||
let wrong_req = mock_http_request(Some("Basic dXNlcm5hbWU6 cGFzc3dvcmQ="), None).unwrap();
|
||||
let res = auth_header(&wrong_req);
|
||||
assert_matches!(
|
||||
res.err(),
|
||||
Some(error::Error::InvalidAuthorizationHeader { .. })
|
||||
);
|
||||
assert_matches!(res.err(), Some(error::Error::InvalidAuthHeader { .. }));
|
||||
|
||||
let wrong_req = mock_http_request(Some("Digest dXNlcm5hbWU6cGFzc3dvcmQ="), None).unwrap();
|
||||
let res = auth_header(&wrong_req);
|
||||
|
||||
Reference in New Issue
Block a user