diff --git a/Cargo.lock b/Cargo.lock index 870401e7f9..af5c271686 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4285,6 +4285,7 @@ dependencies = [ "pageserver_api", "pageserver_client", "pageserver_compaction", + "pem", "pin-project-lite", "postgres-protocol", "postgres-types", @@ -6001,6 +6002,7 @@ dependencies = [ "once_cell", "pageserver_api", "parking_lot 0.12.1", + "pem", "postgres-protocol", "postgres_backend", "postgres_ffi", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 5c5bab0642..fee78aa94d 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -78,6 +78,7 @@ metrics.workspace = true pageserver_api.workspace = true pageserver_client.workspace = true # for ResponseErrorMessageExt TOOD refactor that pageserver_compaction.workspace = true +pem.workspace = true postgres_connection.workspace = true postgres_ffi.workspace = true pq_proto.workspace = true diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 250d4180f5..6cfaec955b 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -416,8 +416,18 @@ fn start_pageserver( // The storage_broker::connect call needs to happen inside a tokio runtime thread. let broker_client = WALRECEIVER_RUNTIME .block_on(async { + let tls_config = storage_broker::ClientTlsConfig::new().ca_certificates( + conf.ssl_ca_certs + .iter() + .map(pem::encode) + .map(storage_broker::Certificate::from_pem), + ); // Note: we do not attempt connecting here (but validate endpoints sanity). - storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval) + storage_broker::connect( + conf.broker_endpoint.clone(), + conf.broker_keepalive_interval, + tls_config, + ) }) .with_context(|| { format!( diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index c12ac32b7e..d4bfed95a1 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -17,9 +17,10 @@ use once_cell::sync::OnceCell; use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes}; use pageserver_api::models::ImageCompressionAlgorithm; use pageserver_api::shard::TenantShardId; +use pem::Pem; use postgres_backend::AuthType; use remote_storage::{RemotePath, RemoteStorageConfig}; -use reqwest::{Certificate, Url}; +use reqwest::Url; use storage_broker::Uri; use utils::id::{NodeId, TimelineId}; use utils::logging::{LogFormat, SecretString}; @@ -67,8 +68,8 @@ pub struct PageServerConf { /// Period to reload certificate and private key from files. /// Default: 60s. pub ssl_cert_reload_period: Duration, - /// Trusted root CA certificates to use in https APIs. - pub ssl_ca_certs: Vec, + /// Trusted root CA certificates to use in https APIs in PEM format. + pub ssl_ca_certs: Vec, /// Current availability zone. Used for traffic metrics. pub availability_zone: Option, @@ -497,7 +498,10 @@ impl PageServerConf { ssl_ca_certs: match ssl_ca_file { Some(ssl_ca_file) => { let buf = std::fs::read(ssl_ca_file)?; - Certificate::from_pem_bundle(&buf)? + pem::parse_many(&buf)? + .into_iter() + .filter(|pem| pem.tag() == "CERTIFICATE") + .collect() } None => Vec::new(), }, diff --git a/pageserver/src/controller_upcall_client.rs b/pageserver/src/controller_upcall_client.rs index fd5fbfcba9..ed52823c20 100644 --- a/pageserver/src/controller_upcall_client.rs +++ b/pageserver/src/controller_upcall_client.rs @@ -8,6 +8,7 @@ use pageserver_api::upcall_api::{ ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest, ValidateRequestTenant, ValidateResponse, }; +use reqwest::Certificate; use serde::Serialize; use serde::de::DeserializeOwned; use tokio_util::sync::CancellationToken; @@ -76,8 +77,8 @@ impl StorageControllerUpcallClient { client = client.default_headers(headers); } - for ssl_ca_cert in &conf.ssl_ca_certs { - client = client.add_root_certificate(ssl_ca_cert.clone()); + for cert in &conf.ssl_ca_certs { + client = client.add_root_certificate(Certificate::from_der(cert.contents())?); } Ok(Some(Self { diff --git a/pageserver/src/tenant/timeline/import_pgdata/upcall_api.rs b/pageserver/src/tenant/timeline/import_pgdata/upcall_api.rs index 352bbbc4d4..99081a65e0 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/upcall_api.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/upcall_api.rs @@ -1,6 +1,6 @@ //! FIXME: most of this is copy-paste from mgmt_api.rs ; dedupe into a `reqwest_utils::Client` crate. use pageserver_client::mgmt_api::{Error, ResponseErrorMessageExt}; -use reqwest::Method; +use reqwest::{Certificate, Method}; use serde::{Deserialize, Serialize}; use tokio_util::sync::CancellationToken; use tracing::error; @@ -34,7 +34,7 @@ impl Client { }; let mut http_client = reqwest::Client::builder(); for cert in &conf.ssl_ca_certs { - http_client = http_client.add_root_certificate(cert.clone()); + http_client = http_client.add_root_certificate(Certificate::from_der(cert.contents())?); } let http_client = http_client.build()?; diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index a0ba69aa34..0a8cc415be 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -55,6 +55,7 @@ tokio-util = { workspace = true } tracing.workspace = true url.workspace = true metrics.workspace = true +pem.workspace = true postgres_backend.workspace = true postgres_ffi.workspace = true pq_proto.workspace = true diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index b8c122ea72..5fc742cda7 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -16,7 +16,6 @@ use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use metrics::set_build_info_metric; use remote_storage::RemoteStorageConfig; -use reqwest::Certificate; use safekeeper::defaults::{ DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_EVICTION_MIN_RESIDENT, DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_CONCURRENCY, @@ -373,7 +372,10 @@ async fn main() -> anyhow::Result<()> { Some(ssl_ca_file) => { tracing::info!("Using ssl root CA file: {ssl_ca_file:?}"); let buf = tokio::fs::read(ssl_ca_file).await?; - Certificate::from_pem_bundle(&buf)? + pem::parse_many(&buf)? + .into_iter() + .filter(|pem| pem.tag() == "CERTIFICATE") + .collect() } None => Vec::new(), }; diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index de6e275124..3b15cf8d70 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -24,6 +24,15 @@ use crate::{GlobalTimelines, SafeKeeperConf}; const RETRY_INTERVAL_MSEC: u64 = 1000; const PUSH_INTERVAL_MSEC: u64 = 1000; +fn make_tls_config(conf: &SafeKeeperConf) -> storage_broker::ClientTlsConfig { + storage_broker::ClientTlsConfig::new().ca_certificates( + conf.ssl_ca_certs + .iter() + .map(pem::encode) + .map(storage_broker::Certificate::from_pem), + ) +} + /// Push once in a while data about all active timelines to the broker. async fn push_loop( conf: Arc, @@ -37,8 +46,11 @@ async fn push_loop( let active_timelines_set = global_timelines.get_global_broker_active_set(); - let mut client = - storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?; + let mut client = storage_broker::connect( + conf.broker_endpoint.clone(), + conf.broker_keepalive_interval, + make_tls_config(&conf), + )?; let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC); let outbound = async_stream::stream! { @@ -81,8 +93,11 @@ async fn pull_loop( global_timelines: Arc, stats: Arc, ) -> Result<()> { - let mut client = - storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?; + let mut client = storage_broker::connect( + conf.broker_endpoint.clone(), + conf.broker_keepalive_interval, + make_tls_config(&conf), + )?; // TODO: subscribe only to local timelines instead of all let request = SubscribeSafekeeperInfoRequest { @@ -134,8 +149,11 @@ async fn discover_loop( global_timelines: Arc, stats: Arc, ) -> Result<()> { - let mut client = - storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?; + let mut client = storage_broker::connect( + conf.broker_endpoint.clone(), + conf.broker_keepalive_interval, + make_tls_config(&conf), + )?; let request = SubscribeByFilterRequest { types: vec![TypeSubscription { diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 312456e5b2..2b2d721db2 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -14,6 +14,7 @@ use http_utils::json::{json_request, json_response}; use http_utils::request::{ensure_no_body, parse_query_param, parse_request_param}; use http_utils::{RequestExt, RouterBuilder}; use hyper::{Body, Request, Response, StatusCode}; +use pem::Pem; use postgres_ffi::WAL_SEGMENT_SIZE; use safekeeper_api::models::{ AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult, @@ -230,14 +231,20 @@ async fn timeline_pull_handler(mut request: Request) -> Result, _>>() + .map_err(|e| { + ApiError::InternalServerError(anyhow::anyhow!("failed to parse CA certs: {e}")) + })?; + + let resp = + pull_timeline::handle_request(data, conf.sk_auth_token.clone(), ca_certs, global_timelines) + .await + .map_err(ApiError::InternalServerError)?; json_response(StatusCode::OK, resp) } diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 3ca51ba40a..9f7580a313 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -6,8 +6,8 @@ use std::time::Duration; use camino::Utf8PathBuf; use once_cell::sync::Lazy; +use pem::Pem; use remote_storage::RemoteStorageConfig; -use reqwest::Certificate; use storage_broker::Uri; use tokio::runtime::Runtime; use utils::auth::SwappableJwtAuth; @@ -120,7 +120,7 @@ pub struct SafeKeeperConf { pub ssl_key_file: Utf8PathBuf, pub ssl_cert_file: Utf8PathBuf, pub ssl_cert_reload_period: Duration, - pub ssl_ca_certs: Vec, + pub ssl_ca_certs: Vec, pub use_https_safekeeper_api: bool, } diff --git a/safekeeper/src/recovery.rs b/safekeeper/src/recovery.rs index 25b40f5d2e..577a2f694e 100644 --- a/safekeeper/src/recovery.rs +++ b/safekeeper/src/recovery.rs @@ -8,6 +8,7 @@ use std::time::SystemTime; use anyhow::{Context, bail}; use futures::StreamExt; use postgres_protocol::message::backend::ReplicationMessage; +use reqwest::Certificate; use safekeeper_api::Term; use safekeeper_api::membership::INVALID_GENERATION; use safekeeper_api::models::{PeerInfo, TimelineStatus}; @@ -241,7 +242,7 @@ async fn recover( let mut client = reqwest::Client::builder(); for cert in &conf.ssl_ca_certs { - client = client.add_root_certificate(cert.clone()); + client = client.add_root_certificate(Certificate::from_der(cert.contents())?); } let client = client .build() diff --git a/storage_broker/benches/rps.rs b/storage_broker/benches/rps.rs index 0fef6a58e0..9953ccfa91 100644 --- a/storage_broker/benches/rps.rs +++ b/storage_broker/benches/rps.rs @@ -87,7 +87,12 @@ fn tli_from_u64(i: u64) -> Vec { async fn subscribe(client: Option, counter: Arc, i: u64) { let mut client = match client { Some(c) => c, - None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(), + None => storage_broker::connect( + DEFAULT_ENDPOINT, + Duration::from_secs(5), + storage_broker::ClientTlsConfig::new(), + ) + .unwrap(), }; let ttid = ProtoTenantTimelineId { @@ -119,7 +124,12 @@ async fn subscribe(client: Option, counter: Arc, async fn publish(client: Option, n_keys: u64) { let mut client = match client { Some(c) => c, - None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(), + None => storage_broker::connect( + DEFAULT_ENDPOINT, + Duration::from_secs(5), + storage_broker::ClientTlsConfig::new(), + ) + .unwrap(), }; let mut counter: u64 = 0; @@ -164,7 +174,12 @@ async fn main() -> Result<(), Box> { } let h = tokio::spawn(progress_reporter(counters.clone())); - let c = storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(); + let c = storage_broker::connect( + DEFAULT_ENDPOINT, + Duration::from_secs(5), + storage_broker::ClientTlsConfig::new(), + ) + .unwrap(); for i in 0..args.num_subs { let c = Some(c.clone()); diff --git a/storage_broker/src/lib.rs b/storage_broker/src/lib.rs index 7b36f5e948..149656a191 100644 --- a/storage_broker/src/lib.rs +++ b/storage_broker/src/lib.rs @@ -4,7 +4,7 @@ use proto::TenantTimelineId as ProtoTenantTimelineId; use proto::broker_service_client::BrokerServiceClient; use tonic::Status; use tonic::codegen::StdError; -use tonic::transport::{Channel, ClientTlsConfig, Endpoint}; +use tonic::transport::{Channel, Endpoint}; use utils::id::{TenantId, TenantTimelineId, TimelineId}; // Code generated by protobuf. @@ -20,6 +20,7 @@ pub mod metrics; // Re-exports to avoid direct tonic dependency in user crates. pub use hyper::Uri; +pub use tonic::transport::{Certificate, ClientTlsConfig}; pub use tonic::{Code, Request, Streaming}; pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051"; @@ -38,7 +39,11 @@ pub type BrokerClientChannel = BrokerServiceClient; // // NB: this function is not async, but still must be run on a tokio runtime thread // because that's a requirement of tonic_endpoint.connect_lazy()'s Channel::new call. -pub fn connect(endpoint: U, keepalive_interval: Duration) -> anyhow::Result +pub fn connect( + endpoint: U, + keepalive_interval: Duration, + tls_config: ClientTlsConfig, +) -> anyhow::Result where U: std::convert::TryInto, U::Error: std::error::Error + Send + Sync + 'static, @@ -54,8 +59,7 @@ where rustls::crypto::ring::default_provider() .install_default() .ok(); - let tls = ClientTlsConfig::new(); - tonic_endpoint = tonic_endpoint.tls_config(tls)?; + tonic_endpoint = tonic_endpoint.tls_config(tls_config)?; } tonic_endpoint = tonic_endpoint .http2_keep_alive_interval(keepalive_interval)