diff --git a/Cargo.lock b/Cargo.lock index f15c6e857f..fa40009769 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2848,6 +2848,7 @@ dependencies = [ "anyhow", "bytes", "fail", + "futures", "hyper 0.14.30", "itertools 0.10.5", "jemalloc_pprof", @@ -2861,6 +2862,7 @@ dependencies = [ "serde_path_to_error", "thiserror 1.0.69", "tokio", + "tokio-rustls 0.26.0", "tokio-stream", "tokio-util", "tracing", @@ -4189,6 +4191,7 @@ dependencies = [ "pageserver_api", "pageserver_client", "rand 0.8.5", + "reqwest", "serde", "serde_json", "tokio", @@ -4278,6 +4281,9 @@ dependencies = [ "remote_storage", "reqwest", "rpds", + "rustls 0.23.18", + "rustls-pemfile 2.1.1", + "rustls-pki-types", "scopeguard", "send-future", "serde", @@ -4296,6 +4302,7 @@ dependencies = [ "tokio-epoll-uring", "tokio-io-timeout", "tokio-postgres", + "tokio-rustls 0.26.0", "tokio-stream", "tokio-tar", "tokio-util", @@ -5908,9 +5915,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" +checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" [[package]] name = "rustls-webpki" diff --git a/Cargo.toml b/Cargo.toml index 427d1e98ad..c59c4c5435 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -155,6 +155,7 @@ rpds = "0.13" rustc-hash = "1.1.0" rustls = { version = "0.23.16", default-features = false } rustls-pemfile = "2" +rustls-pki-types = "1.11" scopeguard = "1.1" sysinfo = "0.29.2" sd-notify = "0.4.1" diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 375b5d87d0..ba1411b615 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -963,6 +963,7 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result { id: pageserver_id, listen_pg_addr: format!("127.0.0.1:{pg_port}"), listen_http_addr: format!("127.0.0.1:{http_port}"), + 
listen_https_addr: None, pg_auth_type: AuthType::Trust, http_auth_type: AuthType::Trust, other: Default::default(), @@ -977,6 +978,7 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result { default_tenant_id: TenantId::from_array(std::array::from_fn(|_| 0)), storage_controller: None, control_plane_compute_hook_api: None, + generate_local_ssl_certs: false, } }; diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index f4026efbbf..2e8fb8f07b 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -81,6 +81,10 @@ pub struct LocalEnv { // but deserialization into a generic toml object as `toml::Value::try_from` fails with an error. // https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table". pub branch_name_mappings: HashMap>, + + /// Flag to generate SSL certificates for components that need it. + /// Also generates root CA certificate that is used to sign all other certificates. + pub generate_local_ssl_certs: bool, } /// On-disk state stored in `.neon/config`. @@ -102,6 +106,10 @@ pub struct OnDiskConfig { pub control_plane_api: Option, pub control_plane_compute_hook_api: Option, branch_name_mappings: HashMap>, + // Note: skip serializing because in compat tests old storage controller fails + // to load new config file. May be removed after this field is in release branch. + #[serde(skip_serializing_if = "std::ops::Not::not")] + pub generate_local_ssl_certs: bool, } fn fail_if_pageservers_field_specified<'de, D>(_: D) -> Result, D::Error> @@ -129,6 +137,7 @@ pub struct NeonLocalInitConf { pub safekeepers: Vec, pub control_plane_api: Option, pub control_plane_compute_hook_api: Option>, + pub generate_local_ssl_certs: bool, } /// Broker config for cluster internal communication. 
@@ -165,6 +174,9 @@ pub struct NeonStorageControllerConf { #[serde(with = "humantime_serde")] pub long_reconcile_threshold: Option, + + #[serde(default)] + pub use_https_pageserver_api: bool, } impl NeonStorageControllerConf { @@ -188,6 +200,7 @@ impl Default for NeonStorageControllerConf { max_secondary_lag_bytes: None, heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL, long_reconcile_threshold: None, + use_https_pageserver_api: false, } } } @@ -217,6 +230,7 @@ pub struct PageServerConf { pub id: NodeId, pub listen_pg_addr: String, pub listen_http_addr: String, + pub listen_https_addr: Option, pub pg_auth_type: AuthType, pub http_auth_type: AuthType, pub no_sync: bool, @@ -228,6 +242,7 @@ impl Default for PageServerConf { id: NodeId(0), listen_pg_addr: String::new(), listen_http_addr: String::new(), + listen_https_addr: None, pg_auth_type: AuthType::Trust, http_auth_type: AuthType::Trust, no_sync: false, @@ -243,6 +258,7 @@ pub struct NeonLocalInitPageserverConf { pub id: NodeId, pub listen_pg_addr: String, pub listen_http_addr: String, + pub listen_https_addr: Option, pub pg_auth_type: AuthType, pub http_auth_type: AuthType, #[serde(default, skip_serializing_if = "std::ops::Not::not")] @@ -257,6 +273,7 @@ impl From<&NeonLocalInitPageserverConf> for PageServerConf { id, listen_pg_addr, listen_http_addr, + listen_https_addr, pg_auth_type, http_auth_type, no_sync, @@ -266,6 +283,7 @@ impl From<&NeonLocalInitPageserverConf> for PageServerConf { id: *id, listen_pg_addr: listen_pg_addr.clone(), listen_http_addr: listen_http_addr.clone(), + listen_https_addr: listen_https_addr.clone(), pg_auth_type: *pg_auth_type, http_auth_type: *http_auth_type, no_sync: *no_sync, @@ -410,6 +428,41 @@ impl LocalEnv { } } + pub fn ssl_ca_cert_path(&self) -> Option { + if self.generate_local_ssl_certs { + Some(self.base_data_dir.join("rootCA.crt")) + } else { + None + } + } + + pub fn ssl_ca_key_path(&self) -> Option { + if self.generate_local_ssl_certs { + 
Some(self.base_data_dir.join("rootCA.key")) + } else { + None + } + } + + pub fn generate_ssl_ca_cert(&self) -> anyhow::Result<()> { + let cert_path = self.ssl_ca_cert_path().unwrap(); + let key_path = self.ssl_ca_key_path().unwrap(); + if !fs::exists(cert_path.as_path())? { + generate_ssl_ca_cert(cert_path.as_path(), key_path.as_path())?; + } + Ok(()) + } + + pub fn generate_ssl_cert(&self, cert_path: &Path, key_path: &Path) -> anyhow::Result<()> { + self.generate_ssl_ca_cert()?; + generate_ssl_cert( + cert_path, + key_path, + self.ssl_ca_cert_path().unwrap().as_path(), + self.ssl_ca_key_path().unwrap().as_path(), + ) + } + /// Inspect the base data directory and extract the instance id and instance directory path /// for all storage controller instances pub async fn storage_controller_instances(&self) -> std::io::Result> { @@ -519,6 +572,7 @@ impl LocalEnv { control_plane_api, control_plane_compute_hook_api, branch_name_mappings, + generate_local_ssl_certs, } = on_disk_config; LocalEnv { base_data_dir: repopath.to_owned(), @@ -533,6 +587,7 @@ impl LocalEnv { control_plane_api: control_plane_api.unwrap(), control_plane_compute_hook_api, branch_name_mappings, + generate_local_ssl_certs, } }; @@ -568,6 +623,7 @@ impl LocalEnv { struct PageserverConfigTomlSubset { listen_pg_addr: String, listen_http_addr: String, + listen_https_addr: Option, pg_auth_type: AuthType, http_auth_type: AuthType, #[serde(default)] @@ -592,6 +648,7 @@ impl LocalEnv { let PageserverConfigTomlSubset { listen_pg_addr, listen_http_addr, + listen_https_addr, pg_auth_type, http_auth_type, no_sync, @@ -609,6 +666,7 @@ impl LocalEnv { }, listen_pg_addr, listen_http_addr, + listen_https_addr, pg_auth_type, http_auth_type, no_sync, @@ -636,6 +694,7 @@ impl LocalEnv { control_plane_api: Some(self.control_plane_api.clone()), control_plane_compute_hook_api: self.control_plane_compute_hook_api.clone(), branch_name_mappings: self.branch_name_mappings.clone(), + generate_local_ssl_certs: 
self.generate_local_ssl_certs, }, ) } @@ -718,6 +777,7 @@ impl LocalEnv { safekeepers, control_plane_api, control_plane_compute_hook_api, + generate_local_ssl_certs, } = conf; // Find postgres binaries. @@ -766,8 +826,13 @@ impl LocalEnv { control_plane_api: control_plane_api.unwrap(), control_plane_compute_hook_api: control_plane_compute_hook_api.unwrap_or_default(), branch_name_mappings: Default::default(), + generate_local_ssl_certs, }; + if generate_local_ssl_certs { + env.generate_ssl_ca_cert()?; + } + // create endpoints dir fs::create_dir_all(env.endpoints_path())?; @@ -851,3 +916,80 @@ fn generate_auth_keys(private_key_path: &Path, public_key_path: &Path) -> anyhow } Ok(()) } + +fn generate_ssl_ca_cert(cert_path: &Path, key_path: &Path) -> anyhow::Result<()> { + // openssl req -x509 -newkey rsa:2048 -nodes -subj "/CN=Neon Local CA" -days 36500 \ + // -out rootCA.crt -keyout rootCA.key + let keygen_output = Command::new("openssl") + .args([ + "req", "-x509", "-newkey", "rsa:2048", "-nodes", "-days", "36500", + ]) + .args(["-subj", "/CN=Neon Local CA"]) + .args(["-out", cert_path.to_str().unwrap()]) + .args(["-keyout", key_path.to_str().unwrap()]) + .output() + .context("failed to generate CA certificate")?; + if !keygen_output.status.success() { + bail!( + "openssl failed: '{}'", + String::from_utf8_lossy(&keygen_output.stderr) + ); + } + Ok(()) +} + +fn generate_ssl_cert( + cert_path: &Path, + key_path: &Path, + ca_cert_path: &Path, + ca_key_path: &Path, +) -> anyhow::Result<()> { + // Generate Certificate Signing Request (CSR). 
+ let mut csr_path = cert_path.to_path_buf(); + csr_path.set_extension(".csr"); + + // openssl req -new -nodes -newkey rsa:2048 -keyout server.key -out server.csr \ + // -subj "/CN=localhost" -addext "subjectAltName=DNS:localhost,IP:127.0.0.1" + let keygen_output = Command::new("openssl") + .args(["req", "-new", "-nodes"]) + .args(["-newkey", "rsa:2048"]) + .args(["-subj", "/CN=localhost"]) + .args(["-addext", "subjectAltName=DNS:localhost,IP:127.0.0.1"]) + .args(["-keyout", key_path.to_str().unwrap()]) + .args(["-out", csr_path.to_str().unwrap()]) + .output() + .context("failed to generate CSR")?; + if !keygen_output.status.success() { + bail!( + "openssl failed: '{}'", + String::from_utf8_lossy(&keygen_output.stderr) + ); + } + + // Sign CSR with CA key. + // + // openssl x509 -req -in server.csr -CA rootCA.crt -CAkey rootCA.key -CAcreateserial \ + // -out server.crt -days 36500 -copy_extensions copyall + let keygen_output = Command::new("openssl") + .args(["x509", "-req"]) + .args(["-in", csr_path.to_str().unwrap()]) + .args(["-CA", ca_cert_path.to_str().unwrap()]) + .args(["-CAkey", ca_key_path.to_str().unwrap()]) + .arg("-CAcreateserial") + .args(["-out", cert_path.to_str().unwrap()]) + .args(["-days", "36500"]) + .args(["-copy_extensions", "copyall"]) + .output() + .context("failed to sign CSR")?; + if !keygen_output.status.success() { + bail!( + "openssl failed: '{}'", + String::from_utf8_lossy(&keygen_output.stderr) + ); + } + + // Remove CSR file as it's not needed anymore. 
+ fs::remove_file(csr_path)?; + + Ok(()) +} diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 39656bdbbe..eeaad10d26 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -21,6 +21,7 @@ use pageserver_api::shard::TenantShardId; use pageserver_client::mgmt_api; use postgres_backend::AuthType; use postgres_connection::{PgConnectionConfig, parse_host_port}; +use reqwest::Certificate; use utils::auth::{Claims, Scope}; use utils::id::{NodeId, TenantId, TimelineId}; use utils::lsn::Lsn; @@ -49,12 +50,29 @@ impl PageServerNode { let (host, port) = parse_host_port(&conf.listen_pg_addr).expect("Unable to parse listen_pg_addr"); let port = port.unwrap_or(5432); + + let ssl_ca_cert = env.ssl_ca_cert_path().map(|ssl_ca_file| { + let buf = std::fs::read(ssl_ca_file).expect("SSL root CA file should exist"); + Certificate::from_pem(&buf).expect("CA certificate should be valid") + }); + + let endpoint = if env.storage_controller.use_https_pageserver_api { + format!( + "https://{}", + conf.listen_https_addr.as_ref().expect( + "listen https address should be specified if use_https_pageserver_api is on" + ) + ) + } else { + format!("http://{}", conf.listen_http_addr) + }; + Self { pg_connection_config: PgConnectionConfig::new_host_port(host, port), conf: conf.clone(), env: env.clone(), http_client: mgmt_api::Client::new( - format!("http://{}", conf.listen_http_addr), + endpoint, { match conf.http_auth_type { AuthType::Trust => None, @@ -65,7 +83,9 @@ impl PageServerNode { } } .as_deref(), - ), + ssl_ca_cert, + ) + .expect("Client constructs with no errors"), } } @@ -220,6 +240,13 @@ impl PageServerNode { .context("write identity toml")?; drop(identity_toml); + if self.env.generate_local_ssl_certs { + self.env.generate_ssl_cert( + datadir.join("server.crt").as_path(), + datadir.join("server.key").as_path(), + )?; + } + // TODO: invoke a TBD config-check command to validate that pageserver will start with the written 
config // Write metadata file, used by pageserver on startup to register itself with @@ -230,6 +257,15 @@ impl PageServerNode { parse_host_port(&self.conf.listen_http_addr).expect("Unable to parse listen_http_addr"); let http_port = http_port.unwrap_or(9898); + let https_port = match self.conf.listen_https_addr.as_ref() { + Some(https_addr) => { + let (_https_host, https_port) = + parse_host_port(https_addr).expect("Unable to parse listen_https_addr"); + Some(https_port.unwrap_or(9899)) + } + None => None, + }; + // Intentionally hand-craft JSON: this acts as an implicit format compat test // in case the pageserver-side structure is edited, and reflects the real life // situation: the metadata is written by some other script. @@ -240,6 +276,7 @@ impl PageServerNode { postgres_port: self.pg_connection_config.port(), http_host: "localhost".to_string(), http_port, + https_port, other: HashMap::from([( "availability_zone_id".to_string(), serde_json::json!(az_id), diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index 3604e4a241..1df50e211c 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -534,6 +534,14 @@ impl StorageController { args.push("--start-as-candidate".to_string()); } + if self.config.use_https_pageserver_api { + args.push("--use-https-pageserver-api".to_string()); + } + + if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() { + args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap())); + } + if let Some(private_key) = &self.private_key { let claims = Claims::new(None, Scope::PageServerApi); let jwt_token = diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index c3f157a9cc..b5c4f21e97 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -1,4 +1,5 @@ use std::collections::{HashMap, HashSet}; +use std::path::PathBuf; use std::str::FromStr; use std::time::Duration; 
@@ -278,6 +279,10 @@ struct Cli { /// a token with both scopes to use with this tool. jwt: Option, + #[arg(long)] + /// Trusted root CA certificate to use in https APIs. + ssl_ca_file: Option, + #[command(subcommand)] command: Command, } @@ -388,9 +393,17 @@ async fn main() -> anyhow::Result<()> { let storcon_client = Client::new(cli.api.clone(), cli.jwt.clone()); + let ssl_ca_cert = match &cli.ssl_ca_file { + Some(ssl_ca_file) => { + let buf = tokio::fs::read(ssl_ca_file).await?; + Some(reqwest::Certificate::from_pem(&buf)?) + } + None => None, + }; + let mut trimmed = cli.api.to_string(); trimmed.pop(); - let vps_client = mgmt_api::Client::new(trimmed, cli.jwt.as_deref()); + let vps_client = mgmt_api::Client::new(trimmed, cli.jwt.as_deref(), ssl_ca_cert)?; match cli.command { Command::NodeRegister { diff --git a/libs/http-utils/Cargo.toml b/libs/http-utils/Cargo.toml index d16dac7876..00b3777a63 100644 --- a/libs/http-utils/Cargo.toml +++ b/libs/http-utils/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true anyhow.workspace = true bytes.workspace = true fail.workspace = true +futures.workspace = true hyper0.workspace = true itertools.workspace = true jemalloc_pprof.workspace = true @@ -21,6 +22,7 @@ serde_path_to_error.workspace = true thiserror.workspace = true tracing.workspace = true tokio.workspace = true +tokio-rustls.workspace = true tokio-util.workspace = true url.workspace = true uuid.workspace = true diff --git a/libs/http-utils/src/lib.rs b/libs/http-utils/src/lib.rs index 1e9b3c761a..dd520ef69b 100644 --- a/libs/http-utils/src/lib.rs +++ b/libs/http-utils/src/lib.rs @@ -3,9 +3,10 @@ pub mod error; pub mod failpoints; pub mod json; pub mod request; +pub mod server; extern crate hyper0 as hyper; /// Current fast way to apply simple http routing in various Neon binaries. /// Re-exported for sake of uniform approach, that could be later replaced with better alternatives, if needed. 
-pub use routerify::{RouterBuilder, RouterService, ext::RequestExt}; +pub use routerify::{RequestServiceBuilder, RouterBuilder, RouterService, ext::RequestExt}; diff --git a/libs/http-utils/src/server.rs b/libs/http-utils/src/server.rs new file mode 100644 index 0000000000..33e4915e99 --- /dev/null +++ b/libs/http-utils/src/server.rs @@ -0,0 +1,155 @@ +use std::{error::Error, sync::Arc}; + +use futures::StreamExt; +use futures::stream::FuturesUnordered; +use hyper0::Body; +use hyper0::server::conn::Http; +use routerify::{RequestService, RequestServiceBuilder}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_rustls::TlsAcceptor; +use tokio_util::sync::CancellationToken; +use tracing::{error, info}; + +use crate::error::ApiError; + +/// A simple HTTP server over hyper library. +/// You may want to use it instead of [`hyper0::server::Server`] because: +/// 1. hyper0's Server was removed from hyper v1. +/// It's recommended to replace hyper0's Server with a manual loop, which is done here. +/// 2. hyper0's Server doesn't support TLS out of the box, and there is no way +/// to support it efficiently with the Accept trait that hyper0's Server uses. +/// That's one of the reasons why it was removed from v1. +/// +pub struct Server { + request_service: Arc>, + listener: tokio::net::TcpListener, + tls_acceptor: Option, +} + +impl Server { + pub fn new( + request_service: Arc>, + listener: std::net::TcpListener, + tls_acceptor: Option, + ) -> anyhow::Result { + // Note: caller of from_std is responsible for setting nonblocking mode. 
+ listener.set_nonblocking(true)?; + let listener = tokio::net::TcpListener::from_std(listener)?; + + Ok(Self { + request_service, + listener, + tls_acceptor, + }) + } + + pub async fn serve(self, cancel: CancellationToken) -> anyhow::Result<()> { + fn suppress_io_error(err: &std::io::Error) -> bool { + use std::io::ErrorKind::*; + matches!(err.kind(), ConnectionReset | ConnectionAborted | BrokenPipe) + } + fn suppress_hyper_error(err: &hyper0::Error) -> bool { + if err.is_incomplete_message() || err.is_closed() || err.is_timeout() { + return true; + } + if let Some(inner) = err.source() { + if let Some(io) = inner.downcast_ref::() { + return suppress_io_error(io); + } + } + false + } + + let mut connections = FuturesUnordered::new(); + loop { + tokio::select! { + stream = self.listener.accept() => { + let (tcp_stream, remote_addr) = match stream { + Ok(stream) => stream, + Err(err) => { + if !suppress_io_error(&err) { + info!("Failed to accept TCP connection: {err:#}"); + } + continue; + } + }; + + let service = self.request_service.build(remote_addr); + let tls_acceptor = self.tls_acceptor.clone(); + let cancel = cancel.clone(); + + connections.push(tokio::spawn( + async move { + match tls_acceptor { + Some(tls_acceptor) => { + // Handle HTTPS connection. + let tls_stream = tokio::select! { + tls_stream = tls_acceptor.accept(tcp_stream) => tls_stream, + _ = cancel.cancelled() => return, + }; + let tls_stream = match tls_stream { + Ok(tls_stream) => tls_stream, + Err(err) => { + if !suppress_io_error(&err) { + info!("Failed to accept TLS connection: {err:#}"); + } + return; + } + }; + if let Err(err) = Self::serve_connection(tls_stream, service, cancel).await { + if !suppress_hyper_error(&err) { + info!("Failed to serve HTTPS connection: {err:#}"); + } + } + } + None => { + // Handle HTTP connection. 
+ if let Err(err) = Self::serve_connection(tcp_stream, service, cancel).await { + if !suppress_hyper_error(&err) { + info!("Failed to serve HTTP connection: {err:#}"); + } + } + } + }; + })); + } + Some(conn) = connections.next() => { + if let Err(err) = conn { + error!("Connection panicked: {err:#}"); + } + } + _ = cancel.cancelled() => { + // Wait for graceful shutdown of all connections. + while let Some(conn) = connections.next().await { + if let Err(err) = conn { + error!("Connection panicked: {err:#}"); + } + } + break; + } + } + } + Ok(()) + } + + /// Serves HTTP connection with graceful shutdown. + async fn serve_connection( + io: I, + service: RequestService, + cancel: CancellationToken, + ) -> Result<(), hyper0::Error> + where + I: AsyncRead + AsyncWrite + Unpin + Send + 'static, + { + let mut conn = Http::new().serve_connection(io, service).with_upgrades(); + + tokio::select! { + res = &mut conn => res, + _ = cancel.cancelled() => { + Pin::new(&mut conn).graceful_shutdown(); + // Note: connection should still be awaited for graceful shutdown to complete. + conn.await + } + } + } +} diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index f387ff0579..ce7de1e0c7 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -35,6 +35,7 @@ pub struct NodeMetadata { pub postgres_port: u16, pub http_host: String, pub http_port: u16, + pub https_port: Option, // Deployment tools may write fields to the metadata file beyond what we // use in this type: this type intentionally only names fields that require. 
@@ -57,6 +58,9 @@ pub struct ConfigToml { // types mapped 1:1 into the runtime PageServerConfig type pub listen_pg_addr: String, pub listen_http_addr: String, + pub listen_https_addr: Option, + pub ssl_key_file: Utf8PathBuf, + pub ssl_cert_file: Utf8PathBuf, pub availability_zone: Option, #[serde(with = "humantime_serde")] pub wait_lsn_timeout: Duration, @@ -421,6 +425,9 @@ pub mod defaults { pub const DEFAULT_WAL_RECEIVER_PROTOCOL: utils::postgres_client::PostgresClientProtocol = utils::postgres_client::PostgresClientProtocol::Vanilla; + + pub const DEFAULT_SSL_KEY_FILE: &str = "server.key"; + pub const DEFAULT_SSL_CERT_FILE: &str = "server.crt"; } impl Default for ConfigToml { @@ -430,6 +437,9 @@ impl Default for ConfigToml { Self { listen_pg_addr: (DEFAULT_PG_LISTEN_ADDR.to_string()), listen_http_addr: (DEFAULT_HTTP_LISTEN_ADDR.to_string()), + listen_https_addr: (None), + ssl_key_file: Utf8PathBuf::from(DEFAULT_SSL_KEY_FILE), + ssl_cert_file: Utf8PathBuf::from(DEFAULT_SSL_CERT_FILE), availability_zone: (None), wait_lsn_timeout: (humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT) .expect("cannot parse default wait lsn timeout")), diff --git a/libs/pageserver_api/src/config/tests.rs b/libs/pageserver_api/src/config/tests.rs index edeefc156e..9e61873273 100644 --- a/libs/pageserver_api/src/config/tests.rs +++ b/libs/pageserver_api/src/config/tests.rs @@ -16,6 +16,30 @@ fn test_node_metadata_v1_backward_compatibilty() { postgres_port: 23, http_host: "localhost".to_string(), http_port: 42, + https_port: None, + other: HashMap::new(), + } + ) +} + +#[test] +fn test_node_metadata_v2_backward_compatibilty() { + let v2 = serde_json::to_vec(&serde_json::json!({ + "host": "localhost", + "port": 23, + "http_host": "localhost", + "http_port": 42, + "https_port": 123, + })); + + assert_eq!( + serde_json::from_slice::(&v2.unwrap()).unwrap(), + NodeMetadata { + postgres_host: "localhost".to_string(), + postgres_port: 23, + http_host: "localhost".to_string(), + http_port: 42, + 
https_port: Some(123), other: HashMap::new(), } ) diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index fa16090170..40ca1d3a33 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -48,6 +48,9 @@ pprof.workspace = true rand.workspace = true range-set-blaze = { version = "0.1.16", features = ["alloc"] } regex.workspace = true +rustls-pemfile.workspace = true +rustls-pki-types.workspace = true +rustls.workspace = true scopeguard.workspace = true send-future.workspace = true serde.workspace = true @@ -62,6 +65,7 @@ tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util" tokio-epoll-uring.workspace = true tokio-io-timeout.workspace = true tokio-postgres.workspace = true +tokio-rustls.workspace = true tokio-stream.workspace = true tokio-util.workspace = true toml_edit = { workspace = true, features = [ "serde" ] } diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs index 37c914c4e9..830fd8a531 100644 --- a/pageserver/client/src/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -7,7 +7,7 @@ use http_utils::error::HttpErrorBody; use pageserver_api::models::*; use pageserver_api::shard::TenantShardId; pub use reqwest::Body as ReqwestBody; -use reqwest::{IntoUrl, Method, StatusCode}; +use reqwest::{Certificate, IntoUrl, Method, StatusCode}; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; @@ -38,6 +38,9 @@ pub enum Error { #[error("Cancelled")] Cancelled, + + #[error("create client: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())] + CreateClient(reqwest::Error), } pub type Result = std::result::Result; @@ -69,8 +72,17 @@ pub enum ForceAwaitLogicalSize { } impl Client { - pub fn new(mgmt_api_endpoint: String, jwt: Option<&str>) -> Self { - Self::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt) + pub fn new( + mgmt_api_endpoint: String, + jwt: Option<&str>, + ssl_ca_cert: Option, + ) -> Result { + let mut http_client = reqwest::Client::builder(); + if 
let Some(ssl_ca_cert) = ssl_ca_cert { + http_client = http_client.add_root_certificate(ssl_ca_cert); + } + let http_client = http_client.build().map_err(Error::CreateClient)?; + Ok(Self::from_client(http_client, mgmt_api_endpoint, jwt)) } pub fn from_client( @@ -101,12 +113,10 @@ impl Client { debug_assert!(path.starts_with('/')); let uri = format!("{}{}", self.mgmt_api_endpoint, path); - let req = self.client.request(Method::GET, uri); - let req = if let Some(value) = &self.authorization_header { - req.header(reqwest::header::AUTHORIZATION, value) - } else { - req - }; + let mut req = self.client.request(Method::GET, uri); + if let Some(value) = &self.authorization_header { + req = req.header(reqwest::header::AUTHORIZATION, value); + } req.send().await.map_err(Error::ReceiveBody) } diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml index 245d293e4f..5b5ed09a2b 100644 --- a/pageserver/pagebench/Cargo.toml +++ b/pageserver/pagebench/Cargo.toml @@ -15,6 +15,7 @@ hdrhistogram.workspace = true humantime.workspace = true humantime-serde.workspace = true rand.workspace = true +reqwest.workspace=true serde.workspace = true serde_json.workspace = true tracing.workspace = true diff --git a/pageserver/pagebench/src/cmd/aux_files.rs b/pageserver/pagebench/src/cmd/aux_files.rs index bab17540f5..394a954c30 100644 --- a/pageserver/pagebench/src/cmd/aux_files.rs +++ b/pageserver/pagebench/src/cmd/aux_files.rs @@ -36,7 +36,8 @@ async fn main_impl(args: Args) -> anyhow::Result<()> { let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new( args.mgmt_api_endpoint.clone(), args.pageserver_jwt.as_deref(), - )); + None, // TODO: support ssl_ca_file for https APIs in pagebench. 
+ )?); // discover targets let timelines: Vec = crate::util::cli::targets::discover( diff --git a/pageserver/pagebench/src/cmd/basebackup.rs b/pageserver/pagebench/src/cmd/basebackup.rs index 51d7d5df89..d3013ded70 100644 --- a/pageserver/pagebench/src/cmd/basebackup.rs +++ b/pageserver/pagebench/src/cmd/basebackup.rs @@ -77,7 +77,8 @@ async fn main_impl( let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new( args.mgmt_api_endpoint.clone(), args.pageserver_jwt.as_deref(), - )); + None, // TODO: support ssl_ca_file for https APIs in pagebench. + )?); // discover targets let timelines: Vec = crate::util::cli::targets::discover( diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 617676c079..969cf24b93 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -125,7 +125,8 @@ async fn main_impl( let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new( args.mgmt_api_endpoint.clone(), args.pageserver_jwt.as_deref(), - )); + None, // TODO: support ssl_ca_file for https APIs in pagebench. + )?); if let Some(engine_str) = &args.set_io_engine { mgmt_api_client.put_io_engine(engine_str).await?; diff --git a/pageserver/pagebench/src/cmd/ondemand_download_churn.rs b/pageserver/pagebench/src/cmd/ondemand_download_churn.rs index 3194e2e753..a77d3000cc 100644 --- a/pageserver/pagebench/src/cmd/ondemand_download_churn.rs +++ b/pageserver/pagebench/src/cmd/ondemand_download_churn.rs @@ -83,7 +83,8 @@ async fn main_impl(args: Args) -> anyhow::Result<()> { let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new( args.mgmt_api_endpoint.clone(), args.pageserver_jwt.as_deref(), - )); + None, // TODO: support ssl_ca_file for https APIs in pagebench. 
+ )?); if let Some(engine_str) = &args.set_io_engine { mgmt_api_client.put_io_engine(engine_str).await?; diff --git a/pageserver/pagebench/src/cmd/trigger_initial_size_calculation.rs b/pageserver/pagebench/src/cmd/trigger_initial_size_calculation.rs index 16abbf9ffd..2f919ec652 100644 --- a/pageserver/pagebench/src/cmd/trigger_initial_size_calculation.rs +++ b/pageserver/pagebench/src/cmd/trigger_initial_size_calculation.rs @@ -40,7 +40,8 @@ async fn main_impl(args: Args) -> anyhow::Result<()> { let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new( args.mgmt_api_endpoint.clone(), args.pageserver_jwt.as_deref(), - )); + None, // TODO: support ssl_ca_file for https APIs in pagebench. + )?); // discover targets let timelines: Vec = crate::util::cli::targets::discover( diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 703629aed5..c4af0d5d41 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -25,11 +25,12 @@ use pageserver::task_mgr::{ }; use pageserver::tenant::{TenantSharedResources, mgr, secondary}; use pageserver::{ - CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, http, page_cache, page_service, - task_mgr, virtual_file, + CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, HttpsEndpointListener, http, + page_cache, page_service, task_mgr, virtual_file, }; use postgres_backend::AuthType; use remote_storage::GenericRemoteStorage; +use rustls_pki_types::{CertificateDer, PrivateKeyDer}; use tokio::signal::unix::SignalKind; use tokio::time::Instant; use tokio_util::sync::CancellationToken; @@ -343,8 +344,15 @@ fn start_pageserver( info!("Starting pageserver http handler on {http_addr}"); let http_listener = tcp_listener::bind(http_addr)?; - let pg_addr = &conf.listen_pg_addr; + let https_listener = match conf.listen_https_addr.as_ref() { + Some(https_addr) => { + info!("Starting pageserver https handler on {https_addr}"); + 
Some(tcp_listener::bind(https_addr)?) + } + None => None, + }; + let pg_addr = &conf.listen_pg_addr; info!("Starting pageserver pg protocol handler on {pg_addr}"); let pageserver_listener = tcp_listener::bind(pg_addr)?; @@ -575,9 +583,8 @@ fn start_pageserver( // Start up the service to handle HTTP mgmt API request. We created the // listener earlier already. - let http_endpoint_listener = { + let (http_endpoint_listener, https_endpoint_listener) = { let _rt_guard = MGMT_REQUEST_RUNTIME.enter(); // for hyper - let cancel = CancellationToken::new(); let router_state = Arc::new( http::routes::State::new( @@ -592,22 +599,51 @@ fn start_pageserver( ) .context("Failed to initialize router state")?, ); + let router = http::make_router(router_state, launch_ts, http_auth.clone())? .build() .map_err(|err| anyhow!(err))?; - let service = http_utils::RouterService::new(router).unwrap(); - let server = hyper0::Server::from_tcp(http_listener)? - .serve(service) - .with_graceful_shutdown({ - let cancel = cancel.clone(); - async move { cancel.clone().cancelled().await } - }); - let task = MGMT_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error( - "http endpoint listener", - server, - )); - HttpEndpointListener(CancellableTask { task, cancel }) + let service = + Arc::new(http_utils::RequestServiceBuilder::new(router).map_err(|err| anyhow!(err))?); + + let http_task = { + let server = + http_utils::server::Server::new(Arc::clone(&service), http_listener, None)?; + let cancel = CancellationToken::new(); + + let task = MGMT_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error( + "http endpoint listener", + server.serve(cancel.clone()), + )); + HttpEndpointListener(CancellableTask { task, cancel }) + }; + + let https_task = match https_listener { + Some(https_listener) => { + let certs = load_certs(&conf.ssl_cert_file)?; + let key = load_private_key(&conf.ssl_key_file)?; + + let server_config = rustls::ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(certs, 
key)?; + + let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(server_config)); + + let server = + http_utils::server::Server::new(service, https_listener, Some(tls_acceptor))?; + let cancel = CancellationToken::new(); + + let task = MGMT_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error( + "https endpoint listener", + server.serve(cancel.clone()), + )); + Some(HttpsEndpointListener(CancellableTask { task, cancel })) + } + None => None, + }; + + (http_task, https_task) }; let consumption_metrics_tasks = { @@ -683,6 +719,7 @@ fn start_pageserver( shutdown_pageserver.cancel(); pageserver::shutdown_pageserver( http_endpoint_listener, + https_endpoint_listener, page_service, consumption_metrics_tasks, disk_usage_eviction_task, @@ -697,6 +734,25 @@ fn start_pageserver( }) } +fn load_certs(filename: &Utf8Path) -> std::io::Result>> { + let file = std::fs::File::open(filename)?; + let mut reader = std::io::BufReader::new(file); + + rustls_pemfile::certs(&mut reader).collect() +} + +fn load_private_key(filename: &Utf8Path) -> anyhow::Result> { + let file = std::fs::File::open(filename)?; + let mut reader = std::io::BufReader::new(file); + + let key = rustls_pemfile::private_key(&mut reader)?; + + key.ok_or(anyhow::anyhow!( + "no private key found in {}", + filename.as_str(), + )) +} + async fn create_remote_storage_client( conf: &'static PageServerConf, ) -> anyhow::Result { diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 06be873160..562a16a14e 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -53,6 +53,11 @@ pub struct PageServerConf { pub listen_pg_addr: String, /// Example (default): 127.0.0.1:9898 pub listen_http_addr: String, + /// Example: 127.0.0.1:9899 + pub listen_https_addr: Option, + + pub ssl_key_file: Utf8PathBuf, + pub ssl_cert_file: Utf8PathBuf, /// Current availability zone. Used for traffic metrics. 
pub availability_zone: Option, @@ -317,6 +322,9 @@ impl PageServerConf { let pageserver_api::config::ConfigToml { listen_pg_addr, listen_http_addr, + listen_https_addr, + ssl_key_file, + ssl_cert_file, availability_zone, wait_lsn_timeout, wal_redo_timeout, @@ -375,6 +383,9 @@ impl PageServerConf { // ------------------------------------------------------------ listen_pg_addr, listen_http_addr, + listen_https_addr, + ssl_key_file, + ssl_cert_file, availability_zone, wait_lsn_timeout, wal_redo_timeout, diff --git a/pageserver/src/controller_upcall_client.rs b/pageserver/src/controller_upcall_client.rs index 6d5c727958..745d04cf62 100644 --- a/pageserver/src/controller_upcall_client.rs +++ b/pageserver/src/controller_upcall_client.rs @@ -181,7 +181,7 @@ impl ControlPlaneGenerationsApi for ControllerUpcallClient { listen_pg_port: m.postgres_port, listen_http_addr: m.http_host, listen_http_port: m.http_port, - listen_https_port: None, // TODO: Support https. + listen_https_port: m.https_port, availability_zone_id: az_id.expect("Checked above"), }) } diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 02767055fb..8373d0bd87 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -64,6 +64,7 @@ pub struct CancellableTask { pub cancel: CancellationToken, } pub struct HttpEndpointListener(pub CancellableTask); +pub struct HttpsEndpointListener(pub CancellableTask); pub struct ConsumptionMetricsTasks(pub CancellableTask); pub struct DiskUsageEvictionTask(pub CancellableTask); impl CancellableTask { @@ -77,6 +78,7 @@ impl CancellableTask { #[allow(clippy::too_many_arguments)] pub async fn shutdown_pageserver( http_listener: HttpEndpointListener, + https_listener: Option, page_service: page_service::Listener, consumption_metrics_worker: ConsumptionMetricsTasks, disk_usage_eviction_task: Option, @@ -213,6 +215,15 @@ pub async fn shutdown_pageserver( ) .await; + if let Some(https_listener) = https_listener { + timed( + https_listener.0.shutdown(), + 
"shutdown https", + Duration::from_secs(1), + ) + .await; + } + // Shut down the HTTP endpoint last, so that you can still check the server's // status while it's shutting down. // FIXME: We should probably stop accepting commands like attach/detach earlier. diff --git a/storage_controller/src/heartbeater.rs b/storage_controller/src/heartbeater.rs index dab6799d3e..ee4c9ef9cd 100644 --- a/storage_controller/src/heartbeater.rs +++ b/storage_controller/src/heartbeater.rs @@ -178,6 +178,7 @@ impl HeartBeat for HeartbeaterTask let mut heartbeat_futs = FuturesUnordered::new(); for (node_id, node) in &*pageservers { heartbeat_futs.push({ + let ssl_ca_cert = self.ssl_ca_cert.clone(); let jwt_token = self.jwt_token.clone(); let cancel = self.cancel.clone(); @@ -193,6 +194,7 @@ impl HeartBeat for HeartbeaterTask .with_client_retries( |client| async move { client.get_utilization().await }, &jwt_token, + &ssl_ca_cert, 3, 3, Duration::from_secs(1), diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index 3e448d7013..b27804d820 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -657,7 +657,9 @@ async fn handle_tenant_timeline_passthrough( let client = mgmt_api::Client::new( node.base_url(), service.get_config().pageserver_jwt_token.as_deref(), - ); + service.get_config().ssl_ca_cert.clone(), + ) + .map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?; let resp = client.get_raw(path).await.map_err(|e| // We return 503 here because if we can't successfully send a request to the pageserver, // either we aren't available or the pageserver is unavailable. 
diff --git a/storage_controller/src/node.rs b/storage_controller/src/node.rs index 735bae2123..40f3c7c58e 100644 --- a/storage_controller/src/node.rs +++ b/storage_controller/src/node.rs @@ -7,7 +7,7 @@ use pageserver_api::controller_api::{ }; use pageserver_api::shard::TenantShardId; use pageserver_client::mgmt_api; -use reqwest::StatusCode; +use reqwest::{Certificate, StatusCode}; use serde::Serialize; use tokio_util::sync::CancellationToken; use utils::backoff; @@ -276,10 +276,12 @@ impl Node { /// This will return None to indicate cancellation. Cancellation may happen from /// the cancellation token passed in, or from Self's cancellation token (i.e. node /// going offline). + #[allow(clippy::too_many_arguments)] pub(crate) async fn with_client_retries( &self, mut op: O, jwt: &Option, + ssl_ca_cert: &Option, warn_threshold: u32, max_retries: u32, timeout: Duration, @@ -298,19 +300,26 @@ impl Node { | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false, ApiError(_, _) => true, Cancelled => true, + CreateClient(_) => true, } } + // TODO: refactor PageserverClient and with_client_retries (#11113). 
+ let mut http_client = reqwest::ClientBuilder::new().timeout(timeout); + if let Some(ssl_ca_cert) = ssl_ca_cert.as_ref() { + http_client = http_client.add_root_certificate(ssl_ca_cert.clone()) + } + + let http_client = match http_client.build() { + Ok(http_client) => http_client, + Err(err) => return Some(Err(mgmt_api::Error::CreateClient(err))), + }; + backoff::retry( || { - let http_client = reqwest::ClientBuilder::new() - .timeout(timeout) - .build() - .expect("Failed to construct HTTP client"); - let client = PageserverClient::from_client( self.get_id(), - http_client, + http_client.clone(), self.base_url(), jwt.as_deref(), ); diff --git a/storage_controller/src/pageserver_client.rs b/storage_controller/src/pageserver_client.rs index d6127c355a..7fd4f37e7e 100644 --- a/storage_controller/src/pageserver_client.rs +++ b/storage_controller/src/pageserver_client.rs @@ -8,7 +8,7 @@ use pageserver_api::models::{ use pageserver_api::shard::TenantShardId; use pageserver_client::BlockUnblock; use pageserver_client::mgmt_api::{Client, Result}; -use reqwest::StatusCode; +use reqwest::{Certificate, StatusCode}; use utils::id::{NodeId, TenantId, TimelineId}; /// Thin wrapper around [`pageserver_client::mgmt_api::Client`]. It allows the storage @@ -46,11 +46,16 @@ macro_rules! 
measured_request { } impl PageserverClient { - pub(crate) fn new(node_id: NodeId, mgmt_api_endpoint: String, jwt: Option<&str>) -> Self { - Self { - inner: Client::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt), + pub(crate) fn new( + node_id: NodeId, + mgmt_api_endpoint: String, + jwt: Option<&str>, + ssl_ca_cert: Option, + ) -> Result { + Ok(Self { + inner: Client::new(mgmt_api_endpoint, jwt, ssl_ca_cert)?, node_id_label: node_id.0.to_string(), - } + }) } pub(crate) fn from_client( diff --git a/storage_controller/src/reconciler.rs b/storage_controller/src/reconciler.rs index a327f6f50f..9f0b789f19 100644 --- a/storage_controller/src/reconciler.rs +++ b/storage_controller/src/reconciler.rs @@ -299,6 +299,7 @@ impl Reconciler { .await }, &self.service_config.pageserver_jwt_token, + &self.service_config.ssl_ca_cert, 1, 3, timeout, @@ -420,7 +421,8 @@ impl Reconciler { node.get_id(), node.base_url(), self.service_config.pageserver_jwt_token.as_deref(), - ); + self.service_config.ssl_ca_cert.clone(), + )?; client .wait_lsn( @@ -443,7 +445,8 @@ impl Reconciler { node.get_id(), node.base_url(), self.service_config.pageserver_jwt_token.as_deref(), - ); + self.service_config.ssl_ca_cert.clone(), + )?; let timelines = client.timeline_list(&tenant_shard_id).await?; Ok(timelines @@ -481,6 +484,7 @@ impl Reconciler { .await }, &self.service_config.pageserver_jwt_token, + &self.service_config.ssl_ca_cert, 1, 3, request_download_timeout * 2, @@ -775,6 +779,7 @@ impl Reconciler { .with_client_retries( |client| async move { client.get_location_config(tenant_shard_id).await }, &self.service_config.pageserver_jwt_token, + &self.service_config.ssl_ca_cert, 1, 1, Duration::from_secs(5), @@ -1123,6 +1128,7 @@ impl Reconciler { .with_client_retries( |client| async move { client.get_location_config(tenant_shard_id).await }, &self.service_config.pageserver_jwt_token, + &self.service_config.ssl_ca_cert, 1, 3, Duration::from_secs(5), diff --git 
a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 2a68711977..b79f223a24 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -262,6 +262,7 @@ fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError { ApiError::Conflict(format!("{node} {status}: {status} {msg}")) } mgmt_api::Error::Cancelled => ApiError::ShuttingDown, + mgmt_api::Error::CreateClient(e) => ApiError::InternalServerError(anyhow::anyhow!(e)), } } @@ -887,6 +888,7 @@ impl Service { .with_client_retries( |client| async move { client.list_location_config().await }, &self.config.pageserver_jwt_token, + &self.config.ssl_ca_cert, 1, 5, timeout, @@ -984,11 +986,20 @@ impl Service { break; } - let client = PageserverClient::new( + let client = match PageserverClient::new( node.get_id(), node.base_url(), self.config.pageserver_jwt_token.as_deref(), - ); + self.config.ssl_ca_cert.clone(), + ) { + Ok(client) => client, + Err(e) => { + tracing::error!( + "Failed to create client to detach unknown shard {tenant_shard_id} on pageserver {node_id}: {e}" + ); + continue; + } + }; match client .location_config( tenant_shard_id, @@ -1015,7 +1026,7 @@ impl Service { // Non-fatal error: leaving a tenant shard behind that we are not managing shouldn't // break anything. 
tracing::error!( - "Failed to detach unknkown shard {tenant_shard_id} on pageserver {node_id}: {e}" + "Failed to detach unknown shard {tenant_shard_id} on pageserver {node_id}: {e}" ); } } @@ -1924,6 +1935,7 @@ impl Service { .with_client_retries( |client| async move { client.list_location_config().await }, &self.config.pageserver_jwt_token, + &self.config.ssl_ca_cert, 1, 5, SHORT_RECONCILE_TIMEOUT, @@ -1982,6 +1994,7 @@ impl Service { .await }, &self.config.pageserver_jwt_token, + &self.config.ssl_ca_cert, 1, 5, SHORT_RECONCILE_TIMEOUT, @@ -3125,7 +3138,9 @@ impl Service { node.get_id(), node.base_url(), self.config.pageserver_jwt_token.as_deref(), - ); + self.config.ssl_ca_cert.clone(), + ) + .map_err(|e| passthrough_api_error(&node, e))?; tracing::info!("Doing time travel recovery for shard {tenant_shard_id}",); @@ -3186,7 +3201,9 @@ impl Service { node.get_id(), node.base_url(), self.config.pageserver_jwt_token.as_deref(), - ); + self.config.ssl_ca_cert.clone(), + ) + .map_err(|e| passthrough_api_error(&node, e))?; futs.push(async move { let result = client .tenant_secondary_download(tenant_shard_id, wait) @@ -3309,6 +3326,7 @@ impl Service { .await }, &self.config.pageserver_jwt_token, + &self.config.ssl_ca_cert, 1, 3, RECONCILE_TIMEOUT, @@ -3464,6 +3482,7 @@ impl Service { tenant_shard_id: TenantShardId, locations: ShardMutationLocations, jwt: Option, + ssl_ca_cert: Option, create_req: TimelineCreateRequest, ) -> Result { let latest = locations.latest.node; @@ -3476,7 +3495,8 @@ impl Service { ); let client = - PageserverClient::new(latest.get_id(), latest.base_url(), jwt.as_deref()); + PageserverClient::new(latest.get_id(), latest.base_url(), jwt.as_deref(), ssl_ca_cert.clone()) + .map_err(|e| passthrough_api_error(&latest, e))?; let timeline_info = client .timeline_create(tenant_shard_id, &create_req) @@ -3499,7 +3519,9 @@ impl Service { location.node.get_id(), location.node.base_url(), jwt.as_deref(), - ); + ssl_ca_cert.clone(), + ) + .map_err(|e| 
passthrough_api_error(&location.node, e))?; let res = client .timeline_create(tenant_shard_id, &create_req) @@ -3528,6 +3550,7 @@ impl Service { shard_zero_tid, shard_zero_locations, self.config.pageserver_jwt_token.clone(), + self.config.ssl_ca_cert.clone(), create_req.clone(), ) .await?; @@ -3557,6 +3580,7 @@ impl Service { tenant_shard_id, mutation_locations, jwt.clone(), + self.config.ssl_ca_cert.clone(), create_req, )) }, @@ -3598,13 +3622,15 @@ impl Service { timeline_id: TimelineId, node: Node, jwt: Option, + ssl_ca_cert: Option, req: TimelineArchivalConfigRequest, ) -> Result<(), ApiError> { tracing::info!( "Setting archival config of timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}", ); - let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref()); + let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref(), ssl_ca_cert) + .map_err(|e| passthrough_api_error(&node, e))?; client .timeline_archival_config(tenant_shard_id, timeline_id, &req) @@ -3627,6 +3653,7 @@ impl Service { timeline_id, node, self.config.pageserver_jwt_token.clone(), + self.config.ssl_ca_cert.clone(), req.clone(), )) }) @@ -3663,12 +3690,14 @@ impl Service { timeline_id: TimelineId, node: Node, jwt: Option, + ssl_ca_cert: Option, ) -> Result<(ShardNumber, models::detach_ancestor::AncestorDetached), ApiError> { tracing::info!( "Detaching timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}", ); - let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref()); + let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref(), ssl_ca_cert) + .map_err(|e| passthrough_api_error(&node, e))?; client .timeline_detach_ancestor(tenant_shard_id, timeline_id) @@ -3708,6 +3737,7 @@ impl Service { timeline_id, node, self.config.pageserver_jwt_token.clone(), + self.config.ssl_ca_cert.clone(), )) }) .await?; @@ -3760,9 +3790,16 @@ impl Service { timeline_id: TimelineId, 
node: Node, jwt: Option, + ssl_ca_cert: Option, dir: BlockUnblock, ) -> Result<(), ApiError> { - let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref()); + let client = PageserverClient::new( + node.get_id(), + node.base_url(), + jwt.as_deref(), + ssl_ca_cert, + ) + .map_err(|e| passthrough_api_error(&node, e))?; client .timeline_block_unblock_gc(tenant_shard_id, timeline_id, dir) @@ -3782,6 +3819,7 @@ impl Service { timeline_id, node, self.config.pageserver_jwt_token.clone(), + self.config.ssl_ca_cert.clone(), dir, )) }) @@ -3903,6 +3941,7 @@ impl Service { node.with_client_retries( |client| op(tenant_shard_id, client), &self.config.pageserver_jwt_token, + &self.config.ssl_ca_cert, warn_threshold, max_retries, timeout, @@ -4126,12 +4165,14 @@ impl Service { timeline_id: TimelineId, node: Node, jwt: Option, + ssl_ca_cert: Option, ) -> Result { tracing::info!( "Deleting timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}", ); - let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref()); + let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref(), ssl_ca_cert) + .map_err(|e| passthrough_api_error(&node, e))?; let res = client .timeline_delete(tenant_shard_id, timeline_id) .await; @@ -4158,6 +4199,7 @@ impl Service { timeline_id, node, self.config.pageserver_jwt_token.clone(), + self.config.ssl_ca_cert.clone(), )) }) .await?; @@ -4180,6 +4222,7 @@ impl Service { timeline_id, shard_zero_locations.latest.node, self.config.pageserver_jwt_token.clone(), + self.config.ssl_ca_cert.clone(), ) .await?; Ok(shard_zero_status) @@ -4611,6 +4654,7 @@ impl Service { client.location_config(child_id, config, None, false).await }, &self.config.pageserver_jwt_token, + &self.config.ssl_ca_cert, 1, 10, Duration::from_secs(5), @@ -5214,7 +5258,9 @@ impl Service { node.get_id(), node.base_url(), self.config.pageserver_jwt_token.as_deref(), - ); + self.config.ssl_ca_cert.clone(), + ) + 
.map_err(|e| passthrough_api_error(node, e))?; let response = client .tenant_shard_split( *parent_id, @@ -5698,7 +5744,9 @@ impl Service { node.get_id(), node.base_url(), self.config.pageserver_jwt_token.as_deref(), - ); + self.config.ssl_ca_cert.clone(), + ) + .map_err(|e| passthrough_api_error(&node, e))?; let scan_result = client .tenant_scan_remote_storage(tenant_id) @@ -7340,6 +7388,7 @@ impl Service { .with_client_retries( |client| async move { client.tenant_heatmap_upload(tenant_shard_id).await }, &self.config.pageserver_jwt_token, + &self.config.ssl_ca_cert, 3, 10, SHORT_RECONCILE_TIMEOUT, @@ -7376,6 +7425,7 @@ impl Service { .await }, &self.config.pageserver_jwt_token, + &self.config.ssl_ca_cert, 3, 10, SHORT_RECONCILE_TIMEOUT, @@ -7503,6 +7553,7 @@ impl Service { node.with_client_retries( |client| async move { client.top_tenant_shards(request.clone()).await }, &self.config.pageserver_jwt_token, + &self.config.ssl_ca_cert, 3, 3, Duration::from_secs(5), @@ -7622,6 +7673,7 @@ impl Service { .with_client_retries( |client| async move { client.tenant_secondary_status(tenant_shard_id).await }, &self.config.pageserver_jwt_token, + &self.config.ssl_ca_cert, 1, 3, Duration::from_millis(250), diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 4d2b3587e8..7bc746d668 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -463,6 +463,10 @@ class NeonEnvBuilder: self.control_plane_compute_hook_api: str | None = None self.storage_controller_config: dict[Any, Any] | None = None + # Flag to enable https listener in pageserver, generate local ssl certs, + # and force storage controller to use https for pageserver api. 
+ self.use_https_pageserver_api: bool = False + self.pageserver_virtual_file_io_engine: str | None = pageserver_virtual_file_io_engine self.pageserver_get_vectored_concurrent_io: str | None = ( pageserver_get_vectored_concurrent_io @@ -1059,6 +1063,11 @@ class NeonEnv: self.initial_tenant = config.initial_tenant self.initial_timeline = config.initial_timeline + self.generate_local_ssl_certs = config.use_https_pageserver_api + self.ssl_ca_file = ( + self.repo_dir.joinpath("rootCA.crt") if self.generate_local_ssl_certs else None + ) + neon_local_env_vars = {} if self.rust_log_override is not None: neon_local_env_vars["RUST_LOG"] = self.rust_log_override @@ -1122,6 +1131,7 @@ class NeonEnv: }, "safekeepers": [], "pageservers": [], + "generate_local_ssl_certs": self.generate_local_ssl_certs, } if self.control_plane_api is not None: @@ -1130,8 +1140,14 @@ class NeonEnv: if self.control_plane_compute_hook_api is not None: cfg["control_plane_compute_hook_api"] = self.control_plane_compute_hook_api - if self.storage_controller_config is not None: - cfg["storage_controller"] = self.storage_controller_config + storage_controller_config = self.storage_controller_config + + if config.use_https_pageserver_api: + storage_controller_config = storage_controller_config or {} + storage_controller_config["use_https_pageserver_api"] = True + + if storage_controller_config is not None: + cfg["storage_controller"] = storage_controller_config # Create config for pageserver http_auth_type = "NeonJWT" if config.auth_enabled else "Trust" @@ -1142,6 +1158,7 @@ class NeonEnv: pageserver_port = PageserverPort( pg=self.port_distributor.get_port(), http=self.port_distributor.get_port(), + https=self.port_distributor.get_port() if config.use_https_pageserver_api else None, ) # Availabilty zones may also be configured manually with `NeonEnvBuilder.pageserver_config_override` @@ -1156,6 +1173,9 @@ class NeonEnv: "id": ps_id, "listen_pg_addr": f"localhost:{pageserver_port.pg}", "listen_http_addr": 
f"localhost:{pageserver_port.http}", + "listen_https_addr": f"localhost:{pageserver_port.https}" + if config.use_https_pageserver_api + else None, "pg_auth_type": pg_auth_type, "http_auth_type": http_auth_type, "availability_zone": availability_zone, diff --git a/test_runner/regress/test_ssl.py b/test_runner/regress/test_ssl.py new file mode 100644 index 0000000000..25d839aa42 --- /dev/null +++ b/test_runner/regress/test_ssl.py @@ -0,0 +1,15 @@ +import requests +from fixtures.neon_fixtures import NeonEnvBuilder + + +def test_pageserver_https_api(neon_env_builder: NeonEnvBuilder): + """ + Test HTTPS pageserver management API. + If NeonEnv starts with use_https_pageserver_api with no errors, it's already a success. + Make /v1/status request to HTTPS API to ensure it's appropriately configured. + """ + neon_env_builder.use_https_pageserver_api = True + env = neon_env_builder.init_start() + + addr = f"https://localhost:{env.pageserver.service_port.https}/v1/status" + requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()