From 6173c0f44c4b86273ac6fb7f49bd6645ce25fcc1 Mon Sep 17 00:00:00 2001 From: Dmitrii Kovalkov <34828390+DimasKovas@users.noreply.github.com> Date: Tue, 22 Apr 2025 17:19:03 +0400 Subject: [PATCH] safekeeper: add enable_tls_wal_service_api (#11520) ## Problem Safekeeper doesn't use TLS in wal service - Closes: https://github.com/neondatabase/cloud/issues/27302 ## Summary of changes - Add `enable_tls_wal_service_api` option to safekeeper's cmd arguments - Propagate `tls_server_config` to `wal_service` if the option is enabled - Create `BACKGROUND_RUNTIME` for small background tasks and offload SSL certificate reloader to it. No integration tests for now because support from compute side is required: https://github.com/neondatabase/cloud/issues/25823 --- safekeeper/src/bin/safekeeper.rs | 54 +++++++++++++++++-- safekeeper/src/http/mod.rs | 16 +----- safekeeper/src/lib.rs | 11 ++++ safekeeper/src/wal_service.rs | 8 ++- .../tests/walproposer_sim/safekeeper.rs | 1 + 5 files changed, 69 insertions(+), 21 deletions(-) diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 5fc742cda7..000235f2f5 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -14,6 +14,7 @@ use clap::{ArgAction, Parser}; use futures::future::BoxFuture; use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; +use http_utils::tls_certs::ReloadingCertificateResolver; use metrics::set_build_info_metric; use remote_storage::RemoteStorageConfig; use safekeeper::defaults::{ @@ -23,8 +24,8 @@ use safekeeper::defaults::{ DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE, }; use safekeeper::{ - BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf, WAL_SERVICE_RUNTIME, broker, - control_file, http, wal_backup, wal_service, + BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf, + WAL_SERVICE_RUNTIME, broker, control_file, http, wal_backup, wal_service, }; use sd_notify::NotifyState; use storage_broker::{DEFAULT_ENDPOINT, Uri}; @@ -215,16 +216,21 @@ struct Args { ssl_cert_file: Utf8PathBuf, /// Period to reload certificate and private key from files. #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)] - pub ssl_cert_reload_period: Duration, + ssl_cert_reload_period: Duration, /// Trusted root CA certificates to use in https APIs. #[arg(long)] - pub ssl_ca_file: Option, + ssl_ca_file: Option, /// Flag to use https for requests to peer's safekeeper API. #[arg(long)] - pub use_https_safekeeper_api: bool, + use_https_safekeeper_api: bool, /// Path to the JWT auth token used to authenticate with other safekeepers. #[arg(long)] auth_token_path: Option, + /// Enable TLS in WAL service API. + /// Does not force TLS: the client negotiates TLS usage during the handshake. + /// Uses key and certificate from ssl_key_file/ssl_cert_file. + #[arg(long)] + enable_tls_wal_service_api: bool, } // Like PathBufValueParser, but allows empty string. @@ -418,6 +424,7 @@ async fn main() -> anyhow::Result<()> { ssl_cert_reload_period: args.ssl_cert_reload_period, ssl_ca_certs, use_https_safekeeper_api: args.use_https_safekeeper_api, + enable_tls_wal_service_api: args.enable_tls_wal_service_api, }); // initialize sentry if SENTRY_DSN is provided @@ -517,6 +524,36 @@ async fn start_safekeeper(conf: Arc) -> Result<()> { info!("running in current thread runtime"); } + let tls_server_config = if conf.listen_https_addr.is_some() || conf.enable_tls_wal_service_api { + let ssl_key_file = conf.ssl_key_file.clone(); + let ssl_cert_file = conf.ssl_cert_file.clone(); + let ssl_cert_reload_period = conf.ssl_cert_reload_period; + + // Create resolver in BACKGROUND_RUNTIME, so the background certificate reloading + // task is run in this runtime. + let cert_resolver = current_thread_rt + .as_ref() + .unwrap_or_else(|| BACKGROUND_RUNTIME.handle()) + .spawn(async move { + ReloadingCertificateResolver::new( + "main", + &ssl_key_file, + &ssl_cert_file, + ssl_cert_reload_period, + ) + .await + }) + .await??; + + let config = rustls::ServerConfig::builder() + .with_no_client_auth() + .with_cert_resolver(cert_resolver); + + Some(Arc::new(config)) + } else { + None + }; + let wal_service_handle = current_thread_rt .as_ref() .unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle()) @@ -524,6 +561,9 @@ async fn start_safekeeper(conf: Arc) -> Result<()> { conf.clone(), pg_listener, Scope::SafekeeperData, + conf.enable_tls_wal_service_api + .then(|| tls_server_config.clone()) + .flatten(), global_timelines.clone(), )) // wrap with task name for error reporting @@ -552,6 +592,9 @@ async fn start_safekeeper(conf: Arc) -> Result<()> { conf.clone(), pg_listener_tenant_only, Scope::Tenant, + conf.enable_tls_wal_service_api + .then(|| tls_server_config.clone()) + .flatten(), global_timelines.clone(), )) // wrap with task name for error reporting @@ -577,6 +620,7 @@ async fn start_safekeeper(conf: Arc) -> Result<()> { .spawn(http::task_main_https( conf.clone(), https_listener, + tls_server_config.expect("tls_server_config is set earlier if https is enabled"), global_timelines.clone(), )) .map(|res| ("HTTPS service main".to_owned(), res)); diff --git a/safekeeper/src/http/mod.rs b/safekeeper/src/http/mod.rs index 6e7c5d971d..0003310763 100644 --- a/safekeeper/src/http/mod.rs +++ b/safekeeper/src/http/mod.rs @@ -1,7 +1,6 @@ pub mod routes; use std::sync::Arc; -use http_utils::tls_certs::ReloadingCertificateResolver; pub use routes::make_router; pub use safekeeper_api::models; use tokio_util::sync::CancellationToken; @@ -28,21 +27,10 @@ pub async fn task_main_http( pub async fn task_main_https( conf: Arc, https_listener: std::net::TcpListener, + tls_config: Arc, global_timelines: Arc, ) -> anyhow::Result<()> { - let cert_resolver = ReloadingCertificateResolver::new( - "main", - &conf.ssl_key_file, - &conf.ssl_cert_file, - conf.ssl_cert_reload_period, - ) - .await?; - - let server_config = rustls::ServerConfig::builder() - .with_no_client_auth() - .with_cert_resolver(cert_resolver); - - let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(server_config)); + let tls_acceptor = tokio_rustls::TlsAcceptor::from(tls_config); let router = make_router(conf, global_timelines) .build() diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 9f7580a313..ef2608e5d6 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -122,6 +122,7 @@ pub struct SafeKeeperConf { pub ssl_cert_reload_period: Duration, pub ssl_ca_certs: Vec, pub use_https_safekeeper_api: bool, + pub enable_tls_wal_service_api: bool, } impl SafeKeeperConf { @@ -172,6 +173,7 @@ impl SafeKeeperConf { ssl_cert_reload_period: Duration::from_secs(60), ssl_ca_certs: Vec::new(), use_https_safekeeper_api: false, + enable_tls_wal_service_api: false, } } } @@ -209,3 +211,12 @@ pub static WAL_BACKUP_RUNTIME: Lazy = Lazy::new(|| { .build() .expect("Failed to create WAL backup runtime") }); + +pub static BACKGROUND_RUNTIME: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .thread_name("background worker") + .worker_threads(1) // there is only one task now (ssl certificate reloading), having more threads doesn't make sense + .enable_all() + .build() + .expect("Failed to create background runtime") +}); diff --git a/safekeeper/src/wal_service.rs b/safekeeper/src/wal_service.rs index 045fa88cb0..6e007265b2 100644 --- a/safekeeper/src/wal_service.rs +++ b/safekeeper/src/wal_service.rs @@ -29,6 +29,7 @@ pub async fn task_main( conf: Arc, pg_listener: std::net::TcpListener, allowed_auth_scope: Scope, + tls_config: Option>, global_timelines: Arc, ) -> anyhow::Result<()> { // Tokio's from_std won't do this for us, per its comment. @@ -43,9 +44,10 @@ pub async fn task_main( let conf = conf.clone(); let conn_id = issue_connection_id(&mut connection_count); let global_timelines = global_timelines.clone(); + let tls_config = tls_config.clone(); tokio::spawn( async move { - if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope, global_timelines).await { + if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope, tls_config, global_timelines).await { error!("connection handler exited: {}", err); } } @@ -61,6 +63,7 @@ async fn handle_socket( conf: Arc, conn_id: ConnectionId, allowed_auth_scope: Scope, + tls_config: Option>, global_timelines: Arc, ) -> Result<(), QueryError> { socket.set_nodelay(true)?; @@ -110,7 +113,8 @@ async fn handle_socket( auth_pair, global_timelines, ); - let pgbackend = PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, None)?; + let pgbackend = + PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, tls_config)?; // libpq protocol between safekeeper and walproposer / pageserver // We don't use shutdown. pgbackend diff --git a/safekeeper/tests/walproposer_sim/safekeeper.rs b/safekeeper/tests/walproposer_sim/safekeeper.rs index b3f088d31c..5fb29683f2 100644 --- a/safekeeper/tests/walproposer_sim/safekeeper.rs +++ b/safekeeper/tests/walproposer_sim/safekeeper.rs @@ -185,6 +185,7 @@ pub fn run_server(os: NodeOs, disk: Arc) -> Result<()> { ssl_cert_reload_period: Duration::ZERO, ssl_ca_certs: Vec::new(), use_https_safekeeper_api: false, + enable_tls_wal_service_api: false, }; let mut global = GlobalMap::new(disk, conf.clone())?;