diff --git a/Cargo.lock b/Cargo.lock index 20f746fce6..fef8597128 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2579,6 +2579,9 @@ dependencies = [ "rand 0.9.0", "regex", "rskafka", + "rustls", + "rustls-native-certs 0.7.3", + "rustls-pemfile", "serde", "serde_json", "serde_with", @@ -2590,6 +2593,7 @@ dependencies = [ "table", "tokio", "tokio-postgres", + "tokio-postgres-rustls", "tonic 0.12.3", "tracing", "typetag", diff --git a/config/config.md b/config/config.md index 23c166aecd..3aea01a581 100644 --- a/config/config.md +++ b/config/config.md @@ -337,6 +337,12 @@ | `runtime` | -- | -- | The runtime options. | | `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | | `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. | +| `backend_tls` | -- | -- | TLS configuration for kv store backend (only applicable for PostgreSQL/MySQL backends)
When using PostgreSQL or MySQL as metadata store, you can configure TLS here | +| `backend_tls.mode` | String | `prefer` | TLS mode, refer to https://www.postgresql.org/docs/current/libpq-ssl.html
- "disable" - No TLS
- "prefer" (default) - Try TLS, fallback to plain
- "require" - Require TLS
- "verify_ca" - Require TLS and verify CA
- "verify_full" - Require TLS and verify hostname | +| `backend_tls.cert_path` | String | `""` | Path to client certificate file (for client authentication)
Like "/path/to/client.crt" | +| `backend_tls.key_path` | String | `""` | Path to client private key file (for client authentication)
Like "/path/to/client.key" | +| `backend_tls.ca_cert_path` | String | `""` | Path to CA certificate file (for server certificate verification)
Required when using custom CAs or self-signed certificates
Leave empty to use system root certificates only
Like "/path/to/ca.crt" | +| `backend_tls.watch` | Bool | `false` | Watch for certificate file changes and auto reload | | `grpc` | -- | -- | The gRPC server options. | | `grpc.bind_addr` | String | `127.0.0.1:3002` | The address to bind the gRPC server. | | `grpc.server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.
If left empty or unset, the server will automatically use the IP address of the first network interface
on the host, with the same port number as the one specified in `bind_addr`. | diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 03b74f3953..893c3a7ee8 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -65,6 +65,34 @@ node_max_idle_time = "24hours" ## The number of threads to execute the runtime for global write operations. #+ compact_rt_size = 4 +## TLS configuration for kv store backend (only applicable for PostgreSQL/MySQL backends) +## When using PostgreSQL or MySQL as metadata store, you can configure TLS here +[backend_tls] +## TLS mode, refer to https://www.postgresql.org/docs/current/libpq-ssl.html +## - "disable" - No TLS +## - "prefer" (default) - Try TLS, fallback to plain +## - "require" - Require TLS +## - "verify_ca" - Require TLS and verify CA +## - "verify_full" - Require TLS and verify hostname +mode = "prefer" + +## Path to client certificate file (for client authentication) +## Like "/path/to/client.crt" +cert_path = "" + +## Path to client private key file (for client authentication) +## Like "/path/to/client.key" +key_path = "" + +## Path to CA certificate file (for server certificate verification) +## Required when using custom CAs or self-signed certificates +## Leave empty to use system root certificates only +## Like "/path/to/ca.crt" +ca_cert_path = "" + +## Watch for certificate file changes and auto reload +watch = false + ## The gRPC server options. [grpc] ## The address to bind the gRPC server. diff --git a/src/cli/src/metadata/common.rs b/src/cli/src/metadata/common.rs index 7e77cd49fb..2455aa400c 100644 --- a/src/cli/src/metadata/common.rs +++ b/src/cli/src/metadata/common.rs @@ -75,7 +75,7 @@ impl StoreConfig { #[cfg(feature = "pg_kvbackend")] BackendImpl::PostgresStore => { let table_name = &self.meta_table_name; - let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs) + let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs, None) .await .map_err(BoxedError::new)?; Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool( diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index e79ba915d4..55a457317a 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -34,6 +34,7 @@ use query::options::QueryOptions; use servers::export_metrics::ExportMetricsOption; use servers::grpc::GrpcOptions; use servers::http::HttpOptions; +use servers::tls::{TlsMode, TlsOption}; use store_api::path_utils::WAL_DIR; #[allow(deprecated)] @@ -190,6 +191,13 @@ fn test_load_metasrv_example_config() { remote_write: Some(Default::default()), ..Default::default() }, + backend_tls: Some(TlsOption { + mode: TlsMode::Prefer, + cert_path: String::new(), + key_path: String::new(), + ca_cert_path: String::new(), + watch: false, + }), ..Default::default() }, ..Default::default() @@ -299,6 +307,7 @@ fn test_load_standalone_example_config() { cors_allowed_origins: vec!["https://example.com".to_string()], ..Default::default() }, + ..Default::default() }, ..Default::default() diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index aef6293bc9..788029e570 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -6,7 +6,16 @@ license.workspace = true [features] testing = [] -pg_kvbackend = ["dep:tokio-postgres", "dep:backon", "dep:deadpool-postgres", "dep:deadpool"] +pg_kvbackend = [ + "dep:tokio-postgres", + "dep:backon", + "dep:deadpool-postgres", + "dep:deadpool", + "dep:tokio-postgres-rustls", + "dep:rustls-pemfile", + "dep:rustls-native-certs", + "dep:rustls", +] mysql_kvbackend = ["dep:sqlx", "dep:backon"] enterprise = [] @@ -57,6 +66,9 @@ prost.workspace = true rand.workspace = true regex.workspace = true rskafka.workspace = true +rustls = { workspace = true, default-features = false, features = ["ring", "logging", "std", "tls12"], optional = true } +rustls-native-certs = { version = "0.7", optional = true } +rustls-pemfile = { version = "2.0", optional = true } serde.workspace = true serde_json.workspace = true serde_with.workspace = true @@ -68,6 +80,7 @@ strum.workspace = true table = { workspace = true, features = ["testing"] } tokio.workspace = true tokio-postgres = { workspace = true, optional = true } +tokio-postgres-rustls = { version = "0.12", optional = true } tonic.workspace = true tracing.workspace = true typetag.workspace = true diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 4af0c28cb8..0f97207ef3 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -740,6 +740,32 @@ pub enum Error { operation: String, }, + #[cfg(feature = "pg_kvbackend")] + #[snafu(display("Failed to setup PostgreSQL TLS configuration: {}", reason))] + PostgresTlsConfig { + reason: String, + #[snafu(implicit)] + location: Location, + }, + + #[cfg(feature = "pg_kvbackend")] + #[snafu(display("Failed to load TLS certificate from path: {}", path))] + LoadTlsCertificate { + path: String, + #[snafu(source)] + error: std::io::Error, + #[snafu(implicit)] + location: Location, + }, + + #[cfg(feature = "pg_kvbackend")] + #[snafu(display("Invalid TLS configuration: {}", reason))] + InvalidTlsConfig { + reason: String, + #[snafu(implicit)] + location: Location, + }, + #[cfg(feature = "mysql_kvbackend")] #[snafu(display("Failed to execute via MySql, sql: {}", sql))] MySqlExecution { @@ -1080,7 +1106,10 @@ impl ErrorExt for Error { PostgresExecution { .. } | CreatePostgresPool { .. } | GetPostgresConnection { .. } - | PostgresTransaction { .. } => StatusCode::Internal, + | PostgresTransaction { .. } + | PostgresTlsConfig { .. } + | LoadTlsCertificate { .. } + | InvalidTlsConfig { .. } => StatusCode::Internal, #[cfg(feature = "mysql_kvbackend")] MySqlExecution { .. } | CreateMySqlPool { .. } | MySqlTransaction { .. } => { StatusCode::Internal diff --git a/src/common/meta/src/kv_backend/rds.rs b/src/common/meta/src/kv_backend/rds.rs index 8e57964051..35a5716544 100644 --- a/src/common/meta/src/kv_backend/rds.rs +++ b/src/common/meta/src/kv_backend/rds.rs @@ -40,7 +40,7 @@ const RDS_STORE_OP_RANGE_DELETE: &str = "range_delete"; const RDS_STORE_OP_BATCH_DELETE: &str = "batch_delete"; #[cfg(feature = "pg_kvbackend")] -mod postgres; +pub mod postgres; #[cfg(feature = "pg_kvbackend")] pub use postgres::PgStore; @@ -118,7 +118,7 @@ impl ExecutorImpl<'_, T> { } } - #[warn(dead_code)] // Used in #[cfg(feature = "mysql_kvbackend")] + #[allow(dead_code)] // Used in #[cfg(feature = "mysql_kvbackend")] async fn execute(&mut self, query: &str, params: &Vec<&Vec>) -> Result<()> { match self { Self::Default(executor) => executor.execute(query, params).await, diff --git a/src/common/meta/src/kv_backend/rds/postgres.rs b/src/common/meta/src/kv_backend/rds/postgres.rs index e0661f6f7a..e5d2ca98d0 100644 --- a/src/common/meta/src/kv_backend/rds/postgres.rs +++ b/src/common/meta/src/kv_backend/rds/postgres.rs @@ -12,19 +12,29 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fs::File; +use std::io::BufReader; use std::marker::PhantomData; use std::sync::Arc; use common_telemetry::debug; use deadpool_postgres::{Config, Pool, Runtime}; +use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier}; +use rustls::pki_types::{CertificateDer, ServerName, UnixTime}; +use rustls::server::ParsedCertificate; +// TLS-related imports (feature-gated) +use rustls::ClientConfig; +use rustls::{DigitallySignedStruct, Error as TlsError, SignatureScheme}; +use rustls_pemfile::{certs, private_key}; use snafu::ResultExt; use strum::AsRefStr; use tokio_postgres::types::ToSql; use tokio_postgres::{IsolationLevel, NoTls, Row}; +use tokio_postgres_rustls::MakeRustlsConnect; use crate::error::{ - CreatePostgresPoolSnafu, GetPostgresConnectionSnafu, PostgresExecutionSnafu, - PostgresTransactionSnafu, Result, + CreatePostgresPoolSnafu, GetPostgresConnectionSnafu, LoadTlsCertificateSnafu, + PostgresExecutionSnafu, PostgresTlsConfigSnafu, PostgresTransactionSnafu, Result, }; use crate::kv_backend::rds::{ Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RdsStore, Transaction, @@ -38,6 +48,41 @@ use crate::rpc::store::{ }; use crate::rpc::KeyValue; +/// TLS mode configuration for PostgreSQL connections. +/// This mirrors the TlsMode from servers::tls to avoid circular dependencies. +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub enum TlsMode { + Disable, + #[default] + Prefer, + Require, + VerifyCa, + VerifyFull, +} + +/// TLS configuration for PostgreSQL connections. +/// This mirrors the TlsOption from servers::tls to avoid circular dependencies. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TlsOption { + pub mode: TlsMode, + pub cert_path: String, + pub key_path: String, + pub ca_cert_path: String, + pub watch: bool, +} + +impl Default for TlsOption { + fn default() -> Self { + TlsOption { + mode: TlsMode::Prefer, + cert_path: String::new(), + key_path: String::new(), + ca_cert_path: String::new(), + watch: false, + } + } +} + const PG_STORE_NAME: &str = "pg_store"; pub struct PgClient(deadpool::managed::Object); @@ -348,6 +393,265 @@ impl ExecutorFactory for PgExecutorFactory { /// It uses [deadpool_postgres::Pool] as the connection pool for [RdsStore]. pub type PgStore = RdsStore; +/// Creates a PostgreSQL TLS connector based on the provided configuration. +/// +/// This function creates a rustls-based TLS connector for PostgreSQL connections, +/// following PostgreSQL's TLS mode specifications exactly: +/// +/// # TLS Modes (PostgreSQL Specification) +/// +/// - `Disable`: No TLS connection attempted +/// - `Prefer`: Try TLS first, fallback to plaintext if TLS fails (handled by connection logic) +/// - `Require`: Only TLS connections, but NO certificate verification (accept any cert) +/// - `VerifyCa`: TLS + verify certificate is signed by trusted CA (no hostname verification) +/// - `VerifyFull`: TLS + verify CA + verify hostname matches certificate SAN +/// +pub fn create_postgres_tls_connector(tls_config: &TlsOption) -> Result { + common_telemetry::info!( + "Creating PostgreSQL TLS connector with mode: {:?}", + tls_config.mode + ); + + let config_builder = match tls_config.mode { + TlsMode::Disable => { + return PostgresTlsConfigSnafu { + reason: "Cannot create TLS connector for Disable mode".to_string(), + } + .fail(); + } + TlsMode::Prefer | TlsMode::Require => { + // For Prefer/Require: Accept any certificate (no verification) + let verifier = Arc::new(AcceptAnyVerifier); + ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(verifier) + } + TlsMode::VerifyCa => { + // For VerifyCa: Verify server cert against CA store, but skip hostname verification + let ca_store = load_ca(&tls_config.ca_cert_path)?; + let verifier = Arc::new(NoHostnameVerification { roots: ca_store }); + ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(verifier) + } + TlsMode::VerifyFull => { + let ca_store = load_ca(&tls_config.ca_cert_path)?; + ClientConfig::builder().with_root_certificates(ca_store) + } + }; + + // Create the TLS client configuration based on the mode and client cert requirements + let client_config = if !tls_config.cert_path.is_empty() && !tls_config.key_path.is_empty() { + // Client certificate authentication required + common_telemetry::info!("Loading client certificate for mutual TLS"); + let cert_chain = load_certs(&tls_config.cert_path)?; + let private_key = load_private_key(&tls_config.key_path)?; + + config_builder + .with_client_auth_cert(cert_chain, private_key) + .map_err(|e| { + PostgresTlsConfigSnafu { + reason: format!("Failed to configure client authentication: {}", e), + } + .build() + })? + } else { + common_telemetry::info!("No client certificate provided, skip client authentication"); + config_builder.with_no_client_auth() + }; + + common_telemetry::info!("Successfully created PostgreSQL TLS connector"); + Ok(MakeRustlsConnect::new(client_config)) +} + +/// For Prefer/Require mode, we accept any server certificate without verification. +#[derive(Debug)] +struct AcceptAnyVerifier; + +impl ServerCertVerifier for AcceptAnyVerifier { + fn verify_server_cert( + &self, + _end_entity: &CertificateDer<'_>, + _intermediates: &[CertificateDer<'_>], + _server_name: &ServerName<'_>, + _ocsp_response: &[u8], + _now: UnixTime, + ) -> std::result::Result { + common_telemetry::debug!( + "Accepting server certificate without verification (Prefer/Require mode)" + ); + Ok(ServerCertVerified::assertion()) + } + + fn verify_tls12_signature( + &self, + _message: &[u8], + _cert: &CertificateDer<'_>, + _dss: &DigitallySignedStruct, + ) -> std::result::Result { + // Accept any signature without verification + Ok(HandshakeSignatureValid::assertion()) + } + + fn verify_tls13_signature( + &self, + _message: &[u8], + _cert: &CertificateDer<'_>, + _dss: &DigitallySignedStruct, + ) -> std::result::Result { + // Accept any signature without verification + Ok(HandshakeSignatureValid::assertion()) + } + + fn supported_verify_schemes(&self) -> Vec { + // Support all signature schemes + rustls::crypto::ring::default_provider() + .signature_verification_algorithms + .supported_schemes() + } +} + +/// For VerifyCa mode, we verify the server certificate against our CA store +/// and skip verify server's HostName. +#[derive(Debug)] +struct NoHostnameVerification { + roots: Arc, +} + +impl ServerCertVerifier for NoHostnameVerification { + fn verify_server_cert( + &self, + end_entity: &CertificateDer<'_>, + intermediates: &[CertificateDer<'_>], + _server_name: &ServerName<'_>, + _ocsp_response: &[u8], + now: UnixTime, + ) -> std::result::Result { + let cert = ParsedCertificate::try_from(end_entity)?; + rustls::client::verify_server_cert_signed_by_trust_anchor( + &cert, + &self.roots, + intermediates, + now, + rustls::crypto::ring::default_provider() + .signature_verification_algorithms + .all, + )?; + + Ok(ServerCertVerified::assertion()) + } + + fn verify_tls12_signature( + &self, + message: &[u8], + cert: &CertificateDer<'_>, + dss: &DigitallySignedStruct, + ) -> std::result::Result { + rustls::crypto::verify_tls12_signature( + message, + cert, + dss, + &rustls::crypto::ring::default_provider().signature_verification_algorithms, + ) + } + + fn verify_tls13_signature( + &self, + message: &[u8], + cert: &CertificateDer<'_>, + dss: &DigitallySignedStruct, + ) -> std::result::Result { + rustls::crypto::verify_tls13_signature( + message, + cert, + dss, + &rustls::crypto::ring::default_provider().signature_verification_algorithms, + ) + } + + fn supported_verify_schemes(&self) -> Vec { + // Support all signature schemes + rustls::crypto::ring::default_provider() + .signature_verification_algorithms + .supported_schemes() + } +} + +fn load_certs(path: &str) -> Result>> { + let file = File::open(path).context(LoadTlsCertificateSnafu { path })?; + let mut reader = BufReader::new(file); + let certs = certs(&mut reader) + .collect::, _>>() + .map_err(|e| { + PostgresTlsConfigSnafu { + reason: format!("Failed to parse certificates from {}: {}", path, e), + } + .build() + })?; + Ok(certs) +} + +fn load_private_key(path: &str) -> Result> { + let file = File::open(path).context(LoadTlsCertificateSnafu { path })?; + let mut reader = BufReader::new(file); + let key = private_key(&mut reader) + .map_err(|e| { + PostgresTlsConfigSnafu { + reason: format!("Failed to parse private key from {}: {}", path, e), + } + .build() + })? + .ok_or_else(|| { + PostgresTlsConfigSnafu { + reason: format!("No private key found in {}", path), + } + .build() + })?; + Ok(key) +} + +fn load_ca(path: &str) -> Result> { + let mut root_store = rustls::RootCertStore::empty(); + + // Add system root certificates + match rustls_native_certs::load_native_certs() { + Ok(certs) => { + let num_certs = certs.len(); + for cert in certs { + if let Err(e) = root_store.add(cert) { + return PostgresTlsConfigSnafu { + reason: format!("Failed to add root certificate: {}", e), + } + .fail(); + } + } + common_telemetry::info!("Loaded {num_certs} system root certificates successfully"); + } + Err(e) => { + return PostgresTlsConfigSnafu { + reason: format!("Failed to load system root certificates: {}", e), + } + .fail(); + } + } + + // Try add custom CA certificate if provided + if !path.is_empty() { + let ca_certs = load_certs(path)?; + for cert in ca_certs { + if let Err(e) = root_store.add(cert) { + return PostgresTlsConfigSnafu { + reason: format!("Failed to add custom CA certificate: {}", e), + } + .fail(); + } + } + common_telemetry::info!("Added custom CA certificate from {}", path); + } + + Ok(Arc::new(root_store)) +} + #[async_trait::async_trait] impl KvQueryExecutor for PgStore { async fn range_with_query_executor( @@ -491,17 +795,54 @@ impl KvQueryExecutor for PgStore { } impl PgStore { - /// Create [PgStore] impl of [KvBackendRef] from url. - pub async fn with_url(url: &str, table_name: &str, max_txn_ops: usize) -> Result { + /// Create [PgStore] impl of [KvBackendRef] from url with optional TLS support. + /// + /// # Arguments + /// + /// * `url` - PostgreSQL connection URL + /// * `table_name` - Name of the table to use for key-value storage + /// * `max_txn_ops` - Maximum number of operations per transaction + /// * `tls_config` - Optional TLS configuration. If None, uses plaintext connection. + pub async fn with_url_and_tls( + url: &str, + table_name: &str, + max_txn_ops: usize, + tls_config: Option, + ) -> Result { let mut cfg = Config::new(); cfg.url = Some(url.to_string()); - // TODO(weny, CookiePie): add tls support - let pool = cfg - .create_pool(Some(Runtime::Tokio1), NoTls) - .context(CreatePostgresPoolSnafu)?; + + let pool = match tls_config { + Some(tls_config) if tls_config.mode != TlsMode::Disable => { + match create_postgres_tls_connector(&tls_config) { + Ok(tls_connector) => cfg + .create_pool(Some(Runtime::Tokio1), tls_connector) + .context(CreatePostgresPoolSnafu)?, + Err(e) => { + if tls_config.mode == TlsMode::Prefer { + // Fallback to insecure connection if TLS fails + common_telemetry::info!("Failed to create TLS connector, falling back to insecure connection"); + cfg.create_pool(Some(Runtime::Tokio1), NoTls) + .context(CreatePostgresPoolSnafu)? + } else { + return Err(e); + } + } + } + } + _ => cfg + .create_pool(Some(Runtime::Tokio1), NoTls) + .context(CreatePostgresPoolSnafu)?, + }; + Self::with_pg_pool(pool, table_name, max_txn_ops).await } + /// Create [PgStore] impl of [KvBackendRef] from url (backward compatibility). + pub async fn with_url(url: &str, table_name: &str, max_txn_ops: usize) -> Result { + Self::with_url_and_tls(url, table_name, max_txn_ops, None).await + } + /// Create [PgStore] impl of [KvBackendRef] from [deadpool_postgres::Pool]. pub async fn with_pg_pool( pool: Pool, diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index f52cefa1a6..82d3eac035 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -21,11 +21,16 @@ use api::v1::meta::procedure_service_server::ProcedureServiceServer; use api::v1::meta::store_server::StoreServer; use common_base::Plugins; use common_config::Configurable; +use common_error::ext::BoxedError; #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] use common_meta::distributed_time_constants::META_LEASE_SECS; use common_meta::kv_backend::chroot::ChrootKvBackend; use common_meta::kv_backend::etcd::EtcdStore; use common_meta::kv_backend::memory::MemoryKvBackend; +#[cfg(feature = "pg_kvbackend")] +use common_meta::kv_backend::rds::postgres::create_postgres_tls_connector; +#[cfg(feature = "pg_kvbackend")] +use common_meta::kv_backend::rds::postgres::{TlsMode as PgTlsMode, TlsOption as PgTlsOption}; #[cfg(feature = "mysql_kvbackend")] use common_meta::kv_backend::rds::MySqlStore; #[cfg(feature = "pg_kvbackend")] @@ -41,6 +46,7 @@ use servers::export_metrics::ExportMetricsTask; use servers::http::{HttpServer, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; use servers::server::Server; +use servers::tls::TlsOption; #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] use snafu::OptionExt; use snafu::ResultExt; @@ -310,7 +316,8 @@ pub async fn metasrv_builder( cfg.keepalives = Some(true); cfg.keepalives_idle = Some(Duration::from_secs(POSTGRES_KEEP_ALIVE_SECS)); // We use a separate pool for election since we need a different session keep-alive idle time. - let pool = create_postgres_pool_with(&opts.store_addrs, cfg).await?; + let pool = + create_postgres_pool_with(&opts.store_addrs, cfg, opts.backend_tls.clone()).await?; let election_client = ElectionPgClient::new( pool, @@ -329,7 +336,7 @@ pub async fn metasrv_builder( ) .await?; - let pool = create_postgres_pool(&opts.store_addrs).await?; + let pool = create_postgres_pool(&opts.store_addrs, opts.backend_tls.clone()).await?; let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops) .await .context(error::KvBackendSnafu)?; @@ -440,28 +447,64 @@ pub async fn create_etcd_client(store_addrs: &[String]) -> Result { } #[cfg(feature = "pg_kvbackend")] -/// Creates a pool for the Postgres backend. -/// -/// It only use first store addr to create a pool. -pub async fn create_postgres_pool(store_addrs: &[String]) -> Result { - create_postgres_pool_with(store_addrs, Config::new()).await +/// Converts servers::tls::TlsOption to postgres::TlsOption to avoid circular dependencies +fn convert_tls_option(tls_option: &TlsOption) -> PgTlsOption { + let mode = match tls_option.mode { + servers::tls::TlsMode::Disable => PgTlsMode::Disable, + servers::tls::TlsMode::Prefer => PgTlsMode::Prefer, + servers::tls::TlsMode::Require => PgTlsMode::Require, + servers::tls::TlsMode::VerifyCa => PgTlsMode::VerifyCa, + servers::tls::TlsMode::VerifyFull => PgTlsMode::VerifyFull, + }; + + PgTlsOption { + mode, + cert_path: tls_option.cert_path.clone(), + key_path: tls_option.key_path.clone(), + ca_cert_path: tls_option.ca_cert_path.clone(), + watch: tls_option.watch, + } } #[cfg(feature = "pg_kvbackend")] -/// Creates a pool for the Postgres backend. +/// Creates a pool for the Postgres backend with optional TLS. +/// +/// It only use first store addr to create a pool. +pub async fn create_postgres_pool( + store_addrs: &[String], + tls_config: Option, +) -> Result { + create_postgres_pool_with(store_addrs, Config::new(), tls_config).await +} + +#[cfg(feature = "pg_kvbackend")] +/// Creates a pool for the Postgres backend with config and optional TLS. /// /// It only use first store addr to create a pool, and use the given config to create a pool. pub async fn create_postgres_pool_with( store_addrs: &[String], mut cfg: Config, + tls_config: Option, ) -> Result { let postgres_url = store_addrs.first().context(error::InvalidArgumentsSnafu { err_msg: "empty store addrs", })?; cfg.url = Some(postgres_url.to_string()); - let pool = cfg - .create_pool(Some(Runtime::Tokio1), NoTls) - .context(error::CreatePostgresPoolSnafu)?; + + let pool = if let Some(tls_config) = tls_config { + let pg_tls_config = convert_tls_option(&tls_config); + let tls_connector = + create_postgres_tls_connector(&pg_tls_config).map_err(|e| error::Error::Other { + source: BoxedError::new(e), + location: snafu::Location::new(file!(), line!(), 0), + })?; + cfg.create_pool(Some(Runtime::Tokio1), tls_connector) + .context(error::CreatePostgresPoolSnafu)? + } else { + cfg.create_pool(Some(Runtime::Tokio1), NoTls) + .context(error::CreatePostgresPoolSnafu)? + }; + Ok(pool) } diff --git a/src/meta-srv/src/election/rds/postgres.rs b/src/meta-srv/src/election/rds/postgres.rs index 7caa3a249b..4d50fe0867 100644 --- a/src/meta-srv/src/election/rds/postgres.rs +++ b/src/meta-srv/src/election/rds/postgres.rs @@ -819,7 +819,7 @@ mod tests { } .fail(); } - let pool = create_postgres_pool(&[endpoint]).await.unwrap(); + let pool = create_postgres_pool(&[endpoint], None).await.unwrap(); let mut pg_client = ElectionPgClient::new( pool, execution_timeout, diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 0fdcd862d7..038cb8dd57 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -50,6 +50,7 @@ use serde::{Deserialize, Serialize}; use servers::export_metrics::ExportMetricsOption; use servers::grpc::GrpcOptions; use servers::http::HttpOptions; +use servers::tls::TlsOption; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; use table::metadata::TableId; @@ -106,6 +107,10 @@ pub struct MetasrvOptions { pub server_addr: String, /// The address of the store, e.g., etcd. pub store_addrs: Vec, + /// TLS configuration for kv store backend (PostgreSQL/MySQL) + /// Only applicable when using PostgreSQL or MySQL as the metadata store + #[serde(default)] + pub backend_tls: Option, /// The type of selector. pub selector: SelectorType, /// Whether to use the memory store. @@ -180,6 +185,7 @@ impl fmt::Debug for MetasrvOptions { let mut debug_struct = f.debug_struct("MetasrvOptions"); debug_struct .field("store_addrs", &self.sanitize_store_addrs()) + .field("backend_tls", &self.backend_tls) .field("selector", &self.selector) .field("use_memory_store", &self.use_memory_store) .field("enable_region_failover", &self.enable_region_failover) @@ -225,6 +231,7 @@ impl Default for MetasrvOptions { #[allow(deprecated)] server_addr: String::new(), store_addrs: vec!["127.0.0.1:2379".to_string()], + backend_tls: None, selector: SelectorType::default(), use_memory_store: false, enable_region_failover: false, diff --git a/src/servers/src/tls.rs b/src/servers/src/tls.rs index cbc3b6d30c..1810aeca66 100644 --- a/src/servers/src/tls.rs +++ b/src/servers/src/tls.rs @@ -62,6 +62,8 @@ pub struct TlsOption { #[serde(default)] pub key_path: String, #[serde(default)] + pub ca_cert_path: String, + #[serde(default)] pub watch: bool, } @@ -253,6 +255,7 @@ mod tests { mode: Disable, cert_path: "/path/to/cert_path".to_string(), key_path: "/path/to/key_path".to_string(), + ca_cert_path: String::new(), watch: false }, TlsOption::new( @@ -413,6 +416,7 @@ mod tests { .into_os_string() .into_string() .expect("failed to convert path to string"), + ca_cert_path: String::new(), watch: true, }; diff --git a/src/servers/tests/mysql/mysql_server_test.rs b/src/servers/tests/mysql/mysql_server_test.rs index a3d299a90a..e6abdf22d2 100644 --- a/src/servers/tests/mysql/mysql_server_test.rs +++ b/src/servers/tests/mysql/mysql_server_test.rs @@ -259,6 +259,7 @@ async fn test_server_required_secure_client_plain() -> Result<()> { mode: servers::tls::TlsMode::Require, cert_path: "tests/ssl/server.crt".to_owned(), key_path: "tests/ssl/server-rsa.key".to_owned(), + ca_cert_path: String::new(), watch: false, }; @@ -298,6 +299,7 @@ async fn test_server_required_secure_client_plain_with_pkcs8_priv_key() -> Resul mode: servers::tls::TlsMode::Require, cert_path: "tests/ssl/server.crt".to_owned(), key_path: "tests/ssl/server-pkcs8.key".to_owned(), + ca_cert_path: String::new(), watch: false, }; @@ -602,6 +604,7 @@ async fn do_test_query_all_datatypes_with_secure_server( "tests/ssl/server-rsa.key".to_owned() } }, + ca_cert_path: String::new(), watch: false, }; diff --git a/src/servers/tests/postgres/mod.rs b/src/servers/tests/postgres/mod.rs index 0daffdc32a..612aa3dd94 100644 --- a/src/servers/tests/postgres/mod.rs +++ b/src/servers/tests/postgres/mod.rs @@ -273,6 +273,7 @@ async fn test_server_secure_require_client_plain() -> Result<()> { mode: servers::tls::TlsMode::Require, cert_path: "tests/ssl/server.crt".to_owned(), key_path: "tests/ssl/server-rsa.key".to_owned(), + ca_cert_path: String::new(), watch: false, }; let server_port = start_test_server(server_tls).await?; @@ -289,6 +290,7 @@ async fn test_server_secure_require_client_plain_with_pkcs8_priv_key() -> Result mode: servers::tls::TlsMode::Require, cert_path: "tests/ssl/server.crt".to_owned(), key_path: "tests/ssl/server-pkcs8.key".to_owned(), + ca_cert_path: String::new(), watch: false, }; let server_port = start_test_server(server_tls).await?; @@ -525,6 +527,7 @@ async fn do_simple_query_with_secure_server( "tests/ssl/server-rsa.key".to_owned() } }, + ca_cert_path: String::new(), watch: false, }; diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index b44741b307..4fafc35442 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1081,6 +1081,7 @@ runtime_size = 8 mode = "disable" cert_path = "" key_path = "" +ca_cert_path = "" watch = false [mysql] @@ -1093,6 +1094,7 @@ keep_alive = "0s" mode = "disable" cert_path = "" key_path = "" +ca_cert_path = "" watch = false [postgres] @@ -1105,6 +1107,7 @@ keep_alive = "0s" mode = "disable" cert_path = "" key_path = "" +ca_cert_path = "" watch = false [opentsdb]