From 5dba373ede58224f296fb49d3c1c367e4d6721bf Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Thu, 14 Dec 2023 18:01:12 +0800 Subject: [PATCH] chore: return json body under http status 401 (#2924) * chore: change auth_fn to function and return response with json body * chore: move unsupported to debug level * chore: add docs and tests * chore: rebase and update test --- src/common/error/src/status_code.rs | 2 +- src/servers/src/http.rs | 11 +- src/servers/src/http/authorize.rs | 179 ++++++++++++------------ src/servers/tests/http/authorize.rs | 59 +++++--- src/servers/tests/http/influxdb_test.rs | 8 ++ 5 files changed, 139 insertions(+), 120 deletions(-) diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index 228b278aa7..64b9a9e7fb 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -138,7 +138,6 @@ impl StatusCode { pub fn should_log_error(&self) -> bool { match self { StatusCode::Unknown - | StatusCode::Unsupported | StatusCode::Unexpected | StatusCode::Internal | StatusCode::Cancelled @@ -147,6 +146,7 @@ impl StatusCode { | StatusCode::StorageUnavailable | StatusCode::RuntimeResourcesExhausted => true, StatusCode::Success + | StatusCode::Unsupported | StatusCode::InvalidArguments | StatusCode::InvalidSyntax | StatusCode::TableAlreadyExists diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 0bd6f90319..a390f406d7 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -36,7 +36,6 @@ use aide::axum::{routing as apirouting, ApiRouter, IntoApiResponse}; use aide::openapi::{Info, OpenApi, Server as OpenAPIServer}; use async_trait::async_trait; use auth::UserProviderRef; -use axum::body::BoxBody; use axum::error_handling::HandleErrorLayer; use axum::extract::{DefaultBodyLimit, MatchedPath}; use axum::http::Request; @@ -62,12 +61,11 @@ use tokio::sync::oneshot::{self, Sender}; use tokio::sync::Mutex; use tower::timeout::TimeoutLayer; use tower::ServiceBuilder; -use tower_http::auth::AsyncRequireAuthorizationLayer; use tower_http::trace::TraceLayer; +use self::authorize::AuthState; use crate::configurator::ConfiguratorRef; use crate::error::{AlreadyStartedSnafu, Error, Result, StartHttpSnafu, ToJsonSnafu}; -use crate::http::authorize::HttpAuth; use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2}; use crate::http::influxdb_result_v1::InfluxdbV1Response; use crate::http::prometheus::{ @@ -721,9 +719,10 @@ impl HttpServer { .try_into() .unwrap_or_else(|_| DEFAULT_BODY_LIMIT.as_bytes() as usize), )) - // custom layer - .layer(AsyncRequireAuthorizationLayer::new( - HttpAuth::::new(self.user_provider.clone()), + // auth layer + .layer(middleware::from_fn_with_state( + AuthState::new(self.user_provider.clone()), + authorize::check_http_auth, )), ) // Handlers for debug, we don't expect a timeout. diff --git a/src/servers/src/http/authorize.rs b/src/servers/src/http/authorize.rs index 9225b6ec52..51040ba489 100644 --- a/src/servers/src/http/authorize.rs +++ b/src/servers/src/http/authorize.rs @@ -12,118 +12,123 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::marker::PhantomData; - use ::auth::UserProviderRef; +use axum::extract::State; use axum::http::{self, Request, StatusCode}; -use axum::response::Response; +use axum::middleware::Next; +use axum::response::{IntoResponse, Response}; +use axum::Json; use base64::prelude::BASE64_STANDARD; use base64::Engine; use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_catalog::parse_catalog_and_schema_from_db_string; use common_error::ext::ErrorExt; use common_telemetry::warn; -use futures::future::BoxFuture; use headers::Header; -use http_body::Body; use secrecy::SecretString; use session::context::QueryContext; use snafu::{ensure, OptionExt, ResultExt}; -use tower_http::auth::AsyncAuthorizeRequest; use super::header::GreptimeDbName; -use super::PUBLIC_APIS; +use super::{JsonResponse, ResponseFormat, PUBLIC_APIS}; use crate::error::{ self, InvalidAuthorizationHeaderSnafu, InvalidParameterSnafu, InvisibleASCIISnafu, NotFoundInfluxAuthSnafu, Result, UnsupportedAuthSchemeSnafu, UrlDecodeSnafu, }; use crate::http::HTTP_API_PREFIX; -pub struct HttpAuth { +/// AuthState is a holder state for [`UserProviderRef`] +/// during [`check_http_auth`] function in axum's middleware +#[derive(Clone)] +pub struct AuthState { user_provider: Option, - _ty: PhantomData, } -impl HttpAuth { +impl AuthState { pub fn new(user_provider: Option) -> Self { - Self { - user_provider, - _ty: PhantomData, + Self { user_provider } + } +} + +pub async fn inner_auth( + user_provider: Option, + mut req: Request, +) -> std::result::Result, Response> { + // 1. prepare + let (catalog, schema) = extract_catalog_and_schema(&req); + let query_ctx = QueryContext::with(catalog, schema); + let need_auth = need_auth(&req); + let is_influxdb = req.uri().path().contains("influxdb"); + + // 2. check if auth is needed + let user_provider = if let Some(user_provider) = user_provider.filter(|_| need_auth) { + user_provider + } else { + query_ctx.set_current_user(Some(auth::userinfo_by_name(None))); + let _ = req.extensions_mut().insert(query_ctx); + return Ok(req); + }; + + // 3. get username and pwd + let (username, password) = match extract_username_and_password(is_influxdb, &req) { + Ok((username, password)) => (username, password), + Err(e) => { + warn!("extract username and password failed: {}", e); + crate::metrics::METRIC_AUTH_FAILURE + .with_label_values(&[e.status_code().as_ref()]) + .inc(); + return Err(err_response(is_influxdb, e).into_response()); + } + }; + + // 4. auth + match user_provider + .auth( + auth::Identity::UserId(&username, None), + auth::Password::PlainText(password), + catalog, + schema, + ) + .await + { + Ok(userinfo) => { + query_ctx.set_current_user(Some(userinfo)); + let _ = req.extensions_mut().insert(query_ctx); + Ok(req) + } + Err(e) => { + warn!("authenticate failed: {}", e); + crate::metrics::METRIC_AUTH_FAILURE + .with_label_values(&[e.status_code().as_ref()]) + .inc(); + Err(err_response(is_influxdb, e).into_response()) } } } -impl Clone for HttpAuth { - fn clone(&self) -> Self { - Self { - user_provider: self.user_provider.clone(), - _ty: PhantomData, - } +pub async fn check_http_auth( + State(auth_state): State, + req: Request, + next: Next, +) -> Response { + match inner_auth(auth_state.user_provider, req).await { + Ok(req) => next.run(req).await, + Err(resp) => resp, } } -impl AsyncAuthorizeRequest for HttpAuth -where - B: Send + Sync + 'static, - RespBody: Body + Default, -{ - type RequestBody = B; - type ResponseBody = RespBody; - type Future = BoxFuture<'static, std::result::Result, Response>>; +fn err_response(is_influxdb: bool, err: impl ErrorExt) -> impl IntoResponse { + let format = if is_influxdb { + ResponseFormat::InfluxdbV1 + } else { + ResponseFormat::GreptimedbV1 + }; - fn authorize(&mut self, mut request: Request) -> Self::Future { - let user_provider = self.user_provider.clone(); - Box::pin(async move { - let (catalog, schema) = extract_catalog_and_schema(&request); - let query_ctx = QueryContext::with(catalog, schema); - let need_auth = need_auth(&request); - - let user_provider = if let Some(user_provider) = user_provider.filter(|_| need_auth) { - user_provider - } else { - query_ctx.set_current_user(Some(auth::userinfo_by_name(None))); - let _ = request.extensions_mut().insert(query_ctx); - return Ok(request); - }; - - let (username, password) = match extract_username_and_password(&request) { - Ok((username, password)) => (username, password), - Err(e) => { - warn!("extract username and password failed: {}", e); - crate::metrics::METRIC_AUTH_FAILURE - .with_label_values(&[e.status_code().as_ref()]) - .inc(); - return Err(unauthorized_resp()); - } - }; - - match user_provider - .auth( - ::auth::Identity::UserId(username.as_str(), None), - ::auth::Password::PlainText(password), - catalog, - schema, - ) - .await - { - Ok(userinfo) => { - query_ctx.set_current_user(Some(userinfo)); - let _ = request.extensions_mut().insert(query_ctx); - Ok(request) - } - Err(e) => { - warn!("authenticate failed: {}", e); - crate::metrics::METRIC_AUTH_FAILURE - .with_label_values(&[e.status_code().as_ref()]) - .inc(); - Err(unauthorized_resp()) - } - } - }) - } + let body = JsonResponse::with_error(err, format); + (StatusCode::UNAUTHORIZED, Json(body)) } -fn extract_catalog_and_schema(request: &Request) -> (&str, &str) { +fn extract_catalog_and_schema(request: &Request) -> (&str, &str) { // parse database from header let dbname = request .headers() @@ -139,9 +144,7 @@ fn extract_catalog_and_schema(request: &Request) -> parse_catalog_and_schema_from_db_string(dbname) } -fn get_influxdb_credentials( - request: &Request, -) -> Result> { +fn get_influxdb_credentials(request: &Request) -> Result> { // compat with influxdb v2 and v1 if let Some(header) = request.headers().get(http::header::AUTHORIZATION) { // try v2 first @@ -182,10 +185,11 @@ fn get_influxdb_credentials( } } -fn extract_username_and_password( +fn extract_username_and_password( + is_influxdb: bool, request: &Request, ) -> Result<(Username, Password)> { - Ok(if request.uri().path().contains("influxdb") { + Ok(if is_influxdb { // compatible with influxdb auth get_influxdb_credentials(request)?.context(NotFoundInfluxAuthSnafu)? } else { @@ -197,15 +201,6 @@ fn extract_username_and_password( }) } -fn unauthorized_resp() -> Response -where - RespBody: Body + Default, -{ - let mut res = Response::new(RespBody::default()); - *res.status_mut() = StatusCode::UNAUTHORIZED; - res -} - #[derive(Debug)] pub enum AuthScheme { Basic(Username, Password), diff --git a/src/servers/tests/http/authorize.rs b/src/servers/tests/http/authorize.rs index e41e0316f8..97f1c9e2e8 100644 --- a/src/servers/tests/http/authorize.rs +++ b/src/servers/tests/http/authorize.rs @@ -16,20 +16,17 @@ use std::sync::Arc; use auth::tests::MockUserProvider; use auth::UserProvider; -use axum::body::BoxBody; use axum::http; -use hyper::Request; -use servers::http::authorize::HttpAuth; +use http_body::Body; +use hyper::{Request, StatusCode}; +use servers::http::authorize::inner_auth; use session::context::QueryContextRef; -use tower_http::auth::AsyncAuthorizeRequest; #[tokio::test] async fn test_http_auth() { - let mut http_auth: HttpAuth = HttpAuth::new(None); - // base64encode("username:password") == "dXNlcm5hbWU6cGFzc3dvcmQ=" let req = mock_http_request(Some("Basic dXNlcm5hbWU6cGFzc3dvcmQ="), None).unwrap(); - let req = http_auth.authorize(req).await.unwrap(); + let req = inner_auth(None, req).await.unwrap(); let ctx: &QueryContextRef = req.extensions().get().unwrap(); let user_info = ctx.current_user().unwrap(); let default = auth::userinfo_by_name(None); @@ -37,32 +34,41 @@ async fn test_http_auth() { // In mock user provider, right username:password == "greptime:greptime" let mock_user_provider = Some(Arc::new(MockUserProvider::default()) as Arc); - let mut http_auth: HttpAuth = HttpAuth::new(mock_user_provider); // base64encode("greptime:greptime") == "Z3JlcHRpbWU6Z3JlcHRpbWU=" let req = mock_http_request(Some("Basic Z3JlcHRpbWU6Z3JlcHRpbWU="), None).unwrap(); - let req = http_auth.authorize(req).await.unwrap(); + let req = inner_auth(mock_user_provider.clone(), req).await.unwrap(); let ctx: &QueryContextRef = req.extensions().get().unwrap(); let user_info = ctx.current_user().unwrap(); let default = auth::userinfo_by_name(None); assert_eq!(default.username(), user_info.username()); let req = mock_http_request(None, None).unwrap(); - let auth_res = http_auth.authorize(req).await; + let auth_res = inner_auth(mock_user_provider.clone(), req).await; assert!(auth_res.is_err()); + let mut resp = auth_res.unwrap_err(); + assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); + assert_eq!( + b"{\"type\":\"GreptimedbV1\",\"code\":7003,\"error\":\"Not found http or grpc authorization header\"}", + resp.data().await.unwrap().unwrap().as_ref() + ); // base64encode("username:password") == "dXNlcm5hbWU6cGFzc3dvcmQ=" let wrong_req = mock_http_request(Some("Basic dXNlcm5hbWU6cGFzc3dvcmQ="), None).unwrap(); - let auth_res = http_auth.authorize(wrong_req).await; + let auth_res = inner_auth(mock_user_provider, wrong_req).await; assert!(auth_res.is_err()); + let mut resp = auth_res.unwrap_err(); + assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); + assert_eq!( + b"{\"type\":\"GreptimedbV1\",\"code\":7000,\"error\":\"User not found, username: username\"}", + resp.data().await.unwrap().unwrap().as_ref(), + ); } #[tokio::test] async fn test_schema_validating() { // In mock user provider, right username:password == "greptime:greptime" - let provider = MockUserProvider::default(); - let mock_user_provider = Some(Arc::new(provider) as Arc); - let mut http_auth: HttpAuth = HttpAuth::new(mock_user_provider); + let mock_user_provider = Some(Arc::new(MockUserProvider::default()) as Arc); // base64encode("greptime:greptime") == "Z3JlcHRpbWU6Z3JlcHRpbWU=" // http://localhost/{http_api_version}/sql?db=greptime @@ -72,7 +78,7 @@ async fn test_schema_validating() { Some(format!("http://localhost/{version}/sql?db=public").as_str()), ) .unwrap(); - let req = http_auth.authorize(req).await.unwrap(); + let req = inner_auth(mock_user_provider.clone(), req).await.unwrap(); let ctx: &QueryContextRef = req.extensions().get().unwrap(); let user_info = ctx.current_user().unwrap(); let default = auth::userinfo_by_name(None); @@ -84,26 +90,37 @@ async fn test_schema_validating() { Some(format!("http://localhost/{version}/sql?db=wrong").as_str()), ) .unwrap(); - let result = http_auth.authorize(req).await; + let result = inner_auth(mock_user_provider, req).await; assert!(result.is_err()); + let mut resp = result.unwrap_err(); + assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); + assert_eq!( + b"{\"type\":\"GreptimedbV1\",\"code\":7005,\"error\":\"Access denied for user 'greptime' to database 'greptime-wrong'\"}", + resp.data().await.unwrap().unwrap().as_ref() + ); } #[tokio::test] async fn test_whitelist_no_auth() { // In mock user provider, right username:password == "greptime:greptime" let mock_user_provider = Some(Arc::new(MockUserProvider::default()) as Arc); - let mut http_auth: HttpAuth = HttpAuth::new(mock_user_provider); // base64encode("greptime:greptime") == "Z3JlcHRpbWU6Z3JlcHRpbWU=" // try auth path first let req = mock_http_request(None, None).unwrap(); - let req = http_auth.authorize(req).await; - assert!(req.is_err()); + let auth_res = inner_auth(mock_user_provider.clone(), req).await; + assert!(auth_res.is_err()); + let mut resp = auth_res.unwrap_err(); + assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); + assert_eq!( + b"{\"type\":\"GreptimedbV1\",\"code\":7003,\"error\":\"Not found http or grpc authorization header\"}", + resp.data().await.unwrap().unwrap().as_ref() + ); // try whitelist path let req = mock_http_request(None, Some("http://localhost/health")).unwrap(); - let req = http_auth.authorize(req).await; - let _ = req.unwrap(); + let req = inner_auth(mock_user_provider, req).await; + assert!(req.is_ok()); } // copy from http::authorize diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index d74acca3fc..06a9193ce3 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -168,6 +168,10 @@ async fn test_influxdb_write() { .send() .await; assert_eq!(result.status(), 401); + assert_eq!( + "{\"type\":\"InfluxdbV1\",\"results\":[],\"error\":\"Username and password does not match, username: greptime\"}", + result.text().await + ); // no auth let result = client @@ -176,6 +180,10 @@ async fn test_influxdb_write() { .send() .await; assert_eq!(result.status(), 401); + assert_eq!( + "{\"type\":\"InfluxdbV1\",\"results\":[],\"error\":\"Not found influx http authorization info\"}", + result.text().await + ); // make new app for db=influxdb let app = make_test_app(tx, Some("influxdb"));