From fda9e80cbf461a8d375f1a699d76a07e42e63e34 Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Wed, 14 Dec 2022 16:38:29 +0800 Subject: [PATCH] feat: impl static_user_provider (#739) * feat: add MemUserProvider and impl auth * feat: impl user_provider option in fe and standalone mode * chore: add file impl for mem provider * chore: remove mem opts * chore: minor change * chore: refac pg server to use user_provider as indicator for using pwd auth * chore: fix test * chore: extract common code * chore: add unit test * chore: rebase develop * chore: add user provider to http server * chore: minor rename * chore: change to ref when convert to anymap * chore: fix according to clippy * chore: remove clone on startcommand * chore: fix cr issue * chore: update tempdir use * chore: change TryFrom to normal func while parsing anymap * chore: minor change * chore: remove to_lowercase --- Cargo.lock | 11 + src/cmd/Cargo.toml | 1 + src/cmd/src/error.rs | 7 + src/cmd/src/frontend.rs | 54 +++- src/cmd/src/standalone.rs | 42 ++- src/frontend/Cargo.toml | 1 + src/frontend/src/frontend.rs | 26 +- src/frontend/src/postgres.rs | 2 - src/frontend/src/server.rs | 7 +- src/servers/Cargo.toml | 3 + src/servers/src/auth.rs | 58 ++++- src/servers/src/auth/user_provider.rs | 253 +++++++++++++++++++ src/servers/src/postgres/auth_handler.rs | 6 +- src/servers/src/postgres/server.rs | 2 - src/servers/tests/http/influxdb_test.rs | 18 +- src/servers/tests/mysql/mysql_server_test.rs | 51 ++-- src/servers/tests/postgres/mod.rs | 13 +- 17 files changed, 482 insertions(+), 73 deletions(-) create mode 100644 src/servers/src/auth/user_provider.rs diff --git a/Cargo.lock b/Cargo.lock index 305cce0a89..86c5827f0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -127,6 +127,12 @@ version = "1.0.65" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "98161a4e3e2184da77bb14f02184cdd111e83bbbcc9979dfee3c44b9a85f5602" +[[package]] +name = "anymap" +version = "1.0.0-beta.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f1f8f5a6f3d50d89e3797d7593a50f96bb2aaa20ca0cc7be1fb673232c91d72" + [[package]] name = "api" version = "0.1.0" @@ -1276,6 +1282,7 @@ dependencies = [ name = "cmd" version = "0.1.0" dependencies = [ + "anymap", "build-data", "clap 3.2.22", "common-error", @@ -2475,6 +2482,7 @@ dependencies = [ name = "frontend" version = "0.1.0" dependencies = [ + "anymap", "api", "async-stream", "async-trait", @@ -6114,6 +6122,7 @@ dependencies = [ "common-telemetry", "common-time", "datatypes", + "digest", "futures", "hex", "http-body", @@ -6138,10 +6147,12 @@ dependencies = [ "serde", "serde_json", "session", + "sha1", "snafu", "snap", "strum 0.24.1", "table", + "tempdir", "tokio", "tokio-postgres", "tokio-postgres-rustls", diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 8168b98788..f9db96e42d 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -10,6 +10,7 @@ name = "greptime" path = "src/bin/greptime.rs" [dependencies] +anymap = "1.0.0-beta.2" clap = { version = "3.1", features = ["derive"] } common-error = { path = "../common/error" } common-telemetry = { path = "../common/telemetry", features = [ diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index c57cda3f97..14fe0a9c27 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -55,6 +55,12 @@ pub enum Error { #[snafu(display("Illegal config: {}", msg))] IllegalConfig { msg: String, backtrace: Backtrace }, + + #[snafu(display("Illegal auth config: {}", source))] + IllegalAuthConfig { + #[snafu(backtrace)] + source: servers::auth::Error, + }, } pub type Result = std::result::Result; @@ -69,6 +75,7 @@ impl ErrorExt for Error { StatusCode::InvalidArguments } Error::IllegalConfig { .. } => StatusCode::InvalidArguments, + Error::IllegalAuthConfig { .. } => StatusCode::InvalidArguments, } } diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 59695c3bbf..3b98332b33 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use anymap::AnyMap; use clap::Parser; use frontend::frontend::{Frontend, FrontendOptions}; use frontend::grpc::GrpcOptions; @@ -23,12 +24,13 @@ use frontend::mysql::MysqlOptions; use frontend::opentsdb::OpentsdbOptions; use frontend::postgres::PostgresOptions; use meta_client::MetaClientOpts; +use servers::auth::UserProviderRef; use servers::http::HttpOptions; use servers::tls::{TlsMode, TlsOption}; -use servers::Mode; +use servers::{auth, Mode}; use snafu::ResultExt; -use crate::error::{self, Result}; +use crate::error::{self, IllegalAuthConfigSnafu, Result}; use crate::toml_loader; #[derive(Parser)] @@ -80,21 +82,35 @@ pub struct StartCommand { tls_cert_path: Option, #[clap(long)] tls_key_path: Option, + #[clap(long)] + user_provider: Option, } impl StartCommand { async fn run(self) -> Result<()> { + let plugins = load_frontend_plugins(&self.user_provider)?; let opts: FrontendOptions = self.try_into()?; let mut frontend = Frontend::new( opts.clone(), Instance::try_new_distributed(&opts) .await .context(error::StartFrontendSnafu)?, + plugins, ); frontend.start().await.context(error::StartFrontendSnafu) } } +pub fn load_frontend_plugins(user_provider: &Option) -> Result { + let mut plugins = AnyMap::new(); + + if let Some(provider) = user_provider { + let provider = auth::user_provider_from_option(provider).context(IllegalAuthConfigSnafu)?; + plugins.insert::(provider); + } + Ok(plugins) +} + impl TryFrom for FrontendOptions { type Error = error::Error; @@ -160,6 +176,8 @@ impl TryFrom for FrontendOptions { mod tests { use std::time::Duration; + use servers::auth::{Identity, Password, UserProviderRef}; + use super::*; #[test] @@ -176,6 +194,7 @@ mod tests { tls_mode: None, tls_cert_path: None, tls_key_path: None, + user_provider: None, }; let opts: FrontendOptions = command.try_into().unwrap(); @@ -228,6 +247,7 @@ mod tests { tls_mode: None, tls_cert_path: None, tls_key_path: None, + user_provider: None, }; let fe_opts = FrontendOptions::try_from(command).unwrap(); @@ -241,4 +261,34 @@ mod tests { fe_opts.http_options.as_ref().unwrap().timeout ); } + + #[tokio::test] + async fn test_try_from_start_command_to_anymap() { + let command = StartCommand { + http_addr: None, + grpc_addr: None, + mysql_addr: None, + postgres_addr: None, + opentsdb_addr: None, + influxdb_enable: None, + config_file: None, + metasrv_addr: None, + tls_mode: None, + tls_cert_path: None, + tls_key_path: None, + user_provider: Some("static_user_provider:cmd:test=test".to_string()), + }; + + let plugins = load_frontend_plugins(&command.user_provider); + assert!(plugins.is_ok()); + let plugins = plugins.unwrap(); + let provider = plugins.get::(); + assert!(provider.is_some()); + + let provider = provider.unwrap(); + let result = provider + .auth(Identity::UserId("test", None), Password::PlainText("test")) + .await; + assert!(result.is_ok()); + } } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index e6d109b6a2..bd0ca573c8 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use anymap::AnyMap; use clap::Parser; use common_telemetry::info; use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig}; @@ -33,6 +34,7 @@ use servers::Mode; use snafu::ResultExt; use crate::error::{Error, IllegalConfigSnafu, Result, StartDatanodeSnafu, StartFrontendSnafu}; +use crate::frontend::load_frontend_plugins; use crate::toml_loader; #[derive(Parser)] @@ -142,12 +144,15 @@ struct StartCommand { tls_cert_path: Option, #[clap(long)] tls_key_path: Option, + #[clap(long)] + user_provider: Option, } impl StartCommand { async fn run(self) -> Result<()> { let enable_memory_catalog = self.enable_memory_catalog; let config_file = self.config_file.clone(); + let plugins = load_frontend_plugins(&self.user_provider)?; let fe_opts = FrontendOptions::try_from(self)?; let dn_opts: DatanodeOptions = { let mut opts: StandaloneOptions = if let Some(path) = config_file { @@ -167,7 +172,7 @@ impl StartCommand { let mut datanode = Datanode::new(dn_opts.clone()) .await .context(StartDatanodeSnafu)?; - let mut frontend = build_frontend(fe_opts, datanode.get_instance()).await?; + let mut frontend = build_frontend(fe_opts, plugins, datanode.get_instance()).await?; // Start datanode instance before starting services, to avoid requests come in before internal components are started. datanode @@ -184,12 +189,13 @@ impl StartCommand { /// Build frontend instance in standalone mode async fn build_frontend( fe_opts: FrontendOptions, + plugins: AnyMap, datanode_instance: InstanceRef, ) -> Result> { let mut frontend_instance = FeInstance::new_standalone(datanode_instance.clone()); frontend_instance.set_catalog_manager(datanode_instance.catalog_manager().clone()); frontend_instance.set_script_handler(datanode_instance); - Ok(Frontend::new(fe_opts, frontend_instance)) + Ok(Frontend::new(fe_opts, frontend_instance, plugins)) } impl TryFrom for FrontendOptions { @@ -274,6 +280,8 @@ impl TryFrom for FrontendOptions { mod tests { use std::time::Duration; + use servers::auth::{Identity, Password, UserProviderRef}; + use super::*; #[test] @@ -293,6 +301,7 @@ mod tests { tls_mode: None, tls_cert_path: None, tls_key_path: None, + user_provider: None, }; let fe_opts = FrontendOptions::try_from(cmd).unwrap(); @@ -316,4 +325,33 @@ mod tests { assert_eq!(2, fe_opts.mysql_options.as_ref().unwrap().runtime_size); assert!(fe_opts.influxdb_options.as_ref().unwrap().enable); } + + #[tokio::test] + async fn test_try_from_start_command_to_anymap() { + let command = StartCommand { + http_addr: None, + rpc_addr: None, + mysql_addr: None, + postgres_addr: None, + opentsdb_addr: None, + config_file: None, + influxdb_enable: false, + enable_memory_catalog: false, + tls_mode: None, + tls_cert_path: None, + tls_key_path: None, + user_provider: Some("static_user_provider:cmd:test=test".to_string()), + }; + + let plugins = load_frontend_plugins(&command.user_provider); + assert!(plugins.is_ok()); + let plugins = plugins.unwrap(); + let provider = plugins.get::(); + assert!(provider.is_some()); + let provider = provider.unwrap(); + let result = provider + .auth(Identity::UserId("test", None), Password::PlainText("test")) + .await; + assert!(result.is_ok()); + } } diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 452131d0d7..56065fe1c0 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" license = "Apache-2.0" [dependencies] +anymap = "1.0.0-beta.2" api = { path = "../api" } async-stream = "0.3" async-trait = "0.1" diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 8b1a47182a..c73d229e1b 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use common_telemetry::info; +use anymap::AnyMap; use meta_client::MetaClientOpts; use serde::{Deserialize, Serialize}; use servers::auth::UserProviderRef; @@ -67,29 +67,18 @@ where { opts: FrontendOptions, instance: Option, - user_provider: Option, + plugins: AnyMap, } -impl Frontend -where - T: FrontendInstance, -{ - pub fn new(opts: FrontendOptions, instance: T) -> Self { +impl Frontend { + pub fn new(opts: FrontendOptions, instance: T, plugins: AnyMap) -> Self { Self { opts, instance: Some(instance), - user_provider: None, + plugins, } } - pub fn set_user_provider(&mut self, user_provider: Option) { - info!( - "Configured user provider: {:?}", - user_provider.as_ref().map(|u| u.name()) - ); - self.user_provider = user_provider; - } - pub async fn start(&mut self) -> Result<()> { let mut instance = self .instance @@ -100,6 +89,9 @@ where instance.start().await?; let instance = Arc::new(instance); - Services::start(&self.opts, instance, self.user_provider.clone()).await + + let provider = self.plugins.get::().cloned(); + + Services::start(&self.opts, instance, provider).await } } diff --git a/src/frontend/src/postgres.rs b/src/frontend/src/postgres.rs index 0b8c7d44e2..c2df2f54dc 100644 --- a/src/frontend/src/postgres.rs +++ b/src/frontend/src/postgres.rs @@ -21,7 +21,6 @@ use servers::tls::TlsOption; pub struct PostgresOptions { pub addr: String, pub runtime_size: usize, - pub check_pwd: bool, #[serde(default = "Default::default")] pub tls: Arc, } @@ -31,7 +30,6 @@ impl Default for PostgresOptions { Self { addr: "127.0.0.1:4003".to_string(), runtime_size: 2, - check_pwd: false, tls: Default::default(), } } diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 7bad5a6467..d3c55b8c97 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -99,10 +99,9 @@ impl Services { let pg_server = Box::new(PostgresServer::new( instance.clone(), - opts.check_pwd, opts.tls.clone(), pg_io_runtime, - user_provider, + user_provider.clone(), )) as Box; Some((pg_server, pg_addr)) @@ -132,6 +131,10 @@ impl Services { let http_addr = parse_addr(&http_options.addr)?; let mut http_server = HttpServer::new(instance.clone(), http_options.clone()); + if let Some(user_provider) = user_provider { + http_server.set_user_provider(user_provider); + } + if opentsdb_server_and_addr.is_some() { http_server.set_opentsdb_handler(instance.clone()); } diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 1912346761..3abb18b1c2 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -22,6 +22,7 @@ common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } +digest = "0.10" futures = "0.3" hex = { version = "0.4" } http-body = "0.4" @@ -43,6 +44,7 @@ schemars = "0.8" serde = "1.0" serde_json = "1.0" session = { path = "../session" } +sha1 = "0.10" snafu = { version = "0.7", features = ["backtraces"] } snap = "1" strum = { version = "0.24", features = ["derive"] } @@ -67,6 +69,7 @@ rand = "0.8" script = { path = "../script", features = ["python"] } serde_json = "1.0" table = { path = "../table" } +tempdir = "0.3" tokio-postgres = "0.7" tokio-postgres-rustls = "0.9" tokio-test = "0.4" diff --git a/src/servers/src/auth.rs b/src/servers/src/auth.rs index 56003efcbe..8e79e9926f 100644 --- a/src/servers/src/auth.rs +++ b/src/servers/src/auth.rs @@ -12,13 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod user_provider; + pub const DEFAULT_USERNAME: &str = "greptime"; use std::sync::Arc; use common_error::prelude::ErrorExt; use common_error::status_code::StatusCode; -use snafu::{Backtrace, ErrorCompat, Snafu}; +use snafu::{Backtrace, ErrorCompat, OptionExt, Snafu}; + +use crate::auth::user_provider::StaticUserProvider; #[async_trait::async_trait] pub trait UserProvider: Send + Sync { @@ -73,11 +77,40 @@ impl UserInfo { } } +pub fn user_provider_from_option(opt: &String) -> Result { + let (name, content) = opt.split_once(':').context(InvalidConfigSnafu { + value: opt.to_string(), + msg: "UserProviderOption must be in format `