fix: auth in grpc (#2056)

* fix: auth in grpc

* fix: change to return err

* fix: add grpc test

* fix: add http test

* fix: add mysql and pg test
This commit is contained in:
shuiyisong
2023-08-01 23:18:31 +08:00
committed by GitHub
parent 7d76131469
commit 5eb2c609a3
5 changed files with 286 additions and 15 deletions

View File

@@ -70,7 +70,10 @@ impl GreptimeRequestHandler {
let header = request.header.as_ref();
let query_ctx = create_query_context(header);
let _ = self.auth(header, &query_ctx).await?;
if let Err(e) = self.auth(header, &query_ctx).await? {
return Ok(Err(e));
}
let handler = self.handler.clone();
let request_type = request_type(&query);
let db = query_ctx.get_db_string();

View File

@@ -48,6 +48,7 @@ use object_store::services::{Azblob, Gcs, Oss, S3};
use object_store::test_util::TempFolder;
use object_store::ObjectStore;
use secrecy::ExposeSecret;
use servers::auth::UserProviderRef;
use servers::grpc::GrpcServer;
use servers::http::{HttpOptions, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
@@ -404,6 +405,14 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
pub async fn setup_test_http_app_with_frontend(
store_type: StorageType,
name: &str,
) -> (Router, TestGuard) {
setup_test_http_app_with_frontend_and_user_provider(store_type, name, None).await
}
pub async fn setup_test_http_app_with_frontend_and_user_provider(
store_type: StorageType,
name: &str,
user_provider: Option<UserProviderRef>,
) -> (Router, TestGuard) {
let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap();
@@ -429,12 +438,20 @@ pub async fn setup_test_http_app_with_frontend(
};
let frontend_ref = Arc::new(frontend);
let http_server = HttpServerBuilder::new(http_opts)
let mut http_server = HttpServerBuilder::new(http_opts);
http_server
.with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone()))
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref.clone()))
.with_script_handler(frontend_ref)
.with_greptime_config_options(opts.to_toml_string())
.build();
.with_greptime_config_options(opts.to_toml_string());
if let Some(user_provider) = user_provider {
http_server.with_user_provider(user_provider);
}
let http_server = http_server.build();
let app = http_server.build(http_server.make_app());
(app, guard)
}
@@ -532,6 +549,14 @@ pub async fn setup_test_prom_app_with_frontend(
pub async fn setup_grpc_server(
store_type: StorageType,
name: &str,
) -> (String, TestGuard, Arc<GrpcServer>) {
setup_grpc_server_with_user_provider(store_type, name, None).await
}
pub async fn setup_grpc_server_with_user_provider(
store_type: StorageType,
name: &str,
user_provider: Option<UserProviderRef>,
) -> (String, TestGuard, Arc<GrpcServer>) {
common_telemetry::init_default_ut_logging();
@@ -557,7 +582,7 @@ pub async fn setup_grpc_server(
let fe_grpc_server = Arc::new(GrpcServer::new(
ServerGrpcQueryHandlerAdaptor::arc(fe_instance_ref.clone()),
Some(fe_instance_ref.clone()),
None,
user_provider,
runtime,
));
@@ -587,6 +612,14 @@ pub async fn check_output_stream(output: Output, expected: &str) {
pub async fn setup_mysql_server(
store_type: StorageType,
name: &str,
) -> (String, TestGuard, Arc<Box<dyn Server>>) {
setup_mysql_server_with_user_provider(store_type, name, None).await
}
pub async fn setup_mysql_server_with_user_provider(
store_type: StorageType,
name: &str,
user_provider: Option<UserProviderRef>,
) -> (String, TestGuard, Arc<Box<dyn Server>>) {
common_telemetry::init_default_ut_logging();
@@ -619,7 +652,7 @@ pub async fn setup_mysql_server(
runtime,
Arc::new(MysqlSpawnRef::new(
ServerSqlQueryHandlerAdaptor::arc(fe_instance_ref),
None,
user_provider,
)),
Arc::new(MysqlSpawnConfig::new(
false,
@@ -643,6 +676,14 @@ pub async fn setup_mysql_server(
pub async fn setup_pg_server(
store_type: StorageType,
name: &str,
) -> (String, TestGuard, Arc<Box<dyn Server>>) {
setup_pg_server_with_user_provider(store_type, name, None).await
}
pub async fn setup_pg_server_with_user_provider(
store_type: StorageType,
name: &str,
user_provider: Option<UserProviderRef>,
) -> (String, TestGuard, Arc<Box<dyn Server>>) {
common_telemetry::init_default_ut_logging();
@@ -675,7 +716,7 @@ pub async fn setup_pg_server(
ServerSqlQueryHandlerAdaptor::arc(fe_instance_ref),
opts.tls.clone(),
runtime,
None,
user_provider,
)) as Box<dyn Server>);
let fe_pg_addr_clone = fe_pg_addr.clone();

View File

@@ -12,19 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::alter_expr::Kind;
use api::v1::promql_request::Promql;
use api::v1::{
column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, CreateTableExpr,
InsertRequest, InsertRequests, PromInstantQuery, PromRangeQuery, PromqlRequest, RequestHeader,
SemanticType, TableId,
column, AddColumn, AddColumns, AlterExpr, Basic, Column, ColumnDataType, ColumnDef,
CreateTableExpr, InsertRequest, InsertRequests, PromInstantQuery, PromRangeQuery,
PromqlRequest, RequestHeader, SemanticType, TableId,
};
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE};
use common_query::Output;
use servers::auth::user_provider::StaticUserProvider;
use servers::prometheus::{PromData, PromSeries, PrometheusJsonResponse, PrometheusResponse};
use servers::server::Server;
use tests_integration::test_util::{setup_grpc_server, StorageType};
use tests_integration::test_util::{
setup_grpc_server, setup_grpc_server_with_user_provider, StorageType,
};
#[macro_export]
macro_rules! grpc_test {
@@ -60,6 +65,7 @@ macro_rules! grpc_tests {
test_auto_create_table,
test_insert_and_select,
test_dbname,
test_grpc_auth,
test_health_check,
test_prom_gateway_query,
);
@@ -111,6 +117,60 @@ pub async fn test_dbname(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_grpc_auth(store_type: StorageType) {
let user_provider = StaticUserProvider::try_from("cmd:greptime_user=greptime_pwd").unwrap();
let (addr, mut guard, fe_grpc_server) = setup_grpc_server_with_user_provider(
store_type,
"auto_create_table",
Some(Arc::new(user_provider)),
)
.await;
let grpc_client = Client::with_urls(vec![addr]);
let mut db = Database::new_with_dbname(
format!("{}-{}", DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME),
grpc_client,
);
// 1. test without auth
let re = db.sql("show tables;").await;
assert!(re.is_err());
assert!(matches!(
re,
Err(client::Error::FlightGet {
tonic_code: tonic::Code::Unauthenticated,
..
})
));
// 2. test wrong auth
db.set_auth(api::v1::auth_header::AuthScheme::Basic(Basic {
username: "greptime_user".to_string(),
password: "wrong_pwd".to_string(),
}));
let re = db.sql("show tables;").await;
assert!(re.is_err());
assert!(matches!(
re,
Err(client::Error::FlightGet {
tonic_code: tonic::Code::Unauthenticated,
..
})
));
// 3. test right auth
db.set_auth(api::v1::auth_header::AuthScheme::Basic(Basic {
username: "greptime_user".to_string(),
password: "greptime_pwd".to_string(),
}));
let re = db.sql("show tables;").await;
assert!(re.is_ok());
let _ = fe_grpc_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_auto_create_table(store_type: StorageType) {
let (addr, mut guard, fe_grpc_server) =
setup_grpc_server(store_type, "auto_create_table").await;

View File

@@ -12,15 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use axum::http::StatusCode;
use axum_test_helper::TestClient;
use common_error::status_code::StatusCode as ErrorCode;
use serde_json::json;
use servers::auth::user_provider::StaticUserProvider;
use servers::http::handler::HealthResponse;
use servers::http::{JsonOutput, JsonResponse};
use servers::prometheus::{PrometheusJsonResponse, PrometheusResponse};
use tests_integration::test_util::{
setup_test_http_app, setup_test_http_app_with_frontend, setup_test_prom_app_with_frontend,
setup_test_http_app, setup_test_http_app_with_frontend,
setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend,
StorageType,
};
@@ -53,6 +57,7 @@ macro_rules! http_tests {
http_test!(
$service,
test_http_auth,
test_sql_api,
test_prometheus_promql_api,
test_prom_http_api,
@@ -67,6 +72,47 @@ macro_rules! http_tests {
};
}
pub async fn test_http_auth(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let user_provider = StaticUserProvider::try_from("cmd:greptime_user=greptime_pwd").unwrap();
let (app, mut guard) = setup_test_http_app_with_frontend_and_user_provider(
store_type,
"sql_api",
Some(Arc::new(user_provider)),
)
.await;
let client = TestClient::new(app);
// 1. no auth
let res = client
.get("/v1/sql?db=public&sql=show tables;")
.send()
.await;
assert_eq!(res.status(), StatusCode::UNAUTHORIZED);
// 2. wrong auth
let res = client
.get("/v1/sql?db=public&sql=show tables;")
.header("Authorization", "basic Z3JlcHRpbWVfdXNlcjp3cm9uZ19wd2Q=")
.send()
.await;
assert_eq!(res.status(), StatusCode::UNAUTHORIZED);
// 3. right auth
let res = client
.get("/v1/sql?db=public&sql=show tables;")
.header(
"Authorization",
"basic Z3JlcHRpbWVfdXNlcjpncmVwdGltZV9wd2Q=",
)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
guard.remove_all().await;
}
pub async fn test_sql_api(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "sql_api").await;

View File

@@ -11,11 +11,17 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
use sqlx::mysql::MySqlPoolOptions;
use sqlx::postgres::PgPoolOptions;
use servers::auth::user_provider::StaticUserProvider;
use sqlx::mysql::{MySqlDatabaseError, MySqlPoolOptions};
use sqlx::postgres::{PgDatabaseError, PgPoolOptions};
use sqlx::Row;
use tests_integration::test_util::{setup_mysql_server, setup_pg_server, StorageType};
use tests_integration::test_util::{
setup_mysql_server, setup_mysql_server_with_user_provider, setup_pg_server,
setup_pg_server_with_user_provider, StorageType,
};
#[macro_export]
macro_rules! sql_test {
@@ -47,13 +53,72 @@ macro_rules! sql_tests {
sql_test!(
$service,
test_mysql_auth,
test_mysql_crud,
test_postgres_auth,
test_postgres_crud,
);
)*
};
}
pub async fn test_mysql_auth(store_type: StorageType) {
let user_provider = StaticUserProvider::try_from("cmd:greptime_user=greptime_pwd").unwrap();
let (addr, mut guard, fe_mysql_server) = setup_mysql_server_with_user_provider(
store_type,
"sql_crud",
Some(Arc::new(user_provider)),
)
.await;
// 1. no auth
let conn_re = MySqlPoolOptions::new()
.max_connections(2)
.connect(&format!("mysql://{addr}/public"))
.await;
assert!(conn_re.is_err());
assert_eq!(
conn_re
.err()
.unwrap()
.into_database_error()
.unwrap()
.downcast::<MySqlDatabaseError>()
.code(),
Some("28000")
);
// 2. wrong pwd
let conn_re = MySqlPoolOptions::new()
.max_connections(2)
.connect(&format!("mysql://greptime_user:wrong_pwd@{addr}/public"))
.await;
assert!(conn_re.is_err());
assert_eq!(
conn_re
.err()
.unwrap()
.into_database_error()
.unwrap()
.downcast::<MySqlDatabaseError>()
.code(),
Some("28000")
);
// 3. right pwd
let conn_re = MySqlPoolOptions::new()
.max_connections(2)
.connect(&format!("mysql://greptime_user:greptime_pwd@{addr}/public"))
.await;
assert!(conn_re.is_ok());
let _ = fe_mysql_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_mysql_crud(store_type: StorageType) {
let (addr, mut guard, fe_mysql_server) = setup_mysql_server(store_type, "sql_crud").await;
@@ -136,6 +201,62 @@ pub async fn test_mysql_crud(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_postgres_auth(store_type: StorageType) {
let user_provider = StaticUserProvider::try_from("cmd:greptime_user=greptime_pwd").unwrap();
let (addr, mut guard, fe_pg_server) =
setup_pg_server_with_user_provider(store_type, "sql_crud", Some(Arc::new(user_provider)))
.await;
// 1. no auth
let conn_re = PgPoolOptions::new()
.max_connections(2)
.connect(&format!("postgres://{addr}/public"))
.await;
assert!(conn_re.is_err());
assert_eq!(
conn_re
.err()
.unwrap()
.into_database_error()
.unwrap()
.downcast::<PgDatabaseError>()
.code(),
"28P01"
);
// 2. wrong pwd
let conn_re = PgPoolOptions::new()
.max_connections(2)
.connect(&format!("postgres://greptime_user:wrong_pwd@{addr}/public"))
.await;
assert!(conn_re.is_err());
assert_eq!(
conn_re
.err()
.unwrap()
.into_database_error()
.unwrap()
.downcast::<PgDatabaseError>()
.code(),
"28P01"
);
// 2. right pwd
let conn_re = PgPoolOptions::new()
.max_connections(2)
.connect(&format!(
"postgres://greptime_user:greptime_pwd@{addr}/public"
))
.await;
assert!(conn_re.is_ok());
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_postgres_crud(store_type: StorageType) {
let (addr, mut guard, fe_pg_server) = setup_pg_server(store_type, "sql_crud").await;