From ef53a7643452cd44a60773239da9d786f1e30b35 Mon Sep 17 00:00:00 2001 From: Dmitrii Kovalkov <34828390+DimasKovas@users.noreply.github.com> Date: Fri, 25 Apr 2025 18:28:56 +0400 Subject: [PATCH] storage_broker: https handler (#11603) ## Problem Broker supports only HTTP, no HTTPS - Closes: https://github.com/neondatabase/cloud/issues/27492 ## Summary of changes - Add `listen_https_addr`, `ssl_key_file`, `ssl_cert_file`, `ssl_cert_reload_period` arguments to storage broker - Make `listen_addr` argument optional - Listen https in storage broker - Support https for storage broker request in neon_local - Add `use_https_storage_broker_api` option to NeonEnvBuilder --- Cargo.lock | 3 + control_plane/src/bin/neon_local.rs | 22 +++-- control_plane/src/broker.rs | 118 ++++++++++++++--------- control_plane/src/local_env.rs | 64 +++++++++--- control_plane/src/pageserver.rs | 16 +-- control_plane/src/safekeeper.rs | 2 +- control_plane/src/storage_controller.rs | 17 +--- storage_broker/Cargo.toml | 3 + storage_broker/src/bin/storage_broker.rs | 106 +++++++++++++++++--- storage_controller/src/main.rs | 2 +- test_runner/fixtures/neon_fixtures.py | 22 +++-- test_runner/regress/test_ssl.py | 22 +++++ 12 files changed, 278 insertions(+), 119 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2cf260c88c..4c464c62b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6616,12 +6616,14 @@ dependencies = [ "anyhow", "async-stream", "bytes", + "camino", "clap", "const_format", "futures", "futures-core", "futures-util", "http-body-util", + "http-utils", "humantime", "hyper 1.4.1", "hyper-util", @@ -6631,6 +6633,7 @@ dependencies = [ "prost 0.13.3", "rustls 0.23.18", "tokio", + "tokio-rustls 0.26.0", "tonic", "tonic-build", "tracing", diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 5cf6767361..6f55c0310f 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -17,8 +17,10 @@ use std::time::Duration; use anyhow::{Context, Result, anyhow, bail}; use clap::Parser; use compute_api::spec::ComputeMode; +use control_plane::broker::StorageBroker; use control_plane::endpoint::ComputeControlPlane; use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_PORT, EndpointStorage}; +use control_plane::local_env; use control_plane::local_env::{ EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf, NeonLocalInitPageserverConf, SafekeeperConf, @@ -28,7 +30,6 @@ use control_plane::safekeeper::SafekeeperNode; use control_plane::storage_controller::{ NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController, }; -use control_plane::{broker, local_env}; use nix::fcntl::{FlockArg, flock}; use pageserver_api::config::{ DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT, @@ -988,7 +989,8 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result { NeonLocalInitConf { control_plane_api: Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap()), broker: NeonBroker { - listen_addr: DEFAULT_BROKER_ADDR.parse().unwrap(), + listen_addr: Some(DEFAULT_BROKER_ADDR.parse().unwrap()), + listen_https_addr: None, }, safekeepers: vec![SafekeeperConf { id: DEFAULT_SAFEKEEPER_ID, @@ -1777,7 +1779,8 @@ async fn handle_endpoint_storage( async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::LocalEnv) -> Result<()> { match subcmd { StorageBrokerCmd::Start(args) => { - if let Err(e) = broker::start_broker_process(env, &args.start_timeout).await { + let storage_broker = StorageBroker::from_env(env); + if let Err(e) = storage_broker.start(&args.start_timeout).await { eprintln!("broker start failed: {e}"); exit(1); } @@ -1785,7 +1788,8 @@ async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::Local StorageBrokerCmd::Stop(_args) => { // FIXME: stop_mode unused - if let Err(e) = broker::stop_broker_process(env) { + let storage_broker = StorageBroker::from_env(env); + if let Err(e) = storage_broker.stop() { eprintln!("broker stop failed: {e}"); exit(1); } @@ -1835,8 +1839,11 @@ async fn handle_start_all_impl( #[allow(clippy::redundant_closure_call)] (|| { js.spawn(async move { - let retry_timeout = retry_timeout; - broker::start_broker_process(env, &retry_timeout).await + let storage_broker = StorageBroker::from_env(env); + storage_broker + .start(&retry_timeout) + .await + .map_err(|e| e.context("start storage_broker")) }); js.spawn(async move { @@ -1991,7 +1998,8 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) { } } - if let Err(e) = broker::stop_broker_process(env) { + let storage_broker = StorageBroker::from_env(env); + if let Err(e) = storage_broker.stop() { eprintln!("neon broker stop failed: {e:#}"); } diff --git a/control_plane/src/broker.rs b/control_plane/src/broker.rs index 1b507bb384..f43f459636 100644 --- a/control_plane/src/broker.rs +++ b/control_plane/src/broker.rs @@ -3,60 +3,86 @@ //! In the local test environment, the storage broker stores its data directly in //! //! ```text -//! .neon +//! .neon/storage_broker //! ``` use std::time::Duration; use anyhow::Context; use camino::Utf8PathBuf; -use crate::{background_process, local_env}; +use crate::{background_process, local_env::LocalEnv}; -pub async fn start_broker_process( - env: &local_env::LocalEnv, - retry_timeout: &Duration, -) -> anyhow::Result<()> { - let broker = &env.broker; - let listen_addr = &broker.listen_addr; - - print!("Starting neon broker at {}", listen_addr); - - let args = [format!("--listen-addr={listen_addr}")]; - - let client = reqwest::Client::new(); - background_process::start_process( - "storage_broker", - &env.base_data_dir, - &env.storage_broker_bin(), - args, - [], - background_process::InitialPidFile::Create(storage_broker_pid_file_path(env)), - retry_timeout, - || async { - let url = broker.client_url(); - let status_url = url.join("status").with_context(|| { - format!("Failed to append /status path to broker endpoint {url}") - })?; - let request = client - .get(status_url) - .build() - .with_context(|| format!("Failed to construct request to broker endpoint {url}"))?; - match client.execute(request).await { - Ok(resp) => Ok(resp.status().is_success()), - Err(_) => Ok(false), - } - }, - ) - .await - .context("Failed to spawn storage_broker subprocess")?; - Ok(()) +pub struct StorageBroker { + env: LocalEnv, } -pub fn stop_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> { - background_process::stop_process(true, "storage_broker", &storage_broker_pid_file_path(env)) -} +impl StorageBroker { + /// Create a new `StorageBroker` instance from the environment. + pub fn from_env(env: &LocalEnv) -> Self { + Self { env: env.clone() } + } -fn storage_broker_pid_file_path(env: &local_env::LocalEnv) -> Utf8PathBuf { - Utf8PathBuf::from_path_buf(env.base_data_dir.join("storage_broker.pid")) - .expect("non-Unicode path") + pub fn initialize(&self) -> anyhow::Result<()> { + if self.env.generate_local_ssl_certs { + self.env.generate_ssl_cert( + &self.env.storage_broker_data_dir().join("server.crt"), + &self.env.storage_broker_data_dir().join("server.key"), + )?; + } + Ok(()) + } + + /// Start the storage broker process. + pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> { + let broker = &self.env.broker; + + print!("Starting neon broker at {}", broker.client_url()); + + let mut args = Vec::new(); + + if let Some(addr) = &broker.listen_addr { + args.push(format!("--listen-addr={addr}")); + } + if let Some(addr) = &broker.listen_https_addr { + args.push(format!("--listen-https-addr={addr}")); + } + + let client = self.env.create_http_client(); + background_process::start_process( + "storage_broker", + &self.env.storage_broker_data_dir(), + &self.env.storage_broker_bin(), + args, + [], + background_process::InitialPidFile::Create(self.pid_file_path()), + retry_timeout, + || async { + let url = broker.client_url(); + let status_url = url.join("status").with_context(|| { + format!("Failed to append /status path to broker endpoint {url}") + })?; + let request = client.get(status_url).build().with_context(|| { + format!("Failed to construct request to broker endpoint {url}") + })?; + match client.execute(request).await { + Ok(resp) => Ok(resp.status().is_success()), + Err(_) => Ok(false), + } + }, + ) + .await + .context("Failed to spawn storage_broker subprocess")?; + Ok(()) + } + + /// Stop the storage broker process. + pub fn stop(&self) -> anyhow::Result<()> { + background_process::stop_process(true, "storage_broker", &self.pid_file_path()) + } + + /// Get the path to the PID file for the storage broker. + fn pid_file_path(&self) -> Utf8PathBuf { + Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("storage_broker.pid")) + .expect("non-Unicode path") + } } diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 77d5c1c922..a18b34daa4 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -4,7 +4,7 @@ //! script which will use local paths. use std::collections::HashMap; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::net::SocketAddr; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; use std::time::Duration; @@ -14,11 +14,12 @@ use anyhow::{Context, bail}; use clap::ValueEnum; use pem::Pem; use postgres_backend::AuthType; -use reqwest::Url; +use reqwest::{Certificate, Url}; use serde::{Deserialize, Serialize}; use utils::auth::encode_from_key_file; use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId}; +use crate::broker::StorageBroker; use crate::endpoint_storage::{ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage}; use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode}; use crate::safekeeper::SafekeeperNode; @@ -157,11 +158,16 @@ pub struct EndpointStorageConf { } /// Broker config for cluster internal communication. -#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] +#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug, Default)] #[serde(default)] pub struct NeonBroker { - /// Broker listen address for storage nodes coordination, e.g. '127.0.0.1:50051'. - pub listen_addr: SocketAddr, + /// Broker listen HTTP address for storage nodes coordination, e.g. '127.0.0.1:50051'. + /// At least one of listen_addr or listen_https_addr must be set. + pub listen_addr: Option, + /// Broker listen HTTPS address for storage nodes coordination, e.g. '127.0.0.1:50051'. + /// At least one of listen_addr or listen_https_addr must be set. + /// listen_https_addr is preferred over listen_addr in neon_local. + pub listen_https_addr: Option, } /// A part of storage controller's config the neon_local knows about. @@ -235,18 +241,19 @@ impl Default for NeonStorageControllerConf { } } -// Dummy Default impl to satisfy Deserialize derive. -impl Default for NeonBroker { - fn default() -> Self { - NeonBroker { - listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), - } - } -} - impl NeonBroker { pub fn client_url(&self) -> Url { - Url::parse(&format!("http://{}", self.listen_addr)).expect("failed to construct url") + let url = if let Some(addr) = self.listen_https_addr { + format!("https://{}", addr) + } else { + format!( + "http://{}", + self.listen_addr + .expect("at least one address should be set") + ) + }; + + Url::parse(&url).expect("failed to construct url") } } @@ -441,6 +448,10 @@ impl LocalEnv { self.base_data_dir.join("endpoints") } + pub fn storage_broker_data_dir(&self) -> PathBuf { + self.base_data_dir.join("storage_broker") + } + pub fn pageserver_data_dir(&self, pageserver_id: NodeId) -> PathBuf { self.base_data_dir .join(format!("pageserver_{pageserver_id}")) @@ -503,6 +514,23 @@ impl LocalEnv { ) } + /// Creates HTTP client with local SSL CA certificates. + pub fn create_http_client(&self) -> reqwest::Client { + let ssl_ca_certs = self.ssl_ca_cert_path().map(|ssl_ca_file| { + let buf = std::fs::read(ssl_ca_file).expect("SSL CA file should exist"); + Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid") + }); + + let mut http_client = reqwest::Client::builder(); + for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() { + http_client = http_client.add_root_certificate(ssl_ca_cert); + } + + http_client + .build() + .expect("HTTP client should construct with no error") + } + /// Inspect the base data directory and extract the instance id and instance directory path /// for all storage controller instances pub async fn storage_controller_instances(&self) -> std::io::Result> { @@ -911,6 +939,12 @@ impl LocalEnv { // create endpoints dir fs::create_dir_all(env.endpoints_path())?; + // create storage broker dir + fs::create_dir_all(env.storage_broker_data_dir())?; + StorageBroker::from_env(&env) + .initialize() + .context("storage broker init failed")?; + // create safekeeper dirs for safekeeper in &env.safekeepers { fs::create_dir_all(SafekeeperNode::datadir_path_by_id(&env, safekeeper.id))?; diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index b9257a27bf..79e87eba9b 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -21,7 +21,6 @@ use pageserver_api::shard::TenantShardId; use pageserver_client::mgmt_api; use postgres_backend::AuthType; use postgres_connection::{PgConnectionConfig, parse_host_port}; -use reqwest::Certificate; use utils::auth::{Claims, Scope}; use utils::id::{NodeId, TenantId, TimelineId}; use utils::lsn::Lsn; @@ -51,19 +50,6 @@ impl PageServerNode { parse_host_port(&conf.listen_pg_addr).expect("Unable to parse listen_pg_addr"); let port = port.unwrap_or(5432); - let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| { - let buf = std::fs::read(ssl_ca_file).expect("SSL root CA file should exist"); - Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid") - }); - - let mut http_client = reqwest::Client::builder(); - for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() { - http_client = http_client.add_root_certificate(ssl_ca_cert); - } - let http_client = http_client - .build() - .expect("Client constructs with no errors"); - let endpoint = if env.storage_controller.use_https_pageserver_api { format!( "https://{}", @@ -80,7 +66,7 @@ impl PageServerNode { conf: conf.clone(), env: env.clone(), http_client: mgmt_api::Client::new( - http_client, + env.create_http_client(), endpoint, { match conf.http_auth_type { diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index 231871852e..948e3c8c93 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -87,7 +87,7 @@ impl SafekeeperNode { conf: conf.clone(), pg_connection_config: Self::safekeeper_connection_config(&listen_addr, conf.pg_port), env: env.clone(), - http_client: reqwest::Client::new(), + http_client: env.create_http_client(), http_base_url: format!("http://{}:{}/v1", listen_addr, conf.http_port), listen_addr, } diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index 62ad5fa8d6..a36815d27e 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -20,7 +20,7 @@ use pageserver_api::shard::TenantShardId; use pageserver_client::mgmt_api::ResponseErrorMessageExt; use pem::Pem; use postgres_backend::AuthType; -use reqwest::{Certificate, Method}; +use reqwest::Method; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use tokio::process::Command; @@ -153,24 +153,11 @@ impl StorageController { } }; - let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| { - let buf = std::fs::read(ssl_ca_file).expect("SSL CA file should exist"); - Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid") - }); - - let mut http_client = reqwest::Client::builder(); - for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() { - http_client = http_client.add_root_certificate(ssl_ca_cert); - } - let http_client = http_client - .build() - .expect("HTTP client should construct with no error"); - Self { env: env.clone(), private_key, public_key, - client: http_client, + client: env.create_http_client(), config: env.storage_controller.clone(), listen_port: OnceLock::default(), } diff --git a/storage_broker/Cargo.toml b/storage_broker/Cargo.toml index e4db9a317d..67b276c8fe 100644 --- a/storage_broker/Cargo.toml +++ b/storage_broker/Cargo.toml @@ -11,6 +11,7 @@ bench = [] anyhow.workspace = true async-stream.workspace = true bytes.workspace = true +camino.workspace = true clap = { workspace = true, features = ["derive"] } const_format.workspace = true futures.workspace = true @@ -19,12 +20,14 @@ futures-util.workspace = true humantime.workspace = true hyper = { workspace = true, features = ["full"] } http-body-util.workspace = true +http-utils.workspace = true hyper-util = "0.1" once_cell.workspace = true parking_lot.workspace = true prost.workspace = true tonic.workspace = true tokio = { workspace = true, features = ["rt-multi-thread"] } +tokio-rustls.workspace = true tracing.workspace = true metrics.workspace = true utils.workspace = true diff --git a/storage_broker/src/bin/storage_broker.rs b/storage_broker/src/bin/storage_broker.rs index a7e0c986e6..476d5f03ea 100644 --- a/storage_broker/src/bin/storage_broker.rs +++ b/storage_broker/src/bin/storage_broker.rs @@ -17,10 +17,13 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; +use camino::Utf8PathBuf; use clap::{Parser, command}; +use futures::future::OptionFuture; use futures_core::Stream; use futures_util::StreamExt; use http_body_util::Full; +use http_utils::tls_certs::ReloadingCertificateResolver; use hyper::body::Incoming; use hyper::header::CONTENT_TYPE; use hyper::service::service_fn; @@ -38,7 +41,7 @@ use storage_broker::proto::{ FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse, SafekeeperTimelineInfo, SubscribeByFilterRequest, SubscribeSafekeeperInfoRequest, TypedMessage, }; -use storage_broker::{DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR, parse_proto_ttid}; +use storage_broker::{DEFAULT_KEEPALIVE_INTERVAL, parse_proto_ttid}; use tokio::net::TcpListener; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; @@ -59,12 +62,25 @@ project_build_tag!(BUILD_TAG); const DEFAULT_CHAN_SIZE: usize = 32; const DEFAULT_ALL_KEYS_CHAN_SIZE: usize = 16384; +const DEFAULT_SSL_KEY_FILE: &str = "server.key"; +const DEFAULT_SSL_CERT_FILE: &str = "server.crt"; +const DEFAULT_SSL_CERT_RELOAD_PERIOD: &str = "60s"; + #[derive(Parser, Debug)] #[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)] +#[clap(group( + clap::ArgGroup::new("listen-addresses") + .required(true) + .multiple(true) + .args(&["listen_addr", "listen_https_addr"]), +))] struct Args { - /// Endpoint to listen on. - #[arg(short, long, default_value = DEFAULT_LISTEN_ADDR)] - listen_addr: SocketAddr, + /// Endpoint to listen HTTP on. + #[arg(short, long)] + listen_addr: Option, + /// Endpoint to listen HTTPS on. + #[arg(long)] + listen_https_addr: Option, /// Size of the queue to the per timeline subscriber. #[arg(long, default_value_t = DEFAULT_CHAN_SIZE)] timeline_chan_size: usize, @@ -72,11 +88,20 @@ struct Args { #[arg(long, default_value_t = DEFAULT_ALL_KEYS_CHAN_SIZE)] all_keys_chan_size: usize, /// HTTP/2 keepalive interval. - #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)] + #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)] http2_keepalive_interval: Duration, /// Format for logging, either 'plain' or 'json'. #[arg(long, default_value = "plain")] log_format: String, + /// Path to a file with certificate's private key for https API. + #[arg(long, default_value = DEFAULT_SSL_KEY_FILE)] + ssl_key_file: Utf8PathBuf, + /// Path to a file with a X509 certificate for https API. + #[arg(long, default_value = DEFAULT_SSL_CERT_FILE)] + ssl_cert_file: Utf8PathBuf, + /// Period to reload certificate and private key from files. + #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)] + ssl_cert_reload_period: Duration, } /// Id of publisher for registering in maps @@ -674,12 +699,50 @@ async fn main() -> Result<(), Box> { }; let storage_broker_server = BrokerServiceServer::new(storage_broker_impl); + let http_listener = match &args.listen_addr { + Some(addr) => { + info!("listening HTTP on {}", addr); + Some(TcpListener::bind(addr).await?) + } + None => None, + }; + + let (https_listener, tls_acceptor) = match &args.listen_https_addr { + Some(addr) => { + let listener = TcpListener::bind(addr).await?; + + let cert_resolver = ReloadingCertificateResolver::new( + "main", + &args.ssl_key_file, + &args.ssl_cert_file, + args.ssl_cert_reload_period, + ) + .await?; + + let mut tls_config = rustls::ServerConfig::builder() + .with_no_client_auth() + .with_cert_resolver(cert_resolver); + + // Tonic is HTTP/2 only and it negotiates it with ALPN. + tls_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; + + let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_config)); + + info!("listening HTTPS on {}", addr); + (Some(listener), Some(acceptor)) + } + None => (None, None), + }; + // grpc is served along with http1 for metrics on a single port, hence we // don't use tonic's Server. - let tcp_listener = TcpListener::bind(&args.listen_addr).await?; - info!("listening on {}", &args.listen_addr); loop { - let (stream, addr) = match tcp_listener.accept().await { + let (conn, is_https) = tokio::select! { + Some(conn) = OptionFuture::from(http_listener.as_ref().map(|l| l.accept())) => (conn, false), + Some(conn) = OptionFuture::from(https_listener.as_ref().map(|l| l.accept())) => (conn, true), + }; + + let (tcp_stream, addr) = match conn { Ok(v) => v, Err(e) => { info!("couldn't accept connection: {e}"); @@ -734,13 +797,32 @@ async fn main() -> Result<(), Box> { } .await; + let tls_acceptor = tls_acceptor.clone(); + tokio::task::spawn(async move { - let res = builder - .serve_connection(TokioIo::new(stream), service_fn_) - .await; + let res = if is_https { + let tls_acceptor = + tls_acceptor.expect("tls_acceptor is set together with https_listener"); + + let tls_stream = match tls_acceptor.accept(tcp_stream).await { + Ok(tls_stream) => tls_stream, + Err(e) => { + info!("error accepting TLS connection from {addr}: {e}"); + return; + } + }; + + builder + .serve_connection(TokioIo::new(tls_stream), service_fn_) + .await + } else { + builder + .serve_connection(TokioIo::new(tcp_stream), service_fn_) + .await + }; if let Err(e) = res { - info!("error serving connection from {addr}: {e}"); + info!(%is_https, "error serving connection from {addr}: {e}"); } }); } diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index a924e5b6c5..71dde9e126 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -196,7 +196,7 @@ struct Cli { ssl_cert_reload_period: humantime::Duration, /// Trusted root CA certificates to use in https APIs. #[arg(long)] - ssl_ca_file: Option, + ssl_ca_file: Option, /// Neon local specific flag. When set, ignore [`Cli::control_plane_url`] and deliver /// the compute notification directly (instead of via control plane). diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 48aa739ce4..1d668d4b2d 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -501,6 +501,9 @@ class NeonEnvBuilder: # Flag to use https listener in storage controller, generate local ssl certs, # and force pageservers and neon_local to use https for storage controller api. self.use_https_storage_controller_api: bool = False + # Flag to use https listener in storage broker, generate local ssl certs, + # and force pageservers and safekeepers to use https for storage broker api. + self.use_https_storage_broker_api: bool = False self.pageserver_virtual_file_io_engine: str | None = pageserver_virtual_file_io_engine self.pageserver_get_vectored_concurrent_io: str | None = ( @@ -1086,7 +1089,7 @@ class NeonEnv: self.safekeepers: list[Safekeeper] = [] self.pageservers: list[NeonPageserver] = [] self.num_azs = config.num_azs - self.broker = NeonBroker(self) + self.broker = NeonBroker(self, config.use_https_storage_broker_api) self.pageserver_remote_storage = config.pageserver_remote_storage self.safekeepers_remote_storage = config.safekeepers_remote_storage self.pg_version = config.pg_version @@ -1106,6 +1109,7 @@ class NeonEnv: config.use_https_pageserver_api or config.use_https_safekeeper_api or config.use_https_storage_controller_api + or config.use_https_storage_broker_api ) self.ssl_ca_file = ( self.repo_dir.joinpath("rootCA.crt") if self.generate_local_ssl_certs else None @@ -1178,15 +1182,18 @@ class NeonEnv: # Create the neon_local's `NeonLocalInitConf` cfg: dict[str, Any] = { "default_tenant_id": str(self.initial_tenant), - "broker": { - "listen_addr": self.broker.listen_addr(), - }, + "broker": {}, "safekeepers": [], "pageservers": [], "endpoint_storage": {"port": self.port_distributor.get_port()}, "generate_local_ssl_certs": self.generate_local_ssl_certs, } + if config.use_https_storage_broker_api: + cfg["broker"]["listen_https_addr"] = self.broker.listen_addr() + else: + cfg["broker"]["listen_addr"] = self.broker.listen_addr() + if self.control_plane_api is not None: cfg["control_plane_api"] = self.control_plane_api @@ -4933,9 +4940,10 @@ class Safekeeper(LogUtils): class NeonBroker(LogUtils): """An object managing storage_broker instance""" - def __init__(self, env: NeonEnv): - super().__init__(logfile=env.repo_dir / "storage_broker.log") + def __init__(self, env: NeonEnv, use_https: bool): + super().__init__(logfile=env.repo_dir / "storage_broker" / "storage_broker.log") self.env = env + self.scheme = "https" if use_https else "http" self.port: int = self.env.port_distributor.get_port() self.running = False @@ -4958,7 +4966,7 @@ class NeonBroker(LogUtils): return f"127.0.0.1:{self.port}" def client_url(self): - return f"http://{self.listen_addr()}" + return f"{self.scheme}://{self.listen_addr()}" def assert_no_errors(self): assert_no_errors(self.logfile, "storage_controller", []) diff --git a/test_runner/regress/test_ssl.py b/test_runner/regress/test_ssl.py index 39c94c05a9..62879834c3 100644 --- a/test_runner/regress/test_ssl.py +++ b/test_runner/regress/test_ssl.py @@ -6,6 +6,7 @@ import pytest import requests from fixtures.neon_fixtures import NeonEnvBuilder, StorageControllerApiException from fixtures.utils import wait_until +from fixtures.workload import Workload def test_pageserver_https_api(neon_env_builder: NeonEnvBuilder): @@ -212,3 +213,24 @@ def test_server_and_cert_metrics(neon_env_builder: NeonEnvBuilder): assert reload_error_cnt > 0 wait_until(reload_failed) + + +def test_storage_broker_https_api(neon_env_builder: NeonEnvBuilder): + """ + Test HTTPS storage broker API. + 1. Make /status request to HTTPS API to ensure it's appropriately configured. + 2. Generate simple workload to ensure that SK -> broker -> PS communication works well. + """ + neon_env_builder.use_https_storage_broker_api = True + env = neon_env_builder.init_start() + + # 1. Simple check that HTTPS is enabled and works. + url = env.broker.client_url() + "/status" + assert url.startswith("https://") + requests.get(url, verify=str(env.ssl_ca_file)).raise_for_status() + + # 2. Simple workload to check that SK -> broker -> PS communication works over HTTPS. + workload = Workload(env, env.initial_tenant, env.initial_timeline) + workload.init() + workload.write_rows(10) + workload.validate()