diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index c503697acc..b7e479d90c 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -385,8 +385,6 @@ where async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); - let storcon_client = Client::new(cli.api.clone(), cli.jwt.clone()); - let ssl_ca_certs = match &cli.ssl_ca_file { Some(ssl_ca_file) => { let buf = tokio::fs::read(ssl_ca_file).await?; @@ -401,9 +399,11 @@ async fn main() -> anyhow::Result<()> { } let http_client = http_client.build()?; + let storcon_client = Client::new(http_client.clone(), cli.api.clone(), cli.jwt.clone()); + let mut trimmed = cli.api.to_string(); trimmed.pop(); - let vps_client = mgmt_api::Client::new(http_client, trimmed, cli.jwt.as_deref()); + let vps_client = mgmt_api::Client::new(http_client.clone(), trimmed, cli.jwt.as_deref()); match cli.command { Command::NodeRegister { @@ -1056,7 +1056,7 @@ async fn main() -> anyhow::Result<()> { const DEFAULT_MIGRATE_CONCURRENCY: usize = 8; let mut stream = futures::stream::iter(moves) .map(|mv| { - let client = Client::new(cli.api.clone(), cli.jwt.clone()); + let client = Client::new(http_client.clone(), cli.api.clone(), cli.jwt.clone()); async move { client .dispatch::( diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index 20f11edae7..51f88625da 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -71,6 +71,7 @@ pub struct PeerInfo { pub ts: Instant, pub pg_connstr: String, pub http_connstr: String, + pub https_connstr: Option, } pub type FullTransactionId = u64; @@ -261,6 +262,8 @@ pub struct SkTimelineInfo { pub safekeeper_connstr: Option, #[serde(default)] pub http_connstr: Option, + #[serde(default)] + pub https_connstr: Option, // Minimum of all active RO replicas flush LSN #[serde(default = "lsn_invalid")] pub standby_horizon: Lsn, diff --git a/pageserver/src/tenant/timeline/import_pgdata/upcall_api.rs b/pageserver/src/tenant/timeline/import_pgdata/upcall_api.rs index 7c7a4de2fc..352bbbc4d4 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/upcall_api.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/upcall_api.rs @@ -32,9 +32,15 @@ impl Client { let Some(ref base_url) = conf.import_pgdata_upcall_api else { anyhow::bail!("import_pgdata_upcall_api is not configured") }; + let mut http_client = reqwest::Client::builder(); + for cert in &conf.ssl_ca_certs { + http_client = http_client.add_root_certificate(cert.clone()); + } + let http_client = http_client.build()?; + Ok(Self { base_url: base_url.to_string(), - client: reqwest::Client::new(), + client: http_client, cancel, authorization_header: conf .import_pgdata_upcall_api_token diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 6ce43815a6..18aa710916 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -219,7 +219,10 @@ struct Args { pub ssl_cert_reload_period: Duration, /// Trusted root CA certificates to use in https APIs. #[arg(long)] - ssl_ca_file: Option, + pub ssl_ca_file: Option, + /// Flag to use https for requests to peer's safekeeper API. + #[arg(long)] + pub use_https_safekeeper_api: bool, } // Like PathBufValueParser, but allows empty string. @@ -399,6 +402,7 @@ async fn main() -> anyhow::Result<()> { ssl_cert_file: args.ssl_cert_file, ssl_cert_reload_period: args.ssl_cert_reload_period, ssl_ca_certs, + use_https_safekeeper_api: args.use_https_safekeeper_api, }); // initialize sentry if SENTRY_DSN is provided diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 8395c88171..312456e5b2 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -536,6 +536,7 @@ async fn record_safekeeper_info(mut request: Request) -> Result, + pub use_https_safekeeper_api: bool, } impl SafeKeeperConf { @@ -170,6 +171,7 @@ impl SafeKeeperConf { ssl_cert_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_CERT_FILE), ssl_cert_reload_period: Duration::from_secs(60), ssl_ca_certs: Vec::new(), + use_https_safekeeper_api: false, } } } diff --git a/safekeeper/src/recovery.rs b/safekeeper/src/recovery.rs index c2760792b8..25b40f5d2e 100644 --- a/safekeeper/src/recovery.rs +++ b/safekeeper/src/recovery.rs @@ -176,6 +176,7 @@ pub struct Donor { pub flush_lsn: Lsn, pub pg_connstr: String, pub http_connstr: String, + pub https_connstr: Option, } impl From<&PeerInfo> for Donor { @@ -186,6 +187,7 @@ impl From<&PeerInfo> for Donor { flush_lsn: p.flush_lsn, pg_connstr: p.pg_connstr.clone(), http_connstr: p.http_connstr.clone(), + https_connstr: p.https_connstr.clone(), } } } @@ -236,11 +238,33 @@ async fn recover( conf: &SafeKeeperConf, ) -> anyhow::Result { // Learn donor term switch history to figure out starting point. - let client = reqwest::Client::new(); + + let mut client = reqwest::Client::builder(); + for cert in &conf.ssl_ca_certs { + client = client.add_root_certificate(cert.clone()); + } + let client = client + .build() + .context("Failed to build http client for recover")?; + + let url = if conf.use_https_safekeeper_api { + if let Some(https_connstr) = donor.https_connstr.as_ref() { + format!("https://{https_connstr}") + } else { + anyhow::bail!( + "cannot recover from donor {}: \ + https is enabled, but https_connstr is not specified", + donor.sk_id + ); + } + } else { + format!("http://{}", donor.http_connstr) + }; + let timeline_info: TimelineStatus = client .get(format!( - "http://{}/v1/tenant/{}/timeline/{}", - donor.http_connstr, tli.ttid.tenant_id, tli.ttid.timeline_id + "{}/v1/tenant/{}/timeline/{}", + url, tli.ttid.tenant_id, tli.ttid.timeline_id )) .send() .await? diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index d9ca58104e..e6a7ade9f2 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -50,6 +50,7 @@ fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> Peer local_start_lsn: Lsn(sk_info.local_start_lsn), pg_connstr: sk_info.safekeeper_connstr.clone(), http_connstr: sk_info.http_connstr.clone(), + https_connstr: sk_info.https_connstr.clone(), ts, } } @@ -363,6 +364,7 @@ impl SharedState { .to_owned() .unwrap_or(conf.listen_pg_addr.clone()), http_connstr: conf.listen_http_addr.to_owned(), + https_connstr: conf.listen_https_addr.to_owned(), backup_lsn: self.sk.state().inmem.backup_lsn.0, local_start_lsn: self.sk.state().local_start_lsn.0, availability_zone: conf.availability_zone.clone(), diff --git a/safekeeper/tests/walproposer_sim/safekeeper.rs b/safekeeper/tests/walproposer_sim/safekeeper.rs index 58913537aa..b3f088d31c 100644 --- a/safekeeper/tests/walproposer_sim/safekeeper.rs +++ b/safekeeper/tests/walproposer_sim/safekeeper.rs @@ -184,6 +184,7 @@ pub fn run_server(os: NodeOs, disk: Arc) -> Result<()> { ssl_cert_file: Utf8PathBuf::from(""), ssl_cert_reload_period: Duration::ZERO, ssl_ca_certs: Vec::new(), + use_https_safekeeper_api: false, }; let mut global = GlobalMap::new(disk, conf.clone())?; diff --git a/storage_broker/benches/rps.rs b/storage_broker/benches/rps.rs index 86f2dd9a6c..0fef6a58e0 100644 --- a/storage_broker/benches/rps.rs +++ b/storage_broker/benches/rps.rs @@ -141,6 +141,7 @@ async fn publish(client: Option, n_keys: u64) { peer_horizon_lsn: 5, safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(), http_connstr: "zenith-1-sk-1.local:7677".to_owned(), + https_connstr: Some("zenith-1-sk-1.local:7678".to_owned()), local_start_lsn: 0, availability_zone: None, standby_horizon: 0, diff --git a/storage_broker/proto/broker.proto b/storage_broker/proto/broker.proto index a420fd9c66..3891685589 100644 --- a/storage_broker/proto/broker.proto +++ b/storage_broker/proto/broker.proto @@ -45,8 +45,10 @@ message SafekeeperTimelineInfo { uint64 standby_horizon = 14; // A connection string to use for WAL receiving. string safekeeper_connstr = 10; - // HTTP endpoint connection string + // HTTP endpoint connection string. string http_connstr = 13; + // HTTPS endpoint connection string. + optional string https_connstr = 15; // Availability zone of a safekeeper. optional string availability_zone = 11; } diff --git a/storage_broker/src/bin/storage_broker.rs b/storage_broker/src/bin/storage_broker.rs index cc33ec20ff..f1bd7ba708 100644 --- a/storage_broker/src/bin/storage_broker.rs +++ b/storage_broker/src/bin/storage_broker.rs @@ -764,6 +764,7 @@ mod tests { peer_horizon_lsn: 5, safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(), http_connstr: "neon-1-sk-1.local:7677".to_owned(), + https_connstr: Some("neon-1-sk-1.local:7678".to_owned()), local_start_lsn: 0, availability_zone: None, standby_horizon: 0, diff --git a/storage_controller/client/src/control_api.rs b/storage_controller/client/src/control_api.rs index 7888b18aa7..7afc835675 100644 --- a/storage_controller/client/src/control_api.rs +++ b/storage_controller/client/src/control_api.rs @@ -10,13 +10,11 @@ pub struct Client { } impl Client { - pub fn new(base_url: Url, jwt_token: Option) -> Self { + pub fn new(http_client: reqwest::Client, base_url: Url, jwt_token: Option) -> Self { Self { base_url, jwt_token, - client: reqwest::ClientBuilder::new() - .build() - .expect("Failed to construct http client"), + client: http_client, } } diff --git a/storage_controller/src/compute_hook.rs b/storage_controller/src/compute_hook.rs index 0da35d6545..31ab443ccd 100644 --- a/storage_controller/src/compute_hook.rs +++ b/storage_controller/src/compute_hook.rs @@ -4,6 +4,7 @@ use std::error::Error as _; use std::sync::Arc; use std::time::Duration; +use anyhow::Context; use control_plane::endpoint::{ComputeControlPlane, EndpointStatus}; use control_plane::local_env::LocalEnv; use futures::StreamExt; @@ -364,25 +365,28 @@ pub(crate) struct ShardUpdate<'a> { } impl ComputeHook { - pub(super) fn new(config: Config) -> Self { + pub(super) fn new(config: Config) -> anyhow::Result { let authorization_header = config .control_plane_jwt_token .clone() .map(|jwt| format!("Bearer {}", jwt)); - let client = reqwest::ClientBuilder::new() - .timeout(NOTIFY_REQUEST_TIMEOUT) + let mut client = reqwest::ClientBuilder::new().timeout(NOTIFY_REQUEST_TIMEOUT); + for cert in &config.ssl_ca_certs { + client = client.add_root_certificate(cert.clone()); + } + let client = client .build() - .expect("Failed to construct HTTP client"); + .context("Failed to build http client for compute hook")?; - Self { + Ok(Self { state: Default::default(), config, authorization_header, neon_local_lock: Default::default(), api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY), client, - } + }) } /// For test environments: use neon_local's LocalEnv to update compute diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index f06e83d720..0caf6e3766 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -1744,19 +1744,17 @@ async fn maybe_forward(req: Request) -> ForwardOutcome { // Use [`RECONCILE_TIMEOUT`] as the max amount of time a request should block for and // include some leeway to get the timeout for proxied requests. const PROXIED_REQUEST_TIMEOUT: Duration = Duration::from_secs(RECONCILE_TIMEOUT.as_secs() + 10); - let client = reqwest::ClientBuilder::new() - .timeout(PROXIED_REQUEST_TIMEOUT) - .build(); - let client = match client { - Ok(client) => client, - Err(err) => { - return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!( - "Failed to build leader client for forwarding while in stepped down state: {err}" - )))); - } - }; - let request: reqwest::Request = match convert_request(req, &client, leader.address).await { + let client = state.service.get_http_client().clone(); + + let request: reqwest::Request = match convert_request( + req, + &client, + leader.address, + PROXIED_REQUEST_TIMEOUT, + ) + .await + { Ok(r) => r, Err(err) => { return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!( @@ -1814,6 +1812,7 @@ async fn convert_request( req: hyper::Request, client: &reqwest::Client, to_address: String, + timeout: Duration, ) -> Result { use std::str::FromStr; @@ -1868,6 +1867,7 @@ async fn convert_request( .request(method, uri) .headers(headers) .body(body) + .timeout(timeout) .build() .map_err(|err| { ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}")) diff --git a/storage_controller/src/leadership.rs b/storage_controller/src/leadership.rs index 5e1d6f3ec9..39c28d60a9 100644 --- a/storage_controller/src/leadership.rs +++ b/storage_controller/src/leadership.rs @@ -110,7 +110,20 @@ impl Leadership { ) -> Option { tracing::info!("Sending step down request to {leader:?}"); + let mut http_client = reqwest::Client::builder(); + for cert in &self.config.ssl_ca_certs { + http_client = http_client.add_root_certificate(cert.clone()); + } + let http_client = match http_client.build() { + Ok(http_client) => http_client, + Err(err) => { + tracing::error!("Failed to build client for leader step-down request: {err}"); + return None; + } + }; + let client = PeerClient::new( + http_client, Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"), self.config.peer_jwt_token.clone(), ); diff --git a/storage_controller/src/peer_client.rs b/storage_controller/src/peer_client.rs index f3f275dee0..604d1024ba 100644 --- a/storage_controller/src/peer_client.rs +++ b/storage_controller/src/peer_client.rs @@ -59,11 +59,11 @@ impl ResponseErrorMessageExt for reqwest::Response { pub(crate) struct GlobalObservedState(pub(crate) HashMap); impl PeerClient { - pub(crate) fn new(uri: Uri, jwt: Option) -> Self { + pub(crate) fn new(http_client: reqwest::Client, uri: Uri, jwt: Option) -> Self { Self { uri, jwt, - client: reqwest::Client::new(), + client: http_client, } } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 9f308d9a0b..50f642deaf 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -1711,7 +1711,7 @@ impl Service { ))), config: config.clone(), persistence, - compute_hook: Arc::new(ComputeHook::new(config.clone())), + compute_hook: Arc::new(ComputeHook::new(config.clone())?), result_tx, heartbeater_ps, heartbeater_sk, diff --git a/storage_scrubber/src/lib.rs b/storage_scrubber/src/lib.rs index 34e43fcc0b..071f0b9756 100644 --- a/storage_scrubber/src/lib.rs +++ b/storage_scrubber/src/lib.rs @@ -295,8 +295,8 @@ pub struct ControllerClientConfig { } impl ControllerClientConfig { - pub fn build_client(self) -> control_api::Client { - control_api::Client::new(self.controller_api, Some(self.controller_jwt)) + pub fn build_client(self, http_client: reqwest::Client) -> control_api::Client { + control_api::Client::new(http_client, self.controller_api, Some(self.controller_jwt)) } } diff --git a/storage_scrubber/src/main.rs b/storage_scrubber/src/main.rs index fb2ab02565..4823c43e10 100644 --- a/storage_scrubber/src/main.rs +++ b/storage_scrubber/src/main.rs @@ -3,7 +3,7 @@ use camino::Utf8PathBuf; use clap::{Parser, Subcommand}; use pageserver_api::controller_api::{MetadataHealthUpdateRequest, MetadataHealthUpdateResponse}; use pageserver_api::shard::TenantShardId; -use reqwest::{Method, Url}; +use reqwest::{Certificate, Method, Url}; use storage_controller_client::control_api; use storage_scrubber::garbage::{PurgeMode, find_garbage, purge_garbage}; use storage_scrubber::pageserver_physical_gc::{GcMode, pageserver_physical_gc}; @@ -41,6 +41,10 @@ struct Cli { /// If set to true, the scrubber will exit with error code on fatal error. #[arg(long, default_value_t = false)] exit_code: bool, + + /// Trusted root CA certificates to use in https APIs. + #[arg(long)] + ssl_ca_file: Option, } #[derive(Subcommand, Debug)] @@ -146,13 +150,28 @@ async fn main() -> anyhow::Result<()> { tracing::info!("version: {}, build_tag {}", GIT_VERSION, BUILD_TAG); + let ssl_ca_certs = match cli.ssl_ca_file.as_ref() { + Some(ssl_ca_file) => { + tracing::info!("Using ssl root CA file: {ssl_ca_file:?}"); + let buf = tokio::fs::read(ssl_ca_file).await?; + Certificate::from_pem_bundle(&buf)? + } + None => Vec::new(), + }; + + let mut http_client = reqwest::Client::builder(); + for cert in ssl_ca_certs { + http_client = http_client.add_root_certificate(cert); + } + let http_client = http_client.build()?; + let controller_client = cli.controller_api.map(|controller_api| { ControllerClientConfig { controller_api, // Default to no key: this is a convenience when working in a development environment controller_jwt: cli.controller_jwt.unwrap_or("".to_owned()), } - .build_client() + .build_client(http_client) }); match cli.command { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 11ff2921b9..86b6043552 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1318,6 +1318,7 @@ class NeonEnv: "http_port": port.http, "https_port": port.https, "sync": config.safekeepers_enable_fsync, + "use_https_safekeeper_api": config.use_https_safekeeper_api, } if config.auth_enabled: sk_cfg["auth_enabled"] = True