From 5eb2c609a34563df5e50e547515b009783eb0efb Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Tue, 1 Aug 2023 23:18:31 +0800 Subject: [PATCH] fix: auth in grpc (#2056) * fix: auth in grpc * fix: change to return err * fix: add grpc test * fix: add http test * fix: add mysql and pg test --- src/servers/src/grpc/handler.rs | 5 +- tests-integration/src/test_util.rs | 53 ++++++++++-- tests-integration/tests/grpc.rs | 68 ++++++++++++++- tests-integration/tests/http.rs | 48 ++++++++++- tests-integration/tests/sql.rs | 127 ++++++++++++++++++++++++++++- 5 files changed, 286 insertions(+), 15 deletions(-) diff --git a/src/servers/src/grpc/handler.rs b/src/servers/src/grpc/handler.rs index 097294bcd5..9216f31139 100644 --- a/src/servers/src/grpc/handler.rs +++ b/src/servers/src/grpc/handler.rs @@ -70,7 +70,10 @@ impl GreptimeRequestHandler { let header = request.header.as_ref(); let query_ctx = create_query_context(header); - let _ = self.auth(header, &query_ctx).await?; + if let Err(e) = self.auth(header, &query_ctx).await? { + return Ok(Err(e)); + } + let handler = self.handler.clone(); let request_type = request_type(&query); let db = query_ctx.get_db_string(); diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 5d56ddb472..b1a10beb75 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -48,6 +48,7 @@ use object_store::services::{Azblob, Gcs, Oss, S3}; use object_store::test_util::TempFolder; use object_store::ObjectStore; use secrecy::ExposeSecret; +use servers::auth::UserProviderRef; use servers::grpc::GrpcServer; use servers::http::{HttpOptions, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; @@ -404,6 +405,14 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router pub async fn setup_test_http_app_with_frontend( store_type: StorageType, name: &str, +) -> (Router, TestGuard) { + setup_test_http_app_with_frontend_and_user_provider(store_type, name, None).await +} + +pub async fn setup_test_http_app_with_frontend_and_user_provider( + store_type: StorageType, + name: &str, + user_provider: Option, ) -> (Router, TestGuard) { let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap(); @@ -429,12 +438,20 @@ pub async fn setup_test_http_app_with_frontend( }; let frontend_ref = Arc::new(frontend); - let http_server = HttpServerBuilder::new(http_opts) + let mut http_server = HttpServerBuilder::new(http_opts); + + http_server .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone())) .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref.clone())) .with_script_handler(frontend_ref) - .with_greptime_config_options(opts.to_toml_string()) - .build(); + .with_greptime_config_options(opts.to_toml_string()); + + if let Some(user_provider) = user_provider { + http_server.with_user_provider(user_provider); + } + + let http_server = http_server.build(); + let app = http_server.build(http_server.make_app()); (app, guard) } @@ -532,6 +549,14 @@ pub async fn setup_test_prom_app_with_frontend( pub async fn setup_grpc_server( store_type: StorageType, name: &str, +) -> (String, TestGuard, Arc) { + setup_grpc_server_with_user_provider(store_type, name, None).await +} + +pub async fn setup_grpc_server_with_user_provider( + store_type: StorageType, + name: &str, + user_provider: Option, ) -> (String, TestGuard, Arc) { common_telemetry::init_default_ut_logging(); @@ -557,7 +582,7 @@ pub async fn setup_grpc_server( let fe_grpc_server = Arc::new(GrpcServer::new( ServerGrpcQueryHandlerAdaptor::arc(fe_instance_ref.clone()), Some(fe_instance_ref.clone()), - None, + user_provider, runtime, )); @@ -587,6 +612,14 @@ pub async fn check_output_stream(output: Output, expected: &str) { pub async fn setup_mysql_server( store_type: StorageType, name: &str, +) -> (String, TestGuard, Arc>) { + setup_mysql_server_with_user_provider(store_type, name, None).await +} + +pub async fn setup_mysql_server_with_user_provider( + store_type: StorageType, + name: &str, + user_provider: Option, ) -> (String, TestGuard, Arc>) { common_telemetry::init_default_ut_logging(); @@ -619,7 +652,7 @@ pub async fn setup_mysql_server( runtime, Arc::new(MysqlSpawnRef::new( ServerSqlQueryHandlerAdaptor::arc(fe_instance_ref), - None, + user_provider, )), Arc::new(MysqlSpawnConfig::new( false, @@ -643,6 +676,14 @@ pub async fn setup_mysql_server( pub async fn setup_pg_server( store_type: StorageType, name: &str, +) -> (String, TestGuard, Arc>) { + setup_pg_server_with_user_provider(store_type, name, None).await +} + +pub async fn setup_pg_server_with_user_provider( + store_type: StorageType, + name: &str, + user_provider: Option, ) -> (String, TestGuard, Arc>) { common_telemetry::init_default_ut_logging(); @@ -675,7 +716,7 @@ pub async fn setup_pg_server( ServerSqlQueryHandlerAdaptor::arc(fe_instance_ref), opts.tls.clone(), runtime, - None, + user_provider, )) as Box); let fe_pg_addr_clone = fe_pg_addr.clone(); diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 364e306e54..a65004c71c 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -12,19 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use api::v1::alter_expr::Kind; use api::v1::promql_request::Promql; use api::v1::{ - column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, CreateTableExpr, - InsertRequest, InsertRequests, PromInstantQuery, PromRangeQuery, PromqlRequest, RequestHeader, - SemanticType, TableId, + column, AddColumn, AddColumns, AlterExpr, Basic, Column, ColumnDataType, ColumnDef, + CreateTableExpr, InsertRequest, InsertRequests, PromInstantQuery, PromRangeQuery, + PromqlRequest, RequestHeader, SemanticType, TableId, }; use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE}; use common_query::Output; +use servers::auth::user_provider::StaticUserProvider; use servers::prometheus::{PromData, PromSeries, PrometheusJsonResponse, PrometheusResponse}; use servers::server::Server; -use tests_integration::test_util::{setup_grpc_server, StorageType}; +use tests_integration::test_util::{ + setup_grpc_server, setup_grpc_server_with_user_provider, StorageType, +}; #[macro_export] macro_rules! grpc_test { @@ -60,6 +65,7 @@ macro_rules! grpc_tests { test_auto_create_table, test_insert_and_select, test_dbname, + test_grpc_auth, test_health_check, test_prom_gateway_query, ); @@ -111,6 +117,60 @@ pub async fn test_dbname(store_type: StorageType) { guard.remove_all().await; } +pub async fn test_grpc_auth(store_type: StorageType) { + let user_provider = StaticUserProvider::try_from("cmd:greptime_user=greptime_pwd").unwrap(); + + let (addr, mut guard, fe_grpc_server) = setup_grpc_server_with_user_provider( + store_type, + "auto_create_table", + Some(Arc::new(user_provider)), + ) + .await; + + let grpc_client = Client::with_urls(vec![addr]); + let mut db = Database::new_with_dbname( + format!("{}-{}", DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME), + grpc_client, + ); + + // 1. test without auth + let re = db.sql("show tables;").await; + assert!(re.is_err()); + assert!(matches!( + re, + Err(client::Error::FlightGet { + tonic_code: tonic::Code::Unauthenticated, + .. + }) + )); + + // 2. test wrong auth + db.set_auth(api::v1::auth_header::AuthScheme::Basic(Basic { + username: "greptime_user".to_string(), + password: "wrong_pwd".to_string(), + })); + let re = db.sql("show tables;").await; + assert!(re.is_err()); + assert!(matches!( + re, + Err(client::Error::FlightGet { + tonic_code: tonic::Code::Unauthenticated, + .. + }) + )); + + // 3. test right auth + db.set_auth(api::v1::auth_header::AuthScheme::Basic(Basic { + username: "greptime_user".to_string(), + password: "greptime_pwd".to_string(), + })); + let re = db.sql("show tables;").await; + assert!(re.is_ok()); + + let _ = fe_grpc_server.shutdown().await; + guard.remove_all().await; +} + pub async fn test_auto_create_table(store_type: StorageType) { let (addr, mut guard, fe_grpc_server) = setup_grpc_server(store_type, "auto_create_table").await; diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index a056e9cac7..dca65775db 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -12,15 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use axum::http::StatusCode; use axum_test_helper::TestClient; use common_error::status_code::StatusCode as ErrorCode; use serde_json::json; +use servers::auth::user_provider::StaticUserProvider; use servers::http::handler::HealthResponse; use servers::http::{JsonOutput, JsonResponse}; use servers::prometheus::{PrometheusJsonResponse, PrometheusResponse}; use tests_integration::test_util::{ - setup_test_http_app, setup_test_http_app_with_frontend, setup_test_prom_app_with_frontend, + setup_test_http_app, setup_test_http_app_with_frontend, + setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend, StorageType, }; @@ -53,6 +57,7 @@ macro_rules! http_tests { http_test!( $service, + test_http_auth, test_sql_api, test_prometheus_promql_api, test_prom_http_api, @@ -67,6 +72,47 @@ macro_rules! http_tests { }; } +pub async fn test_http_auth(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + + let user_provider = StaticUserProvider::try_from("cmd:greptime_user=greptime_pwd").unwrap(); + + let (app, mut guard) = setup_test_http_app_with_frontend_and_user_provider( + store_type, + "sql_api", + Some(Arc::new(user_provider)), + ) + .await; + let client = TestClient::new(app); + + // 1. no auth + let res = client + .get("/v1/sql?db=public&sql=show tables;") + .send() + .await; + assert_eq!(res.status(), StatusCode::UNAUTHORIZED); + + // 2. wrong auth + let res = client + .get("/v1/sql?db=public&sql=show tables;") + .header("Authorization", "basic Z3JlcHRpbWVfdXNlcjp3cm9uZ19wd2Q=") + .send() + .await; + assert_eq!(res.status(), StatusCode::UNAUTHORIZED); + + // 3. right auth + let res = client + .get("/v1/sql?db=public&sql=show tables;") + .header( + "Authorization", + "basic Z3JlcHRpbWVfdXNlcjpncmVwdGltZV9wd2Q=", + ) + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + guard.remove_all().await; +} + pub async fn test_sql_api(store_type: StorageType) { common_telemetry::init_default_ut_logging(); let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "sql_api").await; diff --git a/tests-integration/tests/sql.rs b/tests-integration/tests/sql.rs index 1d1d2f824e..e4652d9665 100644 --- a/tests-integration/tests/sql.rs +++ b/tests-integration/tests/sql.rs @@ -11,11 +11,17 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc}; -use sqlx::mysql::MySqlPoolOptions; -use sqlx::postgres::PgPoolOptions; +use servers::auth::user_provider::StaticUserProvider; +use sqlx::mysql::{MySqlDatabaseError, MySqlPoolOptions}; +use sqlx::postgres::{PgDatabaseError, PgPoolOptions}; use sqlx::Row; -use tests_integration::test_util::{setup_mysql_server, setup_pg_server, StorageType}; +use tests_integration::test_util::{ + setup_mysql_server, setup_mysql_server_with_user_provider, setup_pg_server, + setup_pg_server_with_user_provider, StorageType, +}; #[macro_export] macro_rules! sql_test { @@ -47,13 +53,72 @@ macro_rules! sql_tests { sql_test!( $service, + test_mysql_auth, test_mysql_crud, + test_postgres_auth, test_postgres_crud, ); )* }; } +pub async fn test_mysql_auth(store_type: StorageType) { + let user_provider = StaticUserProvider::try_from("cmd:greptime_user=greptime_pwd").unwrap(); + let (addr, mut guard, fe_mysql_server) = setup_mysql_server_with_user_provider( + store_type, + "sql_crud", + Some(Arc::new(user_provider)), + ) + .await; + + // 1. no auth + let conn_re = MySqlPoolOptions::new() + .max_connections(2) + .connect(&format!("mysql://{addr}/public")) + .await; + + assert!(conn_re.is_err()); + assert_eq!( + conn_re + .err() + .unwrap() + .into_database_error() + .unwrap() + .downcast::() + .code(), + Some("28000") + ); + + // 2. wrong pwd + let conn_re = MySqlPoolOptions::new() + .max_connections(2) + .connect(&format!("mysql://greptime_user:wrong_pwd@{addr}/public")) + .await; + + assert!(conn_re.is_err()); + assert_eq!( + conn_re + .err() + .unwrap() + .into_database_error() + .unwrap() + .downcast::() + .code(), + Some("28000") + ); + + // 3. right pwd + let conn_re = MySqlPoolOptions::new() + .max_connections(2) + .connect(&format!("mysql://greptime_user:greptime_pwd@{addr}/public")) + .await; + + assert!(conn_re.is_ok()); + + let _ = fe_mysql_server.shutdown().await; + guard.remove_all().await; +} + pub async fn test_mysql_crud(store_type: StorageType) { let (addr, mut guard, fe_mysql_server) = setup_mysql_server(store_type, "sql_crud").await; @@ -136,6 +201,62 @@ pub async fn test_mysql_crud(store_type: StorageType) { guard.remove_all().await; } +pub async fn test_postgres_auth(store_type: StorageType) { + let user_provider = StaticUserProvider::try_from("cmd:greptime_user=greptime_pwd").unwrap(); + let (addr, mut guard, fe_pg_server) = + setup_pg_server_with_user_provider(store_type, "sql_crud", Some(Arc::new(user_provider))) + .await; + + // 1. no auth + let conn_re = PgPoolOptions::new() + .max_connections(2) + .connect(&format!("postgres://{addr}/public")) + .await; + + assert!(conn_re.is_err()); + assert_eq!( + conn_re + .err() + .unwrap() + .into_database_error() + .unwrap() + .downcast::() + .code(), + "28P01" + ); + + // 2. wrong pwd + let conn_re = PgPoolOptions::new() + .max_connections(2) + .connect(&format!("postgres://greptime_user:wrong_pwd@{addr}/public")) + .await; + + assert!(conn_re.is_err()); + assert_eq!( + conn_re + .err() + .unwrap() + .into_database_error() + .unwrap() + .downcast::() + .code(), + "28P01" + ); + + // 2. right pwd + let conn_re = PgPoolOptions::new() + .max_connections(2) + .connect(&format!( + "postgres://greptime_user:greptime_pwd@{addr}/public" + )) + .await; + + assert!(conn_re.is_ok()); + + let _ = fe_pg_server.shutdown().await; + guard.remove_all().await; +} + pub async fn test_postgres_crud(store_type: StorageType) { let (addr, mut guard, fe_pg_server) = setup_pg_server(store_type, "sql_crud").await;