From e876794ce578111d915c492d5abfa47360e6ee82 Mon Sep 17 00:00:00 2001 From: Dmitrii Kovalkov <34828390+DimasKovas@users.noreply.github.com> Date: Fri, 7 Mar 2025 21:22:47 +0400 Subject: [PATCH] storcon: use https safekeeper api (#11065) ## Problem Storage controller uses http for requests to safekeeper management API. Closes: https://github.com/neondatabase/cloud/issues/24835 ## Summary of changes - Add `use_https_safekeeper_api` option to storcon to use https api - Use https for requests to safekeeper management API if this option is enabled - Add `ssl_ca_file` option to storcon for ability to specify custom root CA certificate --- libs/pageserver_api/src/controller_api.rs | 1 + safekeeper/client/src/mgmt_api.rs | 20 +++-- safekeeper/src/pull_timeline.rs | 12 ++- .../down.sql | 1 + .../up.sql | 1 + storage_controller/src/heartbeater.rs | 8 ++ storage_controller/src/main.rs | 27 ++++++- storage_controller/src/node.rs | 12 ++- storage_controller/src/persistence.rs | 6 ++ storage_controller/src/safekeeper.rs | 77 +++++++++++++++---- storage_controller/src/safekeeper_client.rs | 14 +--- storage_controller/src/schema.rs | 1 + storage_controller/src/service.rs | 56 ++++++++++---- .../regress/test_storage_controller.py | 20 +++++ 14 files changed, 193 insertions(+), 63 deletions(-) create mode 100644 storage_controller/migrations/2025-02-28-141741_safekeeper_use_https/down.sql create mode 100644 storage_controller/migrations/2025-02-28-141741_safekeeper_use_https/up.sql diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index 154ab849dd..3cb62f9d18 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -489,6 +489,7 @@ pub struct SafekeeperDescribeResponse { pub host: String, pub port: i32, pub http_port: i32, + pub https_port: Option, pub availability_zone_id: String, pub scheduling_policy: SkSchedulingPolicy, } diff --git a/safekeeper/client/src/mgmt_api.rs 
b/safekeeper/client/src/mgmt_api.rs index 0e92e87103..3966aa811f 100644 --- a/safekeeper/client/src/mgmt_api.rs +++ b/safekeeper/client/src/mgmt_api.rs @@ -37,6 +37,10 @@ pub enum Error { #[error("Cancelled")] Cancelled, + + /// Failed to create client. + #[error("create client: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())] + CreateClient(reqwest::Error), } pub type Result = std::result::Result; @@ -64,11 +68,7 @@ impl ResponseErrorMessageExt for reqwest::Response { } impl Client { - pub fn new(mgmt_api_endpoint: String, jwt: Option) -> Self { - Self::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt) - } - - pub fn from_client( + pub fn new( client: reqwest::Client, mgmt_api_endpoint: String, jwt: Option, @@ -172,12 +172,10 @@ impl Client { uri: U, body: B, ) -> Result { - let req = self.client.request(method, uri); - let req = if let Some(value) = &self.authorization_header { - req.header(reqwest::header::AUTHORIZATION, value.get_contents()) - } else { - req - }; + let mut req = self.client.request(method, uri); + if let Some(value) = &self.authorization_header { + req = req.header(reqwest::header::AUTHORIZATION, value.get_contents()) + } req.json(&body).send().await.map_err(Error::ReceiveBody) } } diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index fc58b8509a..7d6ce1269c 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -402,12 +402,16 @@ pub async fn handle_request( bail!("Timeline {} already exists", request.timeline_id); } + // TODO(DimasKovas): add ssl root CA certificate when implementing safekeeper's + // part of https support (#24836). + let http_client = reqwest::Client::new(); + let http_hosts = request.http_hosts.clone(); // Figure out statuses of potential donors. 
let responses: Vec> = futures::future::join_all(http_hosts.iter().map(|url| async { - let cclient = Client::new(url.clone(), sk_auth_token.clone()); + let cclient = Client::new(http_client.clone(), url.clone(), sk_auth_token.clone()); let info = cclient .timeline_status(request.tenant_id, request.timeline_id) .await?; @@ -460,8 +464,10 @@ async fn pull_timeline( let conf = &global_timelines.get_global_config(); let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?; - - let client = Client::new(host.clone(), sk_auth_token.clone()); + // TODO(DimasKovas): add ssl root CA certificate when implementing safekeeper's + // part of https support (#24836). + let http_client = reqwest::Client::new(); + let client = Client::new(http_client, host.clone(), sk_auth_token.clone()); // Request stream with basebackup archive. let bb_resp = client .snapshot(status.tenant_id, status.timeline_id, conf.my_id) diff --git a/storage_controller/migrations/2025-02-28-141741_safekeeper_use_https/down.sql b/storage_controller/migrations/2025-02-28-141741_safekeeper_use_https/down.sql new file mode 100644 index 0000000000..378e9f8c16 --- /dev/null +++ b/storage_controller/migrations/2025-02-28-141741_safekeeper_use_https/down.sql @@ -0,0 +1 @@ +ALTER TABLE safekeepers DROP https_port; diff --git a/storage_controller/migrations/2025-02-28-141741_safekeeper_use_https/up.sql b/storage_controller/migrations/2025-02-28-141741_safekeeper_use_https/up.sql new file mode 100644 index 0000000000..bb47b0b256 --- /dev/null +++ b/storage_controller/migrations/2025-02-28-141741_safekeeper_use_https/up.sql @@ -0,0 +1 @@ +ALTER TABLE safekeepers ADD https_port INTEGER; diff --git a/storage_controller/src/heartbeater.rs b/storage_controller/src/heartbeater.rs index 56a331becd..dab6799d3e 100644 --- a/storage_controller/src/heartbeater.rs +++ b/storage_controller/src/heartbeater.rs @@ -8,6 +8,7 @@ use futures::StreamExt; use futures::stream::FuturesUnordered; use 
pageserver_api::controller_api::{NodeAvailability, SkSchedulingPolicy}; use pageserver_api::models::PageserverUtilization; +use reqwest::Certificate; use safekeeper_api::models::SafekeeperUtilization; use safekeeper_client::mgmt_api; use thiserror::Error; @@ -27,6 +28,7 @@ struct HeartbeaterTask { max_offline_interval: Duration, max_warming_up_interval: Duration, jwt_token: Option, + ssl_ca_cert: Option, } #[derive(Debug, Clone)] @@ -75,6 +77,7 @@ where { pub(crate) fn new( jwt_token: Option, + ssl_ca_cert: Option, max_offline_interval: Duration, max_warming_up_interval: Duration, cancel: CancellationToken, @@ -84,6 +87,7 @@ where let mut heartbeater = HeartbeaterTask::new( receiver, jwt_token, + ssl_ca_cert, max_offline_interval, max_warming_up_interval, cancel, @@ -119,6 +123,7 @@ where fn new( receiver: tokio::sync::mpsc::UnboundedReceiver>, jwt_token: Option, + ssl_ca_cert: Option, max_offline_interval: Duration, max_warming_up_interval: Duration, cancel: CancellationToken, @@ -130,6 +135,7 @@ where max_offline_interval, max_warming_up_interval, jwt_token, + ssl_ca_cert, } } async fn run(&mut self) { @@ -325,6 +331,7 @@ impl HeartBeat for HeartbeaterTask for HeartbeaterTask, - // Maximum acceptable lag for the secondary location while draining - // a pageserver + /// Maximum acceptable lag for the secondary location while draining + /// a pageserver #[arg(long)] max_secondary_lag_bytes: Option, - // Period with which to send heartbeats to registered nodes + /// Period with which to send heartbeats to registered nodes #[arg(long)] heartbeat_interval: Option, #[arg(long)] long_reconcile_threshold: Option, - // Flag to use https for requests to pageserver API. + /// Flag to use https for requests to pageserver API. #[arg(long, default_value = "false")] use_https_pageserver_api: bool, + /// Flag to use https for requests to safekeeper API. 
+ #[arg(long, default_value = "false")] + use_https_safekeeper_api: bool, + + /// Trusted root CA certificate to use in https APIs. + #[arg(long)] + ssl_ca_file: Option, } enum StrictMode { @@ -315,6 +323,15 @@ async fn async_main() -> anyhow::Result<()> { } } + let ssl_ca_cert = match args.ssl_ca_file.as_ref() { + Some(ssl_ca_file) => { + tracing::info!("Using ssl root CA file: {ssl_ca_file:?}"); + let buf = tokio::fs::read(ssl_ca_file).await?; + Some(Certificate::from_pem(&buf)?) + } + None => None, + }; + let config = Config { pageserver_jwt_token: secrets.pageserver_jwt_token, safekeeper_jwt_token: secrets.safekeeper_jwt_token, @@ -351,6 +368,8 @@ async fn async_main() -> anyhow::Result<()> { start_as_candidate: args.start_as_candidate, http_service_port: args.listen.port() as i32, use_https_pageserver_api: args.use_https_pageserver_api, + use_https_safekeeper_api: args.use_https_safekeeper_api, + ssl_ca_cert, }; // Validate that we can connect to the database diff --git a/storage_controller/src/node.rs b/storage_controller/src/node.rs index bc7fe8802a..735bae2123 100644 --- a/storage_controller/src/node.rs +++ b/storage_controller/src/node.rs @@ -1,7 +1,6 @@ use std::str::FromStr; use std::time::Duration; -use anyhow::anyhow; use pageserver_api::controller_api::{ AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy, TenantLocateResponseShard, @@ -211,7 +210,10 @@ impl Node { use_https: bool, ) -> anyhow::Result { if use_https && listen_https_port.is_none() { - return Err(anyhow!("https is enabled, but node has no https port")); + anyhow::bail!( + "cannot create node {id}: \ + https is enabled, but https port is not specified" + ); } Ok(Self { @@ -244,7 +246,11 @@ impl Node { pub(crate) fn from_persistent(np: NodePersistence, use_https: bool) -> anyhow::Result { if use_https && np.listen_https_port.is_none() { - return Err(anyhow!("https is enabled, but node has no https port")); + anyhow::bail!( + "cannot load 
node {} from persistent: \ + https is enabled, but https port is not specified", + np.node_id, + ); } Ok(Self { diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 2e80b48859..939b8c6cd8 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -1559,6 +1559,7 @@ pub(crate) struct SafekeeperPersistence { pub(crate) http_port: i32, pub(crate) availability_zone_id: String, pub(crate) scheduling_policy: SkSchedulingPolicyFromSql, + pub(crate) https_port: Option, } /// Wrapper struct around [`SkSchedulingPolicy`] because both it and [`FromSql`] are from foreign crates, @@ -1599,6 +1600,7 @@ impl SafekeeperPersistence { host: upsert.host, port: upsert.port, http_port: upsert.http_port, + https_port: upsert.https_port, availability_zone_id: upsert.availability_zone_id, scheduling_policy: SkSchedulingPolicyFromSql(scheduling_policy), } @@ -1611,6 +1613,7 @@ impl SafekeeperPersistence { host: self.host.clone(), port: self.port, http_port: self.http_port, + https_port: self.https_port, availability_zone_id: self.availability_zone_id.clone(), scheduling_policy: self.scheduling_policy.0, }) @@ -1631,6 +1634,7 @@ pub(crate) struct SafekeeperUpsert { /// The active flag will not be stored in the database and will be ignored. pub(crate) active: Option, pub(crate) http_port: i32, + pub(crate) https_port: Option, pub(crate) availability_zone_id: String, } @@ -1646,6 +1650,7 @@ impl SafekeeperUpsert { host: &self.host, port: self.port, http_port: self.http_port, + https_port: self.https_port, availability_zone_id: &self.availability_zone_id, // None means a wish to not update this column. We expose abilities to update it via other means. 
scheduling_policy: None, @@ -1662,6 +1667,7 @@ struct InsertUpdateSafekeeper<'a> { host: &'a str, port: i32, http_port: i32, + https_port: Option, availability_zone_id: &'a str, scheduling_policy: Option<&'a str>, } diff --git a/storage_controller/src/safekeeper.rs b/storage_controller/src/safekeeper.rs index 9c7e6e0894..16f72ef4bc 100644 --- a/storage_controller/src/safekeeper.rs +++ b/storage_controller/src/safekeeper.rs @@ -1,7 +1,7 @@ use std::time::Duration; use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy}; -use reqwest::StatusCode; +use reqwest::{Certificate, StatusCode}; use safekeeper_client::mgmt_api; use tokio_util::sync::CancellationToken; use utils::backoff; @@ -18,26 +18,55 @@ pub struct Safekeeper { cancel: CancellationToken, listen_http_addr: String, listen_http_port: u16, + listen_https_port: Option, scheduling_policy: SkSchedulingPolicy, id: NodeId, availability: SafekeeperState, + + // Flag from storcon's config to use https for safekeeper API. + // Invariant: if |true|, listen_https_port should contain a value. 
+ use_https: bool, } impl Safekeeper { - pub(crate) fn from_persistence(skp: SafekeeperPersistence, cancel: CancellationToken) -> Self { + pub(crate) fn from_persistence( + skp: SafekeeperPersistence, + cancel: CancellationToken, + use_https: bool, + ) -> anyhow::Result { + if use_https && skp.https_port.is_none() { + anyhow::bail!( + "cannot load safekeeper {} from persistence: \ + https is enabled, but https port is not specified", + skp.id, + ); + } + let scheduling_policy = skp.scheduling_policy.0; - Self { + Ok(Self { cancel, listen_http_addr: skp.host.clone(), listen_http_port: skp.http_port as u16, + listen_https_port: skp.https_port.map(|x| x as u16), id: NodeId(skp.id as u64), skp, availability: SafekeeperState::Offline, scheduling_policy, - } + use_https, + }) } + pub(crate) fn base_url(&self) -> String { - format!("http://{}:{}", self.listen_http_addr, self.listen_http_port) + if self.use_https { + format!( + "https://{}:{}", + self.listen_http_addr, + self.listen_https_port + .expect("https port should be specified if use_https is on"), + ) + } else { + format!("http://{}:{}", self.listen_http_addr, self.listen_http_port) + } } pub(crate) fn get_id(&self) -> NodeId { @@ -57,10 +86,12 @@ impl Safekeeper { self.skp.scheduling_policy = scheduling_policy.into(); } /// Perform an operation (which is given a [`SafekeeperClient`]) with retries + #[allow(clippy::too_many_arguments)] pub(crate) async fn with_client_retries( &self, mut op: O, jwt: &Option, + ssl_ca_cert: &Option, warn_threshold: u32, max_retries: u32, timeout: Duration, @@ -79,19 +110,22 @@ impl Safekeeper { | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false, ApiError(_, _) => true, Cancelled => true, + CreateClient(_) => true, } } + // TODO: refactor SafekeeperClient and with_client_retries (#11113). 
+ let mut http_client = reqwest::Client::builder().timeout(timeout); + if let Some(ssl_ca_cert) = ssl_ca_cert.as_ref() { + http_client = http_client.add_root_certificate(ssl_ca_cert.clone()); + } + let http_client = http_client.build().map_err(mgmt_api::Error::CreateClient)?; + backoff::retry( || { - let http_client = reqwest::ClientBuilder::new() - .timeout(timeout) - .build() - .expect("Failed to construct HTTP client"); - - let client = SafekeeperClient::from_client( + let client = SafekeeperClient::new( self.get_id(), - http_client, + http_client.clone(), self.base_url(), jwt.clone(), ); @@ -112,8 +146,9 @@ impl Safekeeper { warn_threshold, max_retries, &format!( - "Call to safekeeper {} ({}:{}) management API", - self.id, self.listen_http_addr, self.listen_http_port + "Call to safekeeper {} ({}) management API", + self.id, + self.base_url(), ), cancel, ) @@ -121,12 +156,16 @@ impl Safekeeper { .unwrap_or(Err(mgmt_api::Error::Cancelled)) } - pub(crate) fn update_from_record(&mut self, record: crate::persistence::SafekeeperUpsert) { + pub(crate) fn update_from_record( + &mut self, + record: crate::persistence::SafekeeperUpsert, + ) -> anyhow::Result<()> { let crate::persistence::SafekeeperUpsert { active: _, availability_zone_id: _, host, http_port, + https_port, id, port: _, region_id: _, @@ -139,9 +178,17 @@ impl Safekeeper { self.id.0 ); } + if self.use_https && https_port.is_none() { + anyhow::bail!( + "cannot update safekeeper {id}: \ + https is enabled, but https port is not specified" + ); + } self.skp = crate::persistence::SafekeeperPersistence::from_upsert(record, self.scheduling_policy); self.listen_http_port = http_port as u16; + self.listen_https_port = https_port.map(|x| x as u16); self.listen_http_addr = host; + Ok(()) } } diff --git a/storage_controller/src/safekeeper_client.rs b/storage_controller/src/safekeeper_client.rs index fb5be092a0..662f6d43be 100644 --- a/storage_controller/src/safekeeper_client.rs +++ 
b/storage_controller/src/safekeeper_client.rs @@ -45,26 +45,14 @@ macro_rules! measured_request { } impl SafekeeperClient { - #[allow(dead_code)] pub(crate) fn new( - node_id: NodeId, - mgmt_api_endpoint: String, - jwt: Option, - ) -> Self { - Self { - inner: Client::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt), - node_id_label: node_id.0.to_string(), - } - } - - pub(crate) fn from_client( node_id: NodeId, raw_client: reqwest::Client, mgmt_api_endpoint: String, jwt: Option, ) -> Self { Self { - inner: Client::from_client(raw_client, mgmt_api_endpoint, jwt), + inner: Client::new(raw_client, mgmt_api_endpoint, jwt), node_id_label: node_id.0.to_string(), } } diff --git a/storage_controller/src/schema.rs b/storage_controller/src/schema.rs index 361253bd19..ebfe630173 100644 --- a/storage_controller/src/schema.rs +++ b/storage_controller/src/schema.rs @@ -40,6 +40,7 @@ diesel::table! { http_port -> Int4, availability_zone_id -> Text, scheduling_policy -> Varchar, + https_port -> Nullable, } } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 6795abf6e9..d8c9ee70b1 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -48,7 +48,7 @@ use pageserver_api::upcall_api::{ ValidateResponseTenant, }; use pageserver_client::{BlockUnblock, mgmt_api}; -use reqwest::StatusCode; +use reqwest::{Certificate, StatusCode}; use safekeeper_api::models::SafekeeperUtilization; use tokio::sync::TryAcquireError; use tokio::sync::mpsc::error::TrySendError; @@ -400,6 +400,9 @@ pub struct Config { pub long_reconcile_threshold: Duration, pub use_https_pageserver_api: bool, + pub use_https_safekeeper_api: bool, + + pub ssl_ca_cert: Option, } impl From for ApiError { @@ -1420,8 +1423,14 @@ impl Service { .list_safekeepers() .await? 
.into_iter() - .map(|skp| Safekeeper::from_persistence(skp, CancellationToken::new())) - .collect::>(); + .map(|skp| { + Safekeeper::from_persistence( + skp, + CancellationToken::new(), + config.use_https_safekeeper_api, + ) + }) + .collect::>>()?; let safekeepers: HashMap = safekeepers.into_iter().map(|n| (n.get_id(), n)).collect(); tracing::info!("Loaded {} safekeepers from database.", safekeepers.len()); @@ -1559,6 +1568,7 @@ impl Service { let heartbeater_ps = Heartbeater::new( config.pageserver_jwt_token.clone(), + config.ssl_ca_cert.clone(), config.max_offline_interval, config.max_warming_up_interval, cancel.clone(), @@ -1566,6 +1576,7 @@ impl Service { let heartbeater_sk = Heartbeater::new( config.safekeeper_jwt_token.clone(), + config.ssl_ca_cert.clone(), config.max_offline_interval, config.max_warming_up_interval, cancel.clone(), @@ -8227,24 +8238,41 @@ impl Service { pub(crate) async fn upsert_safekeeper( &self, record: crate::persistence::SafekeeperUpsert, - ) -> Result<(), DatabaseError> { + ) -> Result<(), ApiError> { let node_id = NodeId(record.id as u64); + let use_https = self.config.use_https_safekeeper_api; + + if use_https && record.https_port.is_none() { + return Err(ApiError::PreconditionFailed( + format!( + "cannot upsert safekeeper {node_id}: \ + https is enabled, but https port is not specified" + ) + .into(), + )); + } + self.persistence.safekeeper_upsert(record.clone()).await?; { let mut locked = self.inner.write().unwrap(); let mut safekeepers = (*locked.safekeepers).clone(); match safekeepers.entry(node_id) { - std::collections::hash_map::Entry::Occupied(mut entry) => { - entry.get_mut().update_from_record(record); - } + std::collections::hash_map::Entry::Occupied(mut entry) => entry + .get_mut() + .update_from_record(record) + .expect("all preconditions should be checked before upsert to database"), std::collections::hash_map::Entry::Vacant(entry) => { - entry.insert(Safekeeper::from_persistence( - 
crate::persistence::SafekeeperPersistence::from_upsert( - record, - SkSchedulingPolicy::Pause, - ), - CancellationToken::new(), - )); + entry.insert( + Safekeeper::from_persistence( + crate::persistence::SafekeeperPersistence::from_upsert( + record, + SkSchedulingPolicy::Pause, + ), + CancellationToken::new(), + use_https, + ) + .expect("all preconditions should be checked before upsert to database"), + ); } } locked.safekeepers = Arc::new(safekeepers); diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index b5572ce6a1..29919f2fe7 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -3226,6 +3226,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): "host": "localhost", "port": sk_0.port.pg, "http_port": sk_0.port.http, + "https_port": None, "version": 5957, "availability_zone_id": "us-east-2b", } @@ -3260,6 +3261,24 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): assert eq_safekeeper_records(body, inserted_now) + # https_port appears during migration + body["https_port"] = 123 + target.on_safekeeper_deploy(fake_id, body) + inserted_now = target.get_safekeeper(fake_id) + assert target.get_safekeepers() == [inserted_now] + assert inserted_now is not None + assert eq_safekeeper_records(body, inserted_now) + env.storage_controller.consistency_check() + + # https_port rollback + body["https_port"] = None + target.on_safekeeper_deploy(fake_id, body) + inserted_now = target.get_safekeeper(fake_id) + assert target.get_safekeepers() == [inserted_now] + assert inserted_now is not None + assert eq_safekeeper_records(body, inserted_now) + env.storage_controller.consistency_check() + # some small tests for the scheduling policy querying and returning APIs newest_info = target.get_safekeeper(inserted["id"]) assert newest_info @@ -3792,6 +3811,7 @@ def test_storage_controller_node_flap_detach_race( 
wait_until(validate_locations, timeout=10) +@run_only_on_default_postgres("this is like a 'unit test' against storcon db") def test_update_node_on_registration(neon_env_builder: NeonEnvBuilder): """ Check that storage controller handles node_register requests with updated fields correctly.