mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-20 23:10:37 +00:00
chore: return json body under http status 401 (#2924)
* chore: change auth_fn to function and return response with json body * chore: move unsupported to debug level * chore: add docs and tests * chore: rebase and update test
This commit is contained in:
@@ -138,7 +138,6 @@ impl StatusCode {
|
||||
pub fn should_log_error(&self) -> bool {
|
||||
match self {
|
||||
StatusCode::Unknown
|
||||
| StatusCode::Unsupported
|
||||
| StatusCode::Unexpected
|
||||
| StatusCode::Internal
|
||||
| StatusCode::Cancelled
|
||||
@@ -147,6 +146,7 @@ impl StatusCode {
|
||||
| StatusCode::StorageUnavailable
|
||||
| StatusCode::RuntimeResourcesExhausted => true,
|
||||
StatusCode::Success
|
||||
| StatusCode::Unsupported
|
||||
| StatusCode::InvalidArguments
|
||||
| StatusCode::InvalidSyntax
|
||||
| StatusCode::TableAlreadyExists
|
||||
|
||||
@@ -36,7 +36,6 @@ use aide::axum::{routing as apirouting, ApiRouter, IntoApiResponse};
|
||||
use aide::openapi::{Info, OpenApi, Server as OpenAPIServer};
|
||||
use async_trait::async_trait;
|
||||
use auth::UserProviderRef;
|
||||
use axum::body::BoxBody;
|
||||
use axum::error_handling::HandleErrorLayer;
|
||||
use axum::extract::{DefaultBodyLimit, MatchedPath};
|
||||
use axum::http::Request;
|
||||
@@ -62,12 +61,11 @@ use tokio::sync::oneshot::{self, Sender};
|
||||
use tokio::sync::Mutex;
|
||||
use tower::timeout::TimeoutLayer;
|
||||
use tower::ServiceBuilder;
|
||||
use tower_http::auth::AsyncRequireAuthorizationLayer;
|
||||
use tower_http::trace::TraceLayer;
|
||||
|
||||
use self::authorize::AuthState;
|
||||
use crate::configurator::ConfiguratorRef;
|
||||
use crate::error::{AlreadyStartedSnafu, Error, Result, StartHttpSnafu, ToJsonSnafu};
|
||||
use crate::http::authorize::HttpAuth;
|
||||
use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
|
||||
use crate::http::influxdb_result_v1::InfluxdbV1Response;
|
||||
use crate::http::prometheus::{
|
||||
@@ -721,9 +719,10 @@ impl HttpServer {
|
||||
.try_into()
|
||||
.unwrap_or_else(|_| DEFAULT_BODY_LIMIT.as_bytes() as usize),
|
||||
))
|
||||
// custom layer
|
||||
.layer(AsyncRequireAuthorizationLayer::new(
|
||||
HttpAuth::<BoxBody>::new(self.user_provider.clone()),
|
||||
// auth layer
|
||||
.layer(middleware::from_fn_with_state(
|
||||
AuthState::new(self.user_provider.clone()),
|
||||
authorize::check_http_auth,
|
||||
)),
|
||||
)
|
||||
// Handlers for debug, we don't expect a timeout.
|
||||
|
||||
@@ -12,118 +12,123 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::marker::PhantomData;
|
||||
|
||||
use ::auth::UserProviderRef;
|
||||
use axum::extract::State;
|
||||
use axum::http::{self, Request, StatusCode};
|
||||
use axum::response::Response;
|
||||
use axum::middleware::Next;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::Json;
|
||||
use base64::prelude::BASE64_STANDARD;
|
||||
use base64::Engine;
|
||||
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
|
||||
use common_catalog::parse_catalog_and_schema_from_db_string;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_telemetry::warn;
|
||||
use futures::future::BoxFuture;
|
||||
use headers::Header;
|
||||
use http_body::Body;
|
||||
use secrecy::SecretString;
|
||||
use session::context::QueryContext;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use tower_http::auth::AsyncAuthorizeRequest;
|
||||
|
||||
use super::header::GreptimeDbName;
|
||||
use super::PUBLIC_APIS;
|
||||
use super::{JsonResponse, ResponseFormat, PUBLIC_APIS};
|
||||
use crate::error::{
|
||||
self, InvalidAuthorizationHeaderSnafu, InvalidParameterSnafu, InvisibleASCIISnafu,
|
||||
NotFoundInfluxAuthSnafu, Result, UnsupportedAuthSchemeSnafu, UrlDecodeSnafu,
|
||||
};
|
||||
use crate::http::HTTP_API_PREFIX;
|
||||
|
||||
pub struct HttpAuth<RespBody> {
|
||||
/// AuthState is a holder state for [`UserProviderRef`]
|
||||
/// during [`check_http_auth`] function in axum's middleware
|
||||
#[derive(Clone)]
|
||||
pub struct AuthState {
|
||||
user_provider: Option<UserProviderRef>,
|
||||
_ty: PhantomData<RespBody>,
|
||||
}
|
||||
|
||||
impl<RespBody> HttpAuth<RespBody> {
|
||||
impl AuthState {
|
||||
pub fn new(user_provider: Option<UserProviderRef>) -> Self {
|
||||
Self {
|
||||
user_provider,
|
||||
_ty: PhantomData,
|
||||
Self { user_provider }
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn inner_auth<B>(
|
||||
user_provider: Option<UserProviderRef>,
|
||||
mut req: Request<B>,
|
||||
) -> std::result::Result<Request<B>, Response> {
|
||||
// 1. prepare
|
||||
let (catalog, schema) = extract_catalog_and_schema(&req);
|
||||
let query_ctx = QueryContext::with(catalog, schema);
|
||||
let need_auth = need_auth(&req);
|
||||
let is_influxdb = req.uri().path().contains("influxdb");
|
||||
|
||||
// 2. check if auth is needed
|
||||
let user_provider = if let Some(user_provider) = user_provider.filter(|_| need_auth) {
|
||||
user_provider
|
||||
} else {
|
||||
query_ctx.set_current_user(Some(auth::userinfo_by_name(None)));
|
||||
let _ = req.extensions_mut().insert(query_ctx);
|
||||
return Ok(req);
|
||||
};
|
||||
|
||||
// 3. get username and pwd
|
||||
let (username, password) = match extract_username_and_password(is_influxdb, &req) {
|
||||
Ok((username, password)) => (username, password),
|
||||
Err(e) => {
|
||||
warn!("extract username and password failed: {}", e);
|
||||
crate::metrics::METRIC_AUTH_FAILURE
|
||||
.with_label_values(&[e.status_code().as_ref()])
|
||||
.inc();
|
||||
return Err(err_response(is_influxdb, e).into_response());
|
||||
}
|
||||
};
|
||||
|
||||
// 4. auth
|
||||
match user_provider
|
||||
.auth(
|
||||
auth::Identity::UserId(&username, None),
|
||||
auth::Password::PlainText(password),
|
||||
catalog,
|
||||
schema,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(userinfo) => {
|
||||
query_ctx.set_current_user(Some(userinfo));
|
||||
let _ = req.extensions_mut().insert(query_ctx);
|
||||
Ok(req)
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("authenticate failed: {}", e);
|
||||
crate::metrics::METRIC_AUTH_FAILURE
|
||||
.with_label_values(&[e.status_code().as_ref()])
|
||||
.inc();
|
||||
Err(err_response(is_influxdb, e).into_response())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<RespBody> Clone for HttpAuth<RespBody> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
user_provider: self.user_provider.clone(),
|
||||
_ty: PhantomData,
|
||||
}
|
||||
pub async fn check_http_auth<B>(
|
||||
State(auth_state): State<AuthState>,
|
||||
req: Request<B>,
|
||||
next: Next<B>,
|
||||
) -> Response {
|
||||
match inner_auth(auth_state.user_provider, req).await {
|
||||
Ok(req) => next.run(req).await,
|
||||
Err(resp) => resp,
|
||||
}
|
||||
}
|
||||
|
||||
impl<B, RespBody> AsyncAuthorizeRequest<B> for HttpAuth<RespBody>
|
||||
where
|
||||
B: Send + Sync + 'static,
|
||||
RespBody: Body + Default,
|
||||
{
|
||||
type RequestBody = B;
|
||||
type ResponseBody = RespBody;
|
||||
type Future = BoxFuture<'static, std::result::Result<Request<B>, Response<Self::ResponseBody>>>;
|
||||
fn err_response(is_influxdb: bool, err: impl ErrorExt) -> impl IntoResponse {
|
||||
let format = if is_influxdb {
|
||||
ResponseFormat::InfluxdbV1
|
||||
} else {
|
||||
ResponseFormat::GreptimedbV1
|
||||
};
|
||||
|
||||
fn authorize(&mut self, mut request: Request<B>) -> Self::Future {
|
||||
let user_provider = self.user_provider.clone();
|
||||
Box::pin(async move {
|
||||
let (catalog, schema) = extract_catalog_and_schema(&request);
|
||||
let query_ctx = QueryContext::with(catalog, schema);
|
||||
let need_auth = need_auth(&request);
|
||||
|
||||
let user_provider = if let Some(user_provider) = user_provider.filter(|_| need_auth) {
|
||||
user_provider
|
||||
} else {
|
||||
query_ctx.set_current_user(Some(auth::userinfo_by_name(None)));
|
||||
let _ = request.extensions_mut().insert(query_ctx);
|
||||
return Ok(request);
|
||||
};
|
||||
|
||||
let (username, password) = match extract_username_and_password(&request) {
|
||||
Ok((username, password)) => (username, password),
|
||||
Err(e) => {
|
||||
warn!("extract username and password failed: {}", e);
|
||||
crate::metrics::METRIC_AUTH_FAILURE
|
||||
.with_label_values(&[e.status_code().as_ref()])
|
||||
.inc();
|
||||
return Err(unauthorized_resp());
|
||||
}
|
||||
};
|
||||
|
||||
match user_provider
|
||||
.auth(
|
||||
::auth::Identity::UserId(username.as_str(), None),
|
||||
::auth::Password::PlainText(password),
|
||||
catalog,
|
||||
schema,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(userinfo) => {
|
||||
query_ctx.set_current_user(Some(userinfo));
|
||||
let _ = request.extensions_mut().insert(query_ctx);
|
||||
Ok(request)
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("authenticate failed: {}", e);
|
||||
crate::metrics::METRIC_AUTH_FAILURE
|
||||
.with_label_values(&[e.status_code().as_ref()])
|
||||
.inc();
|
||||
Err(unauthorized_resp())
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
let body = JsonResponse::with_error(err, format);
|
||||
(StatusCode::UNAUTHORIZED, Json(body))
|
||||
}
|
||||
|
||||
fn extract_catalog_and_schema<B: Send + Sync + 'static>(request: &Request<B>) -> (&str, &str) {
|
||||
fn extract_catalog_and_schema<B>(request: &Request<B>) -> (&str, &str) {
|
||||
// parse database from header
|
||||
let dbname = request
|
||||
.headers()
|
||||
@@ -139,9 +144,7 @@ fn extract_catalog_and_schema<B: Send + Sync + 'static>(request: &Request<B>) ->
|
||||
parse_catalog_and_schema_from_db_string(dbname)
|
||||
}
|
||||
|
||||
fn get_influxdb_credentials<B: Send + Sync + 'static>(
|
||||
request: &Request<B>,
|
||||
) -> Result<Option<(Username, Password)>> {
|
||||
fn get_influxdb_credentials<B>(request: &Request<B>) -> Result<Option<(Username, Password)>> {
|
||||
// compat with influxdb v2 and v1
|
||||
if let Some(header) = request.headers().get(http::header::AUTHORIZATION) {
|
||||
// try v2 first
|
||||
@@ -182,10 +185,11 @@ fn get_influxdb_credentials<B: Send + Sync + 'static>(
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_username_and_password<B: Send + Sync + 'static>(
|
||||
fn extract_username_and_password<B>(
|
||||
is_influxdb: bool,
|
||||
request: &Request<B>,
|
||||
) -> Result<(Username, Password)> {
|
||||
Ok(if request.uri().path().contains("influxdb") {
|
||||
Ok(if is_influxdb {
|
||||
// compatible with influxdb auth
|
||||
get_influxdb_credentials(request)?.context(NotFoundInfluxAuthSnafu)?
|
||||
} else {
|
||||
@@ -197,15 +201,6 @@ fn extract_username_and_password<B: Send + Sync + 'static>(
|
||||
})
|
||||
}
|
||||
|
||||
fn unauthorized_resp<RespBody>() -> Response<RespBody>
|
||||
where
|
||||
RespBody: Body + Default,
|
||||
{
|
||||
let mut res = Response::new(RespBody::default());
|
||||
*res.status_mut() = StatusCode::UNAUTHORIZED;
|
||||
res
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum AuthScheme {
|
||||
Basic(Username, Password),
|
||||
|
||||
@@ -16,20 +16,17 @@ use std::sync::Arc;
|
||||
|
||||
use auth::tests::MockUserProvider;
|
||||
use auth::UserProvider;
|
||||
use axum::body::BoxBody;
|
||||
use axum::http;
|
||||
use hyper::Request;
|
||||
use servers::http::authorize::HttpAuth;
|
||||
use http_body::Body;
|
||||
use hyper::{Request, StatusCode};
|
||||
use servers::http::authorize::inner_auth;
|
||||
use session::context::QueryContextRef;
|
||||
use tower_http::auth::AsyncAuthorizeRequest;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_http_auth() {
|
||||
let mut http_auth: HttpAuth<BoxBody> = HttpAuth::new(None);
|
||||
|
||||
// base64encode("username:password") == "dXNlcm5hbWU6cGFzc3dvcmQ="
|
||||
let req = mock_http_request(Some("Basic dXNlcm5hbWU6cGFzc3dvcmQ="), None).unwrap();
|
||||
let req = http_auth.authorize(req).await.unwrap();
|
||||
let req = inner_auth(None, req).await.unwrap();
|
||||
let ctx: &QueryContextRef = req.extensions().get().unwrap();
|
||||
let user_info = ctx.current_user().unwrap();
|
||||
let default = auth::userinfo_by_name(None);
|
||||
@@ -37,32 +34,41 @@ async fn test_http_auth() {
|
||||
|
||||
// In mock user provider, right username:password == "greptime:greptime"
|
||||
let mock_user_provider = Some(Arc::new(MockUserProvider::default()) as Arc<dyn UserProvider>);
|
||||
let mut http_auth: HttpAuth<BoxBody> = HttpAuth::new(mock_user_provider);
|
||||
|
||||
// base64encode("greptime:greptime") == "Z3JlcHRpbWU6Z3JlcHRpbWU="
|
||||
let req = mock_http_request(Some("Basic Z3JlcHRpbWU6Z3JlcHRpbWU="), None).unwrap();
|
||||
let req = http_auth.authorize(req).await.unwrap();
|
||||
let req = inner_auth(mock_user_provider.clone(), req).await.unwrap();
|
||||
let ctx: &QueryContextRef = req.extensions().get().unwrap();
|
||||
let user_info = ctx.current_user().unwrap();
|
||||
let default = auth::userinfo_by_name(None);
|
||||
assert_eq!(default.username(), user_info.username());
|
||||
|
||||
let req = mock_http_request(None, None).unwrap();
|
||||
let auth_res = http_auth.authorize(req).await;
|
||||
let auth_res = inner_auth(mock_user_provider.clone(), req).await;
|
||||
assert!(auth_res.is_err());
|
||||
let mut resp = auth_res.unwrap_err();
|
||||
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
|
||||
assert_eq!(
|
||||
b"{\"type\":\"GreptimedbV1\",\"code\":7003,\"error\":\"Not found http or grpc authorization header\"}",
|
||||
resp.data().await.unwrap().unwrap().as_ref()
|
||||
);
|
||||
|
||||
// base64encode("username:password") == "dXNlcm5hbWU6cGFzc3dvcmQ="
|
||||
let wrong_req = mock_http_request(Some("Basic dXNlcm5hbWU6cGFzc3dvcmQ="), None).unwrap();
|
||||
let auth_res = http_auth.authorize(wrong_req).await;
|
||||
let auth_res = inner_auth(mock_user_provider, wrong_req).await;
|
||||
assert!(auth_res.is_err());
|
||||
let mut resp = auth_res.unwrap_err();
|
||||
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
|
||||
assert_eq!(
|
||||
b"{\"type\":\"GreptimedbV1\",\"code\":7000,\"error\":\"User not found, username: username\"}",
|
||||
resp.data().await.unwrap().unwrap().as_ref(),
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_schema_validating() {
|
||||
// In mock user provider, right username:password == "greptime:greptime"
|
||||
let provider = MockUserProvider::default();
|
||||
let mock_user_provider = Some(Arc::new(provider) as Arc<dyn UserProvider>);
|
||||
let mut http_auth: HttpAuth<BoxBody> = HttpAuth::new(mock_user_provider);
|
||||
let mock_user_provider = Some(Arc::new(MockUserProvider::default()) as Arc<dyn UserProvider>);
|
||||
|
||||
// base64encode("greptime:greptime") == "Z3JlcHRpbWU6Z3JlcHRpbWU="
|
||||
// http://localhost/{http_api_version}/sql?db=greptime
|
||||
@@ -72,7 +78,7 @@ async fn test_schema_validating() {
|
||||
Some(format!("http://localhost/{version}/sql?db=public").as_str()),
|
||||
)
|
||||
.unwrap();
|
||||
let req = http_auth.authorize(req).await.unwrap();
|
||||
let req = inner_auth(mock_user_provider.clone(), req).await.unwrap();
|
||||
let ctx: &QueryContextRef = req.extensions().get().unwrap();
|
||||
let user_info = ctx.current_user().unwrap();
|
||||
let default = auth::userinfo_by_name(None);
|
||||
@@ -84,26 +90,37 @@ async fn test_schema_validating() {
|
||||
Some(format!("http://localhost/{version}/sql?db=wrong").as_str()),
|
||||
)
|
||||
.unwrap();
|
||||
let result = http_auth.authorize(req).await;
|
||||
let result = inner_auth(mock_user_provider, req).await;
|
||||
assert!(result.is_err());
|
||||
let mut resp = result.unwrap_err();
|
||||
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
|
||||
assert_eq!(
|
||||
b"{\"type\":\"GreptimedbV1\",\"code\":7005,\"error\":\"Access denied for user 'greptime' to database 'greptime-wrong'\"}",
|
||||
resp.data().await.unwrap().unwrap().as_ref()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_whitelist_no_auth() {
|
||||
// In mock user provider, right username:password == "greptime:greptime"
|
||||
let mock_user_provider = Some(Arc::new(MockUserProvider::default()) as Arc<dyn UserProvider>);
|
||||
let mut http_auth: HttpAuth<BoxBody> = HttpAuth::new(mock_user_provider);
|
||||
|
||||
// base64encode("greptime:greptime") == "Z3JlcHRpbWU6Z3JlcHRpbWU="
|
||||
// try auth path first
|
||||
let req = mock_http_request(None, None).unwrap();
|
||||
let req = http_auth.authorize(req).await;
|
||||
assert!(req.is_err());
|
||||
let auth_res = inner_auth(mock_user_provider.clone(), req).await;
|
||||
assert!(auth_res.is_err());
|
||||
let mut resp = auth_res.unwrap_err();
|
||||
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
|
||||
assert_eq!(
|
||||
b"{\"type\":\"GreptimedbV1\",\"code\":7003,\"error\":\"Not found http or grpc authorization header\"}",
|
||||
resp.data().await.unwrap().unwrap().as_ref()
|
||||
);
|
||||
|
||||
// try whitelist path
|
||||
let req = mock_http_request(None, Some("http://localhost/health")).unwrap();
|
||||
let req = http_auth.authorize(req).await;
|
||||
let _ = req.unwrap();
|
||||
let req = inner_auth(mock_user_provider, req).await;
|
||||
assert!(req.is_ok());
|
||||
}
|
||||
|
||||
// copy from http::authorize
|
||||
|
||||
@@ -168,6 +168,10 @@ async fn test_influxdb_write() {
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(result.status(), 401);
|
||||
assert_eq!(
|
||||
"{\"type\":\"InfluxdbV1\",\"results\":[],\"error\":\"Username and password does not match, username: greptime\"}",
|
||||
result.text().await
|
||||
);
|
||||
|
||||
// no auth
|
||||
let result = client
|
||||
@@ -176,6 +180,10 @@ async fn test_influxdb_write() {
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(result.status(), 401);
|
||||
assert_eq!(
|
||||
"{\"type\":\"InfluxdbV1\",\"results\":[],\"error\":\"Not found influx http authorization info\"}",
|
||||
result.text().await
|
||||
);
|
||||
|
||||
// make new app for db=influxdb
|
||||
let app = make_test_app(tx, Some("influxdb"));
|
||||
|
||||
Reference in New Issue
Block a user