From 0f367cb6650b7ae088729e1703814628f9eccf5d Mon Sep 17 00:00:00 2001
From: Dmitrii Kovalkov <34828390+DimasKovas@users.noreply.github.com>
Date: Fri, 21 Mar 2025 15:48:22 +0400
Subject: [PATCH] storcon: reuse reqwest http client (#11327)

## Problem

- Part of https://github.com/neondatabase/neon/issues/11113
- Building a new `reqwest::Client` for every request is expensive because it
  parses CA certs under the hood. It's noticeable in storcon's flamegraph.

## Summary of changes

- Reuse one `reqwest::Client` for all API calls to avoid parsing CA
  certificates every time.
- Since a shared client can no longer carry a per-call timeout, enforce the
  timeout per request with `tokio::time::timeout` and replace the now-unused
  `CreateClient` error variant with `Timeout`.

(Minimal sketches of both patterns follow the patch.)
---
 control_plane/src/pageserver.rs               |  13 +-
 control_plane/storcon_cli/src/main.rs         |   8 +-
 pageserver/client/src/mgmt_api.rs             |  25 +---
 pageserver/pagebench/src/cmd/aux_files.rs     |   4 +-
 pageserver/pagebench/src/cmd/basebackup.rs    |   4 +-
 .../pagebench/src/cmd/getpage_latest_lsn.rs   |   4 +-
 .../src/cmd/ondemand_download_churn.rs        |   4 +-
 .../cmd/trigger_initial_size_calculation.rs   |   4 +-
 safekeeper/client/src/mgmt_api.rs             |   5 +-
 storage_controller/src/heartbeater.rs         |  19 ++-
 storage_controller/src/http.rs                |   5 +-
 storage_controller/src/node.rs                |  26 ++--
 storage_controller/src/pageserver_client.rs   |  16 +--
 storage_controller/src/reconciler.rs          |  19 +--
 storage_controller/src/safekeeper.rs          |  20 ++-
 storage_controller/src/service.rs             | 134 ++++++++++--------
 .../src/service/safekeeper_reconciler.rs      |   3 +-
 .../src/service/safekeeper_service.rs         |   4 +-
 storage_controller/src/tenant_shard.rs        |   2 +
 .../regress/test_storage_controller.py        |   6 +
 20 files changed, 157 insertions(+), 168 deletions(-)

diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs
index c6efe7abe7..8a93ea5349 100644
--- a/control_plane/src/pageserver.rs
+++ b/control_plane/src/pageserver.rs
@@ -56,6 +56,14 @@ impl PageServerNode {
             Certificate::from_pem(&buf).expect("CA certificate should be valid")
         });
 
+        let mut http_client = reqwest::Client::builder();
+        if let Some(ssl_ca_cert) = ssl_ca_cert {
+            http_client = http_client.add_root_certificate(ssl_ca_cert);
+        }
+        let http_client = http_client
+            .build()
+            .expect("Client constructs with no errors");
+
         let endpoint = if env.storage_controller.use_https_pageserver_api {
             format!(
                 "https://{}",
@@ -72,6 +80,7 @@ impl PageServerNode {
             conf: conf.clone(),
             env: env.clone(),
             http_client: mgmt_api::Client::new(
+                http_client,
                 endpoint,
                 {
                     match conf.http_auth_type {
@@ -83,9 +92,7 @@ impl PageServerNode {
                     }
                 }
                 .as_deref(),
-                ssl_ca_cert,
-            )
-            .expect("Client constructs with no errors"),
+            ),
         }
     }
diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs
index ae4bf9a519..eb75f300fa 100644
--- a/control_plane/storcon_cli/src/main.rs
+++ b/control_plane/storcon_cli/src/main.rs
@@ -395,9 +395,15 @@ async fn main() -> anyhow::Result<()> {
         None => None,
     };
 
+    let mut http_client = reqwest::Client::builder();
+    if let Some(ssl_ca_cert) = ssl_ca_cert {
+        http_client = http_client.add_root_certificate(ssl_ca_cert);
+    }
+    let http_client = http_client.build()?;
+
     let mut trimmed = cli.api.to_string();
     trimmed.pop();
-    let vps_client = mgmt_api::Client::new(trimmed, cli.jwt.as_deref(), ssl_ca_cert)?;
+    let vps_client = mgmt_api::Client::new(http_client, trimmed, cli.jwt.as_deref());
 
     match cli.command {
         Command::NodeRegister {
diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs
index 508dac231e..224208034b 100644
--- a/pageserver/client/src/mgmt_api.rs
+++ b/pageserver/client/src/mgmt_api.rs
@@ -7,7 +7,7 @@
 use http_utils::error::HttpErrorBody;
 use pageserver_api::models::*;
 use pageserver_api::shard::TenantShardId;
 pub use reqwest::Body as ReqwestBody;
-use reqwest::{Certificate, IntoUrl, Method, StatusCode, Url};
+use reqwest::{IntoUrl, Method, StatusCode, Url};
 use utils::id::{TenantId, TimelineId};
 use utils::lsn::Lsn;
 
@@ -39,8 +39,8 @@ pub enum Error {
     #[error("Cancelled")]
     Cancelled,
 
-    #[error("create client: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
-    CreateClient(reqwest::Error),
+    #[error("request timed out: {0}")]
+    Timeout(String),
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
 
@@ -72,24 +72,7 @@ pub enum ForceAwaitLogicalSize {
 }
 
 impl Client {
-    pub fn new(
-        mgmt_api_endpoint: String,
-        jwt: Option<&str>,
-        ssl_ca_cert: Option<Certificate>,
-    ) -> Result<Self> {
-        let mut http_client = reqwest::Client::builder();
-        if let Some(ssl_ca_cert) = ssl_ca_cert {
-            http_client = http_client.add_root_certificate(ssl_ca_cert);
-        }
-        let http_client = http_client.build().map_err(Error::CreateClient)?;
-        Ok(Self::from_client(http_client, mgmt_api_endpoint, jwt))
-    }
-
-    pub fn from_client(
-        client: reqwest::Client,
-        mgmt_api_endpoint: String,
-        jwt: Option<&str>,
-    ) -> Self {
+    pub fn new(client: reqwest::Client, mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
         Self {
             mgmt_api_endpoint,
             authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
diff --git a/pageserver/pagebench/src/cmd/aux_files.rs b/pageserver/pagebench/src/cmd/aux_files.rs
index 394a954c30..6441c047c2 100644
--- a/pageserver/pagebench/src/cmd/aux_files.rs
+++ b/pageserver/pagebench/src/cmd/aux_files.rs
@@ -34,10 +34,10 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
     let args: &'static Args = Box::leak(Box::new(args));
 
     let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
+        reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
         args.mgmt_api_endpoint.clone(),
         args.pageserver_jwt.as_deref(),
-        None, // TODO: support ssl_ca_file for https APIs in pagebench.
-    )?);
+    ));
 
     // discover targets
     let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
diff --git a/pageserver/pagebench/src/cmd/basebackup.rs b/pageserver/pagebench/src/cmd/basebackup.rs
index d3013ded70..43ad92980c 100644
--- a/pageserver/pagebench/src/cmd/basebackup.rs
+++ b/pageserver/pagebench/src/cmd/basebackup.rs
@@ -75,10 +75,10 @@ async fn main_impl(
     let args: &'static Args = Box::leak(Box::new(args));
 
     let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
+        reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
         args.mgmt_api_endpoint.clone(),
         args.pageserver_jwt.as_deref(),
-        None, // TODO: support ssl_ca_file for https APIs in pagebench.
-    )?);
+    ));
 
     // discover targets
     let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
index 969cf24b93..6fd1c00eca 100644
--- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
+++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
@@ -123,10 +123,10 @@ async fn main_impl(
     let args: &'static Args = Box::leak(Box::new(args));
 
     let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
+        reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
         args.mgmt_api_endpoint.clone(),
         args.pageserver_jwt.as_deref(),
-        None, // TODO: support ssl_ca_file for https APIs in pagebench.
-    )?);
+    ));
 
     if let Some(engine_str) = &args.set_io_engine {
         mgmt_api_client.put_io_engine(engine_str).await?;
diff --git a/pageserver/pagebench/src/cmd/ondemand_download_churn.rs b/pageserver/pagebench/src/cmd/ondemand_download_churn.rs
index a77d3000cc..9ff1e638c4 100644
--- a/pageserver/pagebench/src/cmd/ondemand_download_churn.rs
+++ b/pageserver/pagebench/src/cmd/ondemand_download_churn.rs
@@ -81,10 +81,10 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
     let args: &'static Args = Box::leak(Box::new(args));
 
     let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
+        reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
         args.mgmt_api_endpoint.clone(),
         args.pageserver_jwt.as_deref(),
-        None, // TODO: support ssl_ca_file for https APIs in pagebench.
-    )?);
+    ));
 
     if let Some(engine_str) = &args.set_io_engine {
         mgmt_api_client.put_io_engine(engine_str).await?;
diff --git a/pageserver/pagebench/src/cmd/trigger_initial_size_calculation.rs b/pageserver/pagebench/src/cmd/trigger_initial_size_calculation.rs
index 2f919ec652..779bacbfd4 100644
--- a/pageserver/pagebench/src/cmd/trigger_initial_size_calculation.rs
+++ b/pageserver/pagebench/src/cmd/trigger_initial_size_calculation.rs
@@ -38,10 +38,10 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
     let args: &'static Args = Box::leak(Box::new(args));
 
     let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
+        reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
         args.mgmt_api_endpoint.clone(),
         args.pageserver_jwt.as_deref(),
-        None, // TODO: support ssl_ca_file for https APIs in pagebench.
-    )?);
+    ));
 
     // discover targets
     let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
diff --git a/safekeeper/client/src/mgmt_api.rs b/safekeeper/client/src/mgmt_api.rs
index b5e407995d..afef5e792e 100644
--- a/safekeeper/client/src/mgmt_api.rs
+++ b/safekeeper/client/src/mgmt_api.rs
@@ -38,9 +38,8 @@ pub enum Error {
     #[error("Cancelled")]
     Cancelled,
 
-    /// Failed to create client.
-    #[error("create client: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
-    CreateClient(reqwest::Error),
+    #[error("request timed out: {0}")]
+    Timeout(String),
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
diff --git a/storage_controller/src/heartbeater.rs b/storage_controller/src/heartbeater.rs
index ee4c9ef9cd..524225c14a 100644
--- a/storage_controller/src/heartbeater.rs
+++ b/storage_controller/src/heartbeater.rs
@@ -8,7 +8,6 @@ use futures::StreamExt;
 use futures::stream::FuturesUnordered;
 use pageserver_api::controller_api::{NodeAvailability, SkSchedulingPolicy};
 use pageserver_api::models::PageserverUtilization;
-use reqwest::Certificate;
 use safekeeper_api::models::SafekeeperUtilization;
 use safekeeper_client::mgmt_api;
 use thiserror::Error;
@@ -27,8 +26,8 @@ struct HeartbeaterTask<Server, State> {
     max_offline_interval: Duration,
     max_warming_up_interval: Duration,
 
+    http_client: reqwest::Client,
     jwt_token: Option<String>,
-    ssl_ca_cert: Option<Certificate>,
 }
 
 #[derive(Debug, Clone)]
@@ -76,8 +75,8 @@ where
     HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
 {
     pub(crate) fn new(
+        http_client: reqwest::Client,
         jwt_token: Option<String>,
-        ssl_ca_cert: Option<Certificate>,
         max_offline_interval: Duration,
         max_warming_up_interval: Duration,
         cancel: CancellationToken,
     ) -> Self {
         let (sender, receiver) =
             tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest<Server, State>>();
         let mut heartbeater = HeartbeaterTask::new(
             receiver,
+            http_client,
             jwt_token,
-            ssl_ca_cert,
             max_offline_interval,
             max_warming_up_interval,
             cancel,
@@ -122,8 +121,8 @@ where
 {
     fn new(
         receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
+        http_client: reqwest::Client,
         jwt_token: Option<String>,
-        ssl_ca_cert: Option<Certificate>,
         max_offline_interval: Duration,
         max_warming_up_interval: Duration,
         cancel: CancellationToken,
@@ -134,8 +133,8 @@ where
             state: HashMap::new(),
             max_offline_interval,
             max_warming_up_interval,
+            http_client,
             jwt_token,
-            ssl_ca_cert,
         }
     }
 
     async fn run(&mut self) {
@@ -178,7 +177,7 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
         let mut heartbeat_futs = FuturesUnordered::new();
         for (node_id, node) in &*pageservers {
             heartbeat_futs.push({
-                let ssl_ca_cert = self.ssl_ca_cert.clone();
+                let http_client = self.http_client.clone();
                 let jwt_token = self.jwt_token.clone();
                 let cancel = self.cancel.clone();
@@ -193,8 +192,8 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
                 let response = node_clone
                     .with_client_retries(
                         |client| async move { client.get_utilization().await },
+                        &http_client,
                         &jwt_token,
-                        &ssl_ca_cert,
                         3,
                         3,
                         Duration::from_secs(1),
@@ -329,19 +328,19 @@ impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, SafekeeperState>
diff --git a/storage_controller/src/node.rs b/storage_controller/src/node.rs
--- a/storage_controller/src/node.rs
+++ b/storage_controller/src/node.rs
     pub(crate) async fn with_client_retries<T, O, F>(
         &self,
         mut op: O,
+        http_client: &reqwest::Client,
         jwt: &Option<String>,
-        ssl_ca_cert: &Option<Certificate>,
         warn_threshold: u32,
         max_retries: u32,
         timeout: Duration,
@@ -300,24 +300,13 @@
                 | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
                 ApiError(_, _) => true,
                 Cancelled => true,
-                CreateClient(_) => true,
+                Timeout(_) => false,
             }
         }
 
-        // TODO: refactor PageserverClient and with_client_retires (#11113).
-        let mut http_client = reqwest::ClientBuilder::new().timeout(timeout);
-        if let Some(ssl_ca_cert) = ssl_ca_cert.as_ref() {
-            http_client = http_client.add_root_certificate(ssl_ca_cert.clone())
-        }
-
-        let http_client = match http_client.build() {
-            Ok(http_client) => http_client,
-            Err(err) => return Some(Err(mgmt_api::Error::CreateClient(err))),
-        };
-
         backoff::retry(
             || {
-                let client = PageserverClient::from_client(
+                let client = PageserverClient::new(
                     self.get_id(),
                     http_client.clone(),
                     self.base_url(),
@@ -326,11 +315,14 @@ impl Node {
 
                 let node_cancel_fut = self.cancel.cancelled();
 
-                let op_fut = op(client);
+                let op_fut = tokio::time::timeout(timeout, op(client));
 
                 async {
                     tokio::select! {
-                        r = op_fut=> {r},
+                        r = op_fut => match r {
+                            Ok(r) => r,
+                            Err(e) => Err(mgmt_api::Error::Timeout(format!("{e}"))),
+                        },
                         _ = node_cancel_fut => {
                             Err(mgmt_api::Error::Cancelled)
                         }}
diff --git a/storage_controller/src/pageserver_client.rs b/storage_controller/src/pageserver_client.rs
index 05e7aa88c6..c6c21107f1 100644
--- a/storage_controller/src/pageserver_client.rs
+++ b/storage_controller/src/pageserver_client.rs
@@ -8,7 +8,7 @@ use pageserver_api::models::{
 use pageserver_api::shard::TenantShardId;
 use pageserver_client::BlockUnblock;
 use pageserver_client::mgmt_api::{Client, Result};
-use reqwest::{Certificate, StatusCode};
+use reqwest::StatusCode;
 use utils::id::{NodeId, TenantId, TimelineId};
 
 /// Thin wrapper around [`pageserver_client::mgmt_api::Client`]. It allows the storage
@@ -47,25 +47,13 @@ macro_rules! measured_request {
 
 impl PageserverClient {
     pub(crate) fn new(
-        node_id: NodeId,
-        mgmt_api_endpoint: String,
-        jwt: Option<&str>,
-        ssl_ca_cert: Option<Certificate>,
-    ) -> Result<Self> {
-        Ok(Self {
-            inner: Client::new(mgmt_api_endpoint, jwt, ssl_ca_cert)?,
-            node_id_label: node_id.0.to_string(),
-        })
-    }
-
-    pub(crate) fn from_client(
         node_id: NodeId,
         raw_client: reqwest::Client,
         mgmt_api_endpoint: String,
         jwt: Option<&str>,
     ) -> Self {
         Self {
-            inner: Client::from_client(raw_client, mgmt_api_endpoint, jwt),
+            inner: Client::new(raw_client, mgmt_api_endpoint, jwt),
             node_id_label: node_id.0.to_string(),
         }
     }
diff --git a/storage_controller/src/reconciler.rs b/storage_controller/src/reconciler.rs
index 9f0b789f19..9f6f385dc9 100644
--- a/storage_controller/src/reconciler.rs
+++ b/storage_controller/src/reconciler.rs
@@ -86,6 +86,9 @@ pub(super) struct Reconciler {
 
     /// Access to persistent storage for updating generation numbers
     pub(crate) persistence: Arc<Persistence>,
+
+    /// HTTP client with proper CA certs.
+    pub(crate) http_client: reqwest::Client,
 }
 
 pub(crate) struct ReconcilerConfigBuilder {
@@ -298,8 +301,8 @@ impl Reconciler {
                     .location_config(tenant_shard_id, config.clone(), flush_ms, lazy)
                     .await
             },
+            &self.http_client,
             &self.service_config.pageserver_jwt_token,
-            &self.service_config.ssl_ca_cert,
             1,
             3,
             timeout,
@@ -419,10 +422,10 @@ impl Reconciler {
 
         let client = PageserverClient::new(
             node.get_id(),
+            self.http_client.clone(),
             node.base_url(),
             self.service_config.pageserver_jwt_token.as_deref(),
-            self.service_config.ssl_ca_cert.clone(),
-        )?;
+        );
 
         client
             .wait_lsn(
@@ -443,10 +446,10 @@ impl Reconciler {
     ) -> anyhow::Result<Vec<TimelineId>> {
         let client = PageserverClient::new(
             node.get_id(),
+            self.http_client.clone(),
             node.base_url(),
             self.service_config.pageserver_jwt_token.as_deref(),
-            self.service_config.ssl_ca_cert.clone(),
-        )?;
+        );
 
         let timelines = client.timeline_list(&tenant_shard_id).await?;
         Ok(timelines
@@ -483,8 +486,8 @@ impl Reconciler {
                     )
                     .await
             },
+            &self.http_client,
             &self.service_config.pageserver_jwt_token,
-            &self.service_config.ssl_ca_cert,
             1,
             3,
             request_download_timeout * 2,
@@ -778,8 +781,8 @@ impl Reconciler {
         let observed_conf = match attached_node
             .with_client_retries(
                 |client| async move { client.get_location_config(tenant_shard_id).await },
+                &self.http_client,
                 &self.service_config.pageserver_jwt_token,
-                &self.service_config.ssl_ca_cert,
                 1,
                 1,
                 Duration::from_secs(5),
@@ -1127,8 +1130,8 @@ impl Reconciler {
         match origin
             .with_client_retries(
                 |client| async move { client.get_location_config(tenant_shard_id).await },
+                &self.http_client,
                 &self.service_config.pageserver_jwt_token,
-                &self.service_config.ssl_ca_cert,
                 1,
                 3,
                 Duration::from_secs(5),
diff --git a/storage_controller/src/safekeeper.rs b/storage_controller/src/safekeeper.rs
index 2bd28f29af..3b731acf7e 100644
--- a/storage_controller/src/safekeeper.rs
+++ b/storage_controller/src/safekeeper.rs
@@ -1,7 +1,7 @@
 use std::time::Duration;
 
 use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
-use reqwest::{Certificate, StatusCode};
+use reqwest::StatusCode;
 use safekeeper_client::mgmt_api;
 use tokio_util::sync::CancellationToken;
 use utils::backoff;
@@ -94,8 +94,8 @@ impl Safekeeper {
     pub(crate) async fn with_client_retries<T, O, F>(
         &self,
         mut op: O,
+        http_client: &reqwest::Client,
         jwt: &Option<String>,
-        ssl_ca_cert: &Option<Certificate>,
         warn_threshold: u32,
         max_retries: u32,
         timeout: Duration,
@@ -114,17 +114,10 @@ impl Safekeeper {
             | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
             ApiError(_, _) => true,
             Cancelled => true,
-            CreateClient(_) => true,
+            Timeout(_) => false,
         }
     }
 
-        // TODO: refactor SafekeeperClient and with_client_retires (#11113).
-        let mut http_client = reqwest::Client::builder().timeout(timeout);
-        if let Some(ssl_ca_cert) = ssl_ca_cert.as_ref() {
-            http_client = http_client.add_root_certificate(ssl_ca_cert.clone());
-        }
-        let http_client = http_client.build().map_err(mgmt_api::Error::CreateClient)?;
-
         backoff::retry(
             || {
                 let client = SafekeeperClient::new(
@@ -136,11 +129,14 @@ impl Safekeeper {
 
                 let node_cancel_fut = self.cancel.cancelled();
 
-                let op_fut = op(client);
+                let op_fut = tokio::time::timeout(timeout, op(client));
 
                 async {
                    tokio::select! {
-                        r = op_fut=> {r},
+                        r = op_fut => match r {
+                            Ok(r) => r,
+                            Err(e) => Err(mgmt_api::Error::Timeout(format!("{e}"))),
+                        },
                         _ = node_cancel_fut => {
                             Err(mgmt_api::Error::Cancelled)
                         }}
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 40915bd753..c956c1dd1c 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -267,7 +267,7 @@ fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError {
             ApiError::Conflict(format!("{node} {status}: {status} {msg}"))
         }
         mgmt_api::Error::Cancelled => ApiError::ShuttingDown,
-        mgmt_api::Error::CreateClient(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
+        mgmt_api::Error::Timeout(e) => ApiError::Timeout(e.into()),
     }
 }
 
@@ -524,6 +524,9 @@ pub struct Service {
     /// This waits for initial reconciliation with pageservers to complete. Until this barrier
     /// passes, it isn't safe to do any actions that mutate tenants.
     pub(crate) startup_complete: Barrier,
+
+    /// HTTP client with proper CA certs.
+    http_client: reqwest::Client,
 }
 
 impl From<ReconcileWaitError> for ApiError {
@@ -667,6 +670,10 @@ impl Service {
         &self.config
     }
 
+    pub fn get_http_client(&self) -> &reqwest::Client {
+        &self.http_client
+    }
+
     /// Called once on startup, this function attempts to contact all pageservers to build an up-to-date
     /// view of the world, and determine which pageservers are responsive.
     #[instrument(skip_all)]
@@ -965,8 +972,8 @@ impl Service {
         let response = node
             .with_client_retries(
                 |client| async move { client.list_location_config().await },
+                &self.http_client,
                 &self.config.pageserver_jwt_token,
-                &self.config.ssl_ca_cert,
                 1,
                 5,
                 timeout,
@@ -1064,20 +1071,12 @@ impl Service {
                     break;
                 }
 
-                let client = match PageserverClient::new(
+                let client = PageserverClient::new(
                     node.get_id(),
+                    self.http_client.clone(),
                     node.base_url(),
                     self.config.pageserver_jwt_token.as_deref(),
-                    self.config.ssl_ca_cert.clone(),
-                ) {
-                    Ok(client) => client,
-                    Err(e) => {
-                        tracing::error!(
-                            "Failed to create client to detach unknown shard {tenant_shard_id} on pageserver {node_id}: {e}"
-                        );
-                        continue;
-                    }
-                };
+                );
                 match client
                     .location_config(
                         tenant_shard_id,
@@ -1655,17 +1654,36 @@ impl Service {
         let cancel = CancellationToken::new();
         let reconcilers_cancel = cancel.child_token();
 
+        let mut http_client = reqwest::Client::builder();
+        // We intentionally disable the connection pool, so every request will create its own TCP connection.
+        // It's especially important for heartbeaters to notice more network problems.
+        //
+        // TODO: It makes sense to use this client only in heartbeaters and create a second one with
+        // connection pooling for everything else. But reqwest::Client may create a connection without
+        // ever using it (it uses hyper's Client under the hood):
+        // https://github.com/hyperium/hyper-util/blob/d51318df3461d40e5f5e5ca163cb3905ac960209/src/client/legacy/client.rs#L415
+        //
+        // Because of a bug in hyper0::Connection::graceful_shutdown such connections hang during
+        // graceful server shutdown: https://github.com/hyperium/hyper/issues/2730
+        //
+        // The bug has been fixed in hyper v1, so keep alive may be enabled only after we migrate to hyper1.
+        http_client = http_client.pool_max_idle_per_host(0);
+        if let Some(ssl_ca_cert) = &config.ssl_ca_cert {
+            http_client = http_client.add_root_certificate(ssl_ca_cert.clone());
+        }
+        let http_client = http_client.build()?;
+
         let heartbeater_ps = Heartbeater::new(
+            http_client.clone(),
             config.pageserver_jwt_token.clone(),
-            config.ssl_ca_cert.clone(),
             config.max_offline_interval,
             config.max_warming_up_interval,
             cancel.clone(),
         );
 
         let heartbeater_sk = Heartbeater::new(
+            http_client.clone(),
             config.safekeeper_jwt_token.clone(),
-            config.ssl_ca_cert.clone(),
             config.max_offline_interval,
             config.max_warming_up_interval,
             cancel.clone(),
@@ -1708,6 +1726,7 @@ impl Service {
             reconcilers_gate: Gate::default(),
             tenant_op_locks: Default::default(),
             node_op_locks: Default::default(),
+            http_client,
         });
 
         let result_task_this = this.clone();
@@ -2013,8 +2032,8 @@ impl Service {
         let configs = match node
             .with_client_retries(
                 |client| async move { client.list_location_config().await },
+                &self.http_client,
                 &self.config.pageserver_jwt_token,
-                &self.config.ssl_ca_cert,
                 1,
                 5,
                 SHORT_RECONCILE_TIMEOUT,
@@ -2092,8 +2111,8 @@ impl Service {
                     .location_config(tenant_shard_id, config, None, false)
                     .await
             },
+            &self.http_client,
            &self.config.pageserver_jwt_token,
-            &self.config.ssl_ca_cert,
             1,
             5,
             SHORT_RECONCILE_TIMEOUT,
@@ -3235,11 +3254,10 @@ impl Service {
         for tenant_shard_id in shard_ids {
             let client = PageserverClient::new(
                 node.get_id(),
+                self.http_client.clone(),
                 node.base_url(),
                 self.config.pageserver_jwt_token.as_deref(),
-                self.config.ssl_ca_cert.clone(),
-            )
-            .map_err(|e| passthrough_api_error(&node, e))?;
+            );
 
             tracing::info!("Doing time travel recovery for shard {tenant_shard_id}",);
 
@@ -3298,11 +3316,10 @@ impl Service {
         for (tenant_shard_id, node) in targets {
             let client = PageserverClient::new(
                 node.get_id(),
+                self.http_client.clone(),
                 node.base_url(),
                 self.config.pageserver_jwt_token.as_deref(),
-                self.config.ssl_ca_cert.clone(),
-            )
-            .map_err(|e| passthrough_api_error(&node, e))?;
+            );
             futs.push(async move {
                 let result = client
                     .tenant_secondary_download(tenant_shard_id, wait)
@@ -3427,8 +3444,8 @@ impl Service {
                         .tenant_delete(TenantShardId::unsharded(tenant_id))
                         .await
                 },
+                &self.http_client,
                 &self.config.pageserver_jwt_token,
-                &self.config.ssl_ca_cert,
                 1,
                 3,
                 RECONCILE_TIMEOUT,
@@ -3580,8 +3597,8 @@ impl Service {
         async fn create_one(
             tenant_shard_id: TenantShardId,
             locations: ShardMutationLocations,
+            http_client: reqwest::Client,
             jwt: Option<String>,
-            ssl_ca_cert: Option<Certificate>,
             create_req: TimelineCreateRequest,
         ) -> Result<TimelineInfo, ApiError> {
             let latest = locations.latest.node;
@@ -3594,8 +3611,7 @@ impl Service {
             );
 
             let client =
-                PageserverClient::new(latest.get_id(), latest.base_url(), jwt.as_deref(), ssl_ca_cert.clone())
-                    .map_err(|e| passthrough_api_error(&latest, e))?;
+                PageserverClient::new(latest.get_id(), http_client.clone(), latest.base_url(), jwt.as_deref());
 
             let timeline_info = client
                 .timeline_create(tenant_shard_id, &create_req)
@@ -3616,11 +3632,10 @@ impl Service {
 
                 let client = PageserverClient::new(
                     location.node.get_id(),
+                    http_client.clone(),
                     location.node.base_url(),
                     jwt.as_deref(),
-                    ssl_ca_cert.clone(),
-                )
-                .map_err(|e| passthrough_api_error(&location.node, e))?;
+                );
 
                 let res = client
                     .timeline_create(tenant_shard_id, &create_req)
@@ -3648,8 +3663,8 @@ impl Service {
             let timeline_info = create_one(
                 shard_zero_tid,
                 shard_zero_locations,
+                self.http_client.clone(),
                 self.config.pageserver_jwt_token.clone(),
-                self.config.ssl_ca_cert.clone(),
                 create_req.clone(),
             )
             .await?;
@@ -3678,8 +3693,8 @@ impl Service {
                     Box::pin(create_one(
                         tenant_shard_id,
                         mutation_locations,
+                        self.http_client.clone(),
                         jwt.clone(),
-                        self.config.ssl_ca_cert.clone(),
                         create_req,
                     ))
                 },
@@ -3762,16 +3777,15 @@ impl Service {
         tenant_shard_id: TenantShardId,
         timeline_id: TimelineId,
         node: Node,
+        http_client: reqwest::Client,
         jwt: Option<String>,
-        ssl_ca_cert: Option<Certificate>,
         req: TimelineArchivalConfigRequest,
     ) -> Result<(), ApiError> {
         tracing::info!(
             "Setting archival config of timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
         );
 
-        let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref(), ssl_ca_cert)
-            .map_err(|e| passthrough_api_error(&node, e))?;
+        let client = PageserverClient::new(node.get_id(), http_client, node.base_url(), jwt.as_deref());
 
         client
             .timeline_archival_config(tenant_shard_id, timeline_id, &req)
@@ -3793,8 +3807,8 @@ impl Service {
                         tenant_shard_id,
                         timeline_id,
                         node,
+                        self.http_client.clone(),
                         self.config.pageserver_jwt_token.clone(),
-                        self.config.ssl_ca_cert.clone(),
                         req.clone(),
                     ))
                 })
@@ -3831,16 +3845,15 @@ impl Service {
         tenant_shard_id: TenantShardId,
         timeline_id: TimelineId,
         node: Node,
+        http_client: reqwest::Client,
         jwt: Option<String>,
-        ssl_ca_cert: Option<Certificate>,
         behavior: Option<DetachBehavior>,
     ) -> Result<(ShardNumber, models::detach_ancestor::AncestorDetached), ApiError> {
         tracing::info!(
             "Detaching timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
         );
 
-        let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref(), ssl_ca_cert)
-            .map_err(|e| passthrough_api_error(&node, e))?;
+        let client = PageserverClient::new(node.get_id(), http_client, node.base_url(), jwt.as_deref());
 
         client
             .timeline_detach_ancestor(tenant_shard_id, timeline_id, behavior)
@@ -3879,8 +3892,8 @@ impl Service {
                         tenant_shard_id,
                         timeline_id,
                         node,
+                        self.http_client.clone(),
                         self.config.pageserver_jwt_token.clone(),
-                        self.config.ssl_ca_cert.clone(),
                         behavior,
                     ))
                 })
@@ -3933,17 +3946,16 @@ impl Service {
         tenant_shard_id: TenantShardId,
         timeline_id: TimelineId,
         node: Node,
+        http_client: reqwest::Client,
         jwt: Option<String>,
-        ssl_ca_cert: Option<Certificate>,
         dir: BlockUnblock,
     ) -> Result<(), ApiError> {
         let client = PageserverClient::new(
             node.get_id(),
+            http_client,
             node.base_url(),
             jwt.as_deref(),
-            ssl_ca_cert,
-        )
-        .map_err(|e| passthrough_api_error(&node, e))?;
+        );
 
         client
             .timeline_block_unblock_gc(tenant_shard_id, timeline_id, dir)
@@ -3962,8 +3974,8 @@ impl Service {
                     tenant_shard_id,
                     timeline_id,
                     node,
+                    self.http_client.clone(),
                     self.config.pageserver_jwt_token.clone(),
-                    self.config.ssl_ca_cert.clone(),
                     dir,
                 ))
             })
@@ -4091,8 +4103,8 @@ impl Service {
             let r = node
                 .with_client_retries(
                     |client| op(tenant_shard_id, client),
+                    &self.http_client,
                     &self.config.pageserver_jwt_token,
-                    &self.config.ssl_ca_cert,
                     warn_threshold,
                     max_retries,
                     timeout,
@@ -4316,15 +4328,14 @@ impl Service {
         tenant_shard_id: TenantShardId,
         timeline_id: TimelineId,
         node: Node,
+        http_client: reqwest::Client,
        jwt: Option<String>,
-        ssl_ca_cert: Option<Certificate>,
     ) -> Result<StatusCode, ApiError> {
         tracing::info!(
             "Deleting timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
         );
 
-        let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref(), ssl_ca_cert)
-            .map_err(|e| passthrough_api_error(&node, e))?;
+        let client = PageserverClient::new(node.get_id(), http_client, node.base_url(), jwt.as_deref());
         let res = client
             .timeline_delete(tenant_shard_id, timeline_id)
             .await;
@@ -4350,8 +4361,8 @@ impl Service {
                         tenant_shard_id,
                         timeline_id,
                         node,
+                        self.http_client.clone(),
                         self.config.pageserver_jwt_token.clone(),
-                        self.config.ssl_ca_cert.clone(),
                     ))
                 })
                 .await?;
@@ -4373,8 +4384,8 @@ impl Service {
             shard_zero_tid,
             timeline_id,
             shard_zero_locations.latest.node,
+            self.http_client.clone(),
             self.config.pageserver_jwt_token.clone(),
-            self.config.ssl_ca_cert.clone(),
         )
         .await?;
 
         Ok(shard_zero_status)
@@ -4809,8 +4820,8 @@ impl Service {
                         client.location_config(child_id, config, None, false).await
                 },
+                &self.http_client,
                 &self.config.pageserver_jwt_token,
-                &self.config.ssl_ca_cert,
                 1,
                 10,
                 Duration::from_secs(5),
@@ -5412,11 +5423,10 @@ impl Service {
             } = target;
             let client = PageserverClient::new(
                 node.get_id(),
+                self.http_client.clone(),
                 node.base_url(),
                 self.config.pageserver_jwt_token.as_deref(),
-                self.config.ssl_ca_cert.clone(),
-            )
-            .map_err(|e| passthrough_api_error(node, e))?;
+            );
             let response = client
                 .tenant_shard_split(
                     *parent_id,
@@ -5900,11 +5910,10 @@ impl Service {
 
         let client = PageserverClient::new(
             node.get_id(),
+            self.http_client.clone(),
             node.base_url(),
             self.config.pageserver_jwt_token.as_deref(),
-            self.config.ssl_ca_cert.clone(),
-        )
-        .map_err(|e| passthrough_api_error(&node, e))?;
+        );
 
         let scan_result = client
             .tenant_scan_remote_storage(tenant_id)
@@ -7138,6 +7147,7 @@ impl Service {
             units,
             gate_guard,
             &self.reconcilers_cancel,
+            self.http_client.clone(),
         )
     }
 
@@ -7545,8 +7555,8 @@ impl Service {
         match attached_node
             .with_client_retries(
                 |client| async move { client.tenant_heatmap_upload(tenant_shard_id).await },
+                &self.http_client,
                 &self.config.pageserver_jwt_token,
-                &self.config.ssl_ca_cert,
                 3,
                 10,
                 SHORT_RECONCILE_TIMEOUT,
@@ -7582,8 +7592,8 @@ impl Service {
                     )
                     .await
             },
+            &self.http_client,
             &self.config.pageserver_jwt_token,
-            &self.config.ssl_ca_cert,
             3,
             10,
             SHORT_RECONCILE_TIMEOUT,
@@ -7856,8 +7866,8 @@ impl Service {
             futures.push(async move {
                 node.with_client_retries(
                     |client| async move { client.top_tenant_shards(request.clone()).await },
+                    &self.http_client,
                     &self.config.pageserver_jwt_token,
-                    &self.config.ssl_ca_cert,
                     3,
                     3,
                     Duration::from_secs(5),
@@ -7976,8 +7986,8 @@ impl Service {
         match node
             .with_client_retries(
                 |client| async move { client.tenant_secondary_status(tenant_shard_id).await },
+                &self.http_client,
                 &self.config.pageserver_jwt_token,
-                &self.config.ssl_ca_cert,
                 1,
                 3,
                 Duration::from_millis(250),
diff --git a/storage_controller/src/service/safekeeper_reconciler.rs b/storage_controller/src/service/safekeeper_reconciler.rs
index 3f92bb391e..a60aa6ca53 100644
--- a/storage_controller/src/service/safekeeper_reconciler.rs
+++ b/storage_controller/src/service/safekeeper_reconciler.rs
@@ -338,7 +338,6 @@ impl SafekeeperReconciler {
             .safekeeper_jwt_token
             .clone()
             .map(SecretString::from);
-        let ssl_ca_cert = self.service.config.ssl_ca_cert.clone();
         loop {
             let res = req
                 .safekeeper
@@ -347,8 +346,8 @@ impl SafekeeperReconciler {
                         let closure = &closure;
                         async move { closure(client).await }
                     },
+                    self.service.get_http_client(),
                    &jwt,
-                    &ssl_ca_cert,
                     3,
                     10,
                     Duration::from_secs(10),
diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs
index bcfd035883..557c684f6b 100644
--- a/storage_controller/src/service/safekeeper_service.rs
+++ b/storage_controller/src/service/safekeeper_service.rs
@@ -78,8 +78,8 @@ impl Service {
         for sk in timeline_persistence.sk_set.iter() {
             let sk_id = NodeId(*sk as u64);
             let safekeepers = safekeepers.clone();
+            let http_client = self.http_client.clone();
             let jwt = jwt.clone();
-            let ssl_ca_cert = self.config.ssl_ca_cert.clone();
            let req = req.clone();
             joinset.spawn(async move {
                 // Unwrap is fine as we already would have returned error above
@@ -90,8 +90,8 @@ impl Service {
                         let req = req.clone();
                         async move { client.create_timeline(&req).await }
                     },
+                    &http_client,
                     &jwt,
-                    &ssl_ca_cert,
                     3,
                     3,
                     SK_CREATE_TIMELINE_RECONCILE_TIMEOUT,
diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs
index 80f42e04a9..f6b748844a 100644
--- a/storage_controller/src/tenant_shard.rs
+++ b/storage_controller/src/tenant_shard.rs
@@ -1588,6 +1588,7 @@ impl TenantShard {
         units: ReconcileUnits,
         gate_guard: GateGuard,
         cancel: &CancellationToken,
+        http_client: reqwest::Client,
     ) -> Option<ReconcilerWaiter> {
         // Reconcile in flight for a stale sequence?  Our sequence's task will wait for it before
         // doing our sequence's work.
@@ -1633,6 +1634,7 @@ impl TenantShard {
             cancel: reconciler_cancel.clone(),
             persistence: persistence.clone(),
             compute_notify_failure: false,
+            http_client,
         };
 
         let reconcile_seq = self.sequence;
diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py
index c37859bba9..00dc087a21 100644
--- a/test_runner/regress/test_storage_controller.py
+++ b/test_runner/regress/test_storage_controller.py
@@ -1599,6 +1599,12 @@ def test_storage_controller_heartbeats(
     env.storage_controller.allowed_errors.append(
         ".*Call to node.*management API.*failed.*failpoint.*"
     )
+    # The server starts listening on the socket before sending the re-attach request,
+    # but it starts serving HTTP only once re-attach has completed.
+    # If re-attach is slow (last scenario), storcon's heartbeat requests will time out.
+    env.storage_controller.allowed_errors.append(
+        ".*Call to node.*management API.*failed.* Timeout.*"
+    )
 
     # Initially we have two online pageservers
     nodes = env.storage_controller.node_list()
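
---

Below is a minimal sketch (not part of the patch) of the client-construction pattern this PR applies: build one `reqwest::Client` up front, paying the CA-certificate parsing cost once, and hand cheap clones of it to every component. The function name and error handling are illustrative only.

```rust
// Sketch, assuming only the `reqwest` crate. A `reqwest::Client` is an `Arc`
// around its internals, so `clone()` is cheap and all clones share one set of
// parsed CA certificates.
fn build_shared_client(
    ssl_ca_cert: Option<reqwest::Certificate>,
) -> reqwest::Result<reqwest::Client> {
    let mut builder = reqwest::Client::builder()
        // Mirror the storcon constructor: disable idle pooling so each request
        // opens a fresh TCP connection and heartbeats surface network problems
        // quickly (see the comment in the patch about hyper keep-alive).
        .pool_max_idle_per_host(0);
    if let Some(cert) = ssl_ca_cert {
        // Install the custom root CA once, not on every request.
        builder = builder.add_root_certificate(cert);
    }
    builder.build()
}
```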
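The second half of the change moves the timeout from `ClientBuilder::timeout` (which forced building a client per call) to a per-request `tokio::time::timeout` raced against a cancellation token, as `with_client_retries` now does. A simplified sketch, using stand-in `Error` and operation types rather than the storcon ones:

```rust
use std::time::Duration;

use tokio_util::sync::CancellationToken;

// Stand-in for `mgmt_api::Error`; only the variants relevant here.
#[derive(Debug)]
enum Error {
    Timeout(String),
    Cancelled,
}

// Run one operation with a per-request deadline, aborting early if the node
// is shut down. This mirrors the new `tokio::select!` body in node.rs and
// safekeeper.rs.
async fn run_op<T, F>(op: F, timeout: Duration, cancel: &CancellationToken) -> Result<T, Error>
where
    F: std::future::Future<Output = T>,
{
    let op_fut = tokio::time::timeout(timeout, op);
    tokio::select! {
        r = op_fut => match r {
            Ok(r) => Ok(r),
            // `Elapsed` stringified, matching the new `Timeout(String)` variant.
            Err(e) => Err(Error::Timeout(format!("{e}"))),
        },
        _ = cancel.cancelled() => Err(Error::Cancelled),
    }
}
```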