pageserver + safekeeper: pass ssl ca certs to broker client (#11635)

## Problem
Pageservers and safekeepers do not pass CA certificates to the broker
client, so the client does not trust locally issued certificates.
- Part of https://github.com/neondatabase/cloud/issues/27492

## Summary of changes
- Change the `ssl_ca_certs` type in the PS/SK config to `Pem`, which can be
converted to both `reqwest` and `tonic` certificates (see the sketch below).
- Pass CA certificates to the storage broker client in PS and SK.
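
For reference, a minimal standalone sketch of that conversion, using the `pem`, `reqwest`, and `tonic` crates directly (the helper functions below are illustrative, not part of this change; in the PR the tonic types are reached via the `storage_broker` re-exports):

```rust
use anyhow::Result;

/// Parse a CA bundle once and keep only certificate blocks,
/// mirroring how the PS/SK configs now store `Vec<Pem>`.
fn load_ca_certs(bundle: &[u8]) -> Result<Vec<pem::Pem>> {
    Ok(pem::parse_many(bundle)?
        .into_iter()
        .filter(|p| p.tag() == "CERTIFICATE")
        .collect())
}

/// reqwest takes DER bytes, which each `Pem` entry already holds as its contents.
fn as_reqwest_certs(certs: &[pem::Pem]) -> Result<Vec<reqwest::Certificate>> {
    certs
        .iter()
        .map(|p| reqwest::Certificate::from_der(p.contents()).map_err(Into::into))
        .collect()
}

/// tonic takes PEM text, so each entry is re-encoded before conversion.
fn as_tonic_certs(certs: &[pem::Pem]) -> Vec<tonic::transport::Certificate> {
    certs
        .iter()
        .map(pem::encode)
        .map(tonic::transport::Certificate::from_pem)
        .collect()
}
```

Keeping the config as parsed `Pem` entries defers the choice of TLS stack to each call site, which is why the HTTP clients and the gRPC broker client can now trust the same locally issued CA.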
Author: Dmitrii Kovalkov
Date: 2025-04-18 10:27:23 +04:00
Committed by: GitHub
Parent: 5073e46df4
Commit: a0d844dfed

14 changed files with 101 additions and 35 deletions

Cargo.lock (generated)

@@ -4285,6 +4285,7 @@ dependencies = [
  "pageserver_api",
  "pageserver_client",
  "pageserver_compaction",
+ "pem",
  "pin-project-lite",
  "postgres-protocol",
  "postgres-types",
@@ -6001,6 +6002,7 @@ dependencies = [
  "once_cell",
  "pageserver_api",
  "parking_lot 0.12.1",
+ "pem",
  "postgres-protocol",
  "postgres_backend",
  "postgres_ffi",


@@ -78,6 +78,7 @@ metrics.workspace = true
 pageserver_api.workspace = true
 pageserver_client.workspace = true # for ResponseErrorMessageExt TOOD refactor that
 pageserver_compaction.workspace = true
+pem.workspace = true
 postgres_connection.workspace = true
 postgres_ffi.workspace = true
 pq_proto.workspace = true


@@ -416,8 +416,18 @@ fn start_pageserver(
     // The storage_broker::connect call needs to happen inside a tokio runtime thread.
     let broker_client = WALRECEIVER_RUNTIME
         .block_on(async {
+            let tls_config = storage_broker::ClientTlsConfig::new().ca_certificates(
+                conf.ssl_ca_certs
+                    .iter()
+                    .map(pem::encode)
+                    .map(storage_broker::Certificate::from_pem),
+            );
             // Note: we do not attempt connecting here (but validate endpoints sanity).
-            storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)
+            storage_broker::connect(
+                conf.broker_endpoint.clone(),
+                conf.broker_keepalive_interval,
+                tls_config,
+            )
         })
         .with_context(|| {
             format!(


@@ -17,9 +17,10 @@ use once_cell::sync::OnceCell;
 use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes};
 use pageserver_api::models::ImageCompressionAlgorithm;
 use pageserver_api::shard::TenantShardId;
+use pem::Pem;
 use postgres_backend::AuthType;
 use remote_storage::{RemotePath, RemoteStorageConfig};
-use reqwest::{Certificate, Url};
+use reqwest::Url;
 use storage_broker::Uri;
 use utils::id::{NodeId, TimelineId};
 use utils::logging::{LogFormat, SecretString};
@@ -67,8 +68,8 @@ pub struct PageServerConf {
     /// Period to reload certificate and private key from files.
     /// Default: 60s.
     pub ssl_cert_reload_period: Duration,
-    /// Trusted root CA certificates to use in https APIs.
-    pub ssl_ca_certs: Vec<Certificate>,
+    /// Trusted root CA certificates to use in https APIs in PEM format.
+    pub ssl_ca_certs: Vec<Pem>,
     /// Current availability zone. Used for traffic metrics.
     pub availability_zone: Option<String>,
@@ -497,7 +498,10 @@ impl PageServerConf {
             ssl_ca_certs: match ssl_ca_file {
                 Some(ssl_ca_file) => {
                     let buf = std::fs::read(ssl_ca_file)?;
-                    Certificate::from_pem_bundle(&buf)?
+                    pem::parse_many(&buf)?
+                        .into_iter()
+                        .filter(|pem| pem.tag() == "CERTIFICATE")
+                        .collect()
                 }
                 None => Vec::new(),
             },


@@ -8,6 +8,7 @@ use pageserver_api::upcall_api::{
     ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
     ValidateRequestTenant, ValidateResponse,
 };
+use reqwest::Certificate;
 use serde::Serialize;
 use serde::de::DeserializeOwned;
 use tokio_util::sync::CancellationToken;
@@ -76,8 +77,8 @@ impl StorageControllerUpcallClient {
             client = client.default_headers(headers);
         }
-        for ssl_ca_cert in &conf.ssl_ca_certs {
-            client = client.add_root_certificate(ssl_ca_cert.clone());
+        for cert in &conf.ssl_ca_certs {
+            client = client.add_root_certificate(Certificate::from_der(cert.contents())?);
         }
         Ok(Some(Self {


@@ -1,6 +1,6 @@
 //! FIXME: most of this is copy-paste from mgmt_api.rs ; dedupe into a `reqwest_utils::Client` crate.
 use pageserver_client::mgmt_api::{Error, ResponseErrorMessageExt};
-use reqwest::Method;
+use reqwest::{Certificate, Method};
 use serde::{Deserialize, Serialize};
 use tokio_util::sync::CancellationToken;
 use tracing::error;
@@ -34,7 +34,7 @@ impl Client {
         };
         let mut http_client = reqwest::Client::builder();
         for cert in &conf.ssl_ca_certs {
-            http_client = http_client.add_root_certificate(cert.clone());
+            http_client = http_client.add_root_certificate(Certificate::from_der(cert.contents())?);
         }
         let http_client = http_client.build()?;


@@ -55,6 +55,7 @@ tokio-util = { workspace = true }
 tracing.workspace = true
 url.workspace = true
 metrics.workspace = true
+pem.workspace = true
 postgres_backend.workspace = true
 postgres_ffi.workspace = true
 pq_proto.workspace = true


@@ -16,7 +16,6 @@ use futures::stream::FuturesUnordered;
 use futures::{FutureExt, StreamExt};
 use metrics::set_build_info_metric;
 use remote_storage::RemoteStorageConfig;
-use reqwest::Certificate;
 use safekeeper::defaults::{
     DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_EVICTION_MIN_RESIDENT, DEFAULT_HEARTBEAT_TIMEOUT,
     DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_CONCURRENCY,
@@ -373,7 +372,10 @@ async fn main() -> anyhow::Result<()> {
         Some(ssl_ca_file) => {
             tracing::info!("Using ssl root CA file: {ssl_ca_file:?}");
             let buf = tokio::fs::read(ssl_ca_file).await?;
-            Certificate::from_pem_bundle(&buf)?
+            pem::parse_many(&buf)?
+                .into_iter()
+                .filter(|pem| pem.tag() == "CERTIFICATE")
+                .collect()
         }
         None => Vec::new(),
     };


@@ -24,6 +24,15 @@ use crate::{GlobalTimelines, SafeKeeperConf};
 const RETRY_INTERVAL_MSEC: u64 = 1000;
 const PUSH_INTERVAL_MSEC: u64 = 1000;

+fn make_tls_config(conf: &SafeKeeperConf) -> storage_broker::ClientTlsConfig {
+    storage_broker::ClientTlsConfig::new().ca_certificates(
+        conf.ssl_ca_certs
+            .iter()
+            .map(pem::encode)
+            .map(storage_broker::Certificate::from_pem),
+    )
+}
+
 /// Push once in a while data about all active timelines to the broker.
 async fn push_loop(
     conf: Arc<SafeKeeperConf>,
@@ -37,8 +46,11 @@ async fn push_loop(
     let active_timelines_set = global_timelines.get_global_broker_active_set();

-    let mut client =
-        storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
+    let mut client = storage_broker::connect(
+        conf.broker_endpoint.clone(),
+        conf.broker_keepalive_interval,
+        make_tls_config(&conf),
+    )?;
     let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);

     let outbound = async_stream::stream! {
@@ -81,8 +93,11 @@ async fn pull_loop(
     global_timelines: Arc<GlobalTimelines>,
     stats: Arc<BrokerStats>,
 ) -> Result<()> {
-    let mut client =
-        storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
+    let mut client = storage_broker::connect(
+        conf.broker_endpoint.clone(),
+        conf.broker_keepalive_interval,
+        make_tls_config(&conf),
+    )?;

     // TODO: subscribe only to local timelines instead of all
     let request = SubscribeSafekeeperInfoRequest {
@@ -134,8 +149,11 @@ async fn discover_loop(
     global_timelines: Arc<GlobalTimelines>,
     stats: Arc<BrokerStats>,
 ) -> Result<()> {
-    let mut client =
-        storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
+    let mut client = storage_broker::connect(
+        conf.broker_endpoint.clone(),
+        conf.broker_keepalive_interval,
+        make_tls_config(&conf),
+    )?;

     let request = SubscribeByFilterRequest {
         types: vec![TypeSubscription {


@@ -14,6 +14,7 @@ use http_utils::json::{json_request, json_response};
 use http_utils::request::{ensure_no_body, parse_query_param, parse_request_param};
 use http_utils::{RequestExt, RouterBuilder};
 use hyper::{Body, Request, Response, StatusCode};
+use pem::Pem;
 use postgres_ffi::WAL_SEGMENT_SIZE;
 use safekeeper_api::models::{
     AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult,
@@ -230,14 +231,20 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
     let conf = get_conf(&request);
     let global_timelines = get_global_timelines(&request);

-    let resp = pull_timeline::handle_request(
-        data,
-        conf.sk_auth_token.clone(),
-        conf.ssl_ca_certs.clone(),
-        global_timelines,
-    )
-    .await
-    .map_err(ApiError::InternalServerError)?;
+    let ca_certs = conf
+        .ssl_ca_certs
+        .iter()
+        .map(Pem::contents)
+        .map(reqwest::Certificate::from_der)
+        .collect::<Result<Vec<_>, _>>()
+        .map_err(|e| {
+            ApiError::InternalServerError(anyhow::anyhow!("failed to parse CA certs: {e}"))
+        })?;
+
+    let resp =
+        pull_timeline::handle_request(data, conf.sk_auth_token.clone(), ca_certs, global_timelines)
+            .await
+            .map_err(ApiError::InternalServerError)?;

     json_response(StatusCode::OK, resp)
 }


@@ -6,8 +6,8 @@ use std::time::Duration;
 use camino::Utf8PathBuf;
 use once_cell::sync::Lazy;
+use pem::Pem;
 use remote_storage::RemoteStorageConfig;
-use reqwest::Certificate;
 use storage_broker::Uri;
 use tokio::runtime::Runtime;
 use utils::auth::SwappableJwtAuth;
@@ -120,7 +120,7 @@ pub struct SafeKeeperConf {
     pub ssl_key_file: Utf8PathBuf,
     pub ssl_cert_file: Utf8PathBuf,
     pub ssl_cert_reload_period: Duration,
-    pub ssl_ca_certs: Vec<Certificate>,
+    pub ssl_ca_certs: Vec<Pem>,
     pub use_https_safekeeper_api: bool,
 }


@@ -8,6 +8,7 @@ use std::time::SystemTime;
 use anyhow::{Context, bail};
 use futures::StreamExt;
 use postgres_protocol::message::backend::ReplicationMessage;
+use reqwest::Certificate;
 use safekeeper_api::Term;
 use safekeeper_api::membership::INVALID_GENERATION;
 use safekeeper_api::models::{PeerInfo, TimelineStatus};
@@ -241,7 +242,7 @@ async fn recover(
     let mut client = reqwest::Client::builder();
     for cert in &conf.ssl_ca_certs {
-        client = client.add_root_certificate(cert.clone());
+        client = client.add_root_certificate(Certificate::from_der(cert.contents())?);
     }
     let client = client
         .build()


@@ -87,7 +87,12 @@ fn tli_from_u64(i: u64) -> Vec<u8> {
 async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>, i: u64) {
     let mut client = match client {
         Some(c) => c,
-        None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(),
+        None => storage_broker::connect(
+            DEFAULT_ENDPOINT,
+            Duration::from_secs(5),
+            storage_broker::ClientTlsConfig::new(),
+        )
+        .unwrap(),
     };

     let ttid = ProtoTenantTimelineId {
@@ -119,7 +124,12 @@ async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>,
 async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
     let mut client = match client {
         Some(c) => c,
-        None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(),
+        None => storage_broker::connect(
+            DEFAULT_ENDPOINT,
+            Duration::from_secs(5),
+            storage_broker::ClientTlsConfig::new(),
+        )
+        .unwrap(),
     };

     let mut counter: u64 = 0;
@@ -164,7 +174,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     }
     let h = tokio::spawn(progress_reporter(counters.clone()));

-    let c = storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap();
+    let c = storage_broker::connect(
+        DEFAULT_ENDPOINT,
+        Duration::from_secs(5),
+        storage_broker::ClientTlsConfig::new(),
+    )
+    .unwrap();
     for i in 0..args.num_subs {
         let c = Some(c.clone());


@@ -4,7 +4,7 @@ use proto::TenantTimelineId as ProtoTenantTimelineId;
 use proto::broker_service_client::BrokerServiceClient;
 use tonic::Status;
 use tonic::codegen::StdError;
-use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
+use tonic::transport::{Channel, Endpoint};
 use utils::id::{TenantId, TenantTimelineId, TimelineId};

 // Code generated by protobuf.
@@ -20,6 +20,7 @@ pub mod metrics;

 // Re-exports to avoid direct tonic dependency in user crates.
 pub use hyper::Uri;
+pub use tonic::transport::{Certificate, ClientTlsConfig};
 pub use tonic::{Code, Request, Streaming};

 pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
@@ -38,7 +39,11 @@ pub type BrokerClientChannel = BrokerServiceClient<Channel>;
 //
 // NB: this function is not async, but still must be run on a tokio runtime thread
 // because that's a requirement of tonic_endpoint.connect_lazy()'s Channel::new call.
-pub fn connect<U>(endpoint: U, keepalive_interval: Duration) -> anyhow::Result<BrokerClientChannel>
+pub fn connect<U>(
+    endpoint: U,
+    keepalive_interval: Duration,
+    tls_config: ClientTlsConfig,
+) -> anyhow::Result<BrokerClientChannel>
 where
     U: std::convert::TryInto<Uri>,
     U::Error: std::error::Error + Send + Sync + 'static,
@@ -54,8 +59,7 @@ where
         rustls::crypto::ring::default_provider()
             .install_default()
             .ok();
-        let tls = ClientTlsConfig::new();
-        tonic_endpoint = tonic_endpoint.tls_config(tls)?;
+        tonic_endpoint = tonic_endpoint.tls_config(tls_config)?;
     }
     tonic_endpoint = tonic_endpoint
         .http2_keep_alive_interval(keepalive_interval)