storcon: reuse reqwest http client (#11327)

## Problem

- Part of https://github.com/neondatabase/neon/issues/11113
- Building a new `reqwest::Client` for every request is expensive
because it parses CA certs under the hood. It's noticeable in storcon's
flamegraph.

## Summary of changes
- Reuse one `reqwest::Client` for all API calls to avoid parsing CA
certificates every time.
This commit is contained in:
Dmitrii Kovalkov
2025-03-21 15:48:22 +04:00
committed by GitHub
parent 76088c16d2
commit 0f367cb665
20 changed files with 157 additions and 168 deletions

View File

@@ -56,6 +56,14 @@ impl PageServerNode {
Certificate::from_pem(&buf).expect("CA certificate should be valid")
});
let mut http_client = reqwest::Client::builder();
if let Some(ssl_ca_cert) = ssl_ca_cert {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client
.build()
.expect("Client constructs with no errors");
let endpoint = if env.storage_controller.use_https_pageserver_api {
format!(
"https://{}",
@@ -72,6 +80,7 @@ impl PageServerNode {
conf: conf.clone(),
env: env.clone(),
http_client: mgmt_api::Client::new(
http_client,
endpoint,
{
match conf.http_auth_type {
@@ -83,9 +92,7 @@ impl PageServerNode {
}
}
.as_deref(),
ssl_ca_cert,
)
.expect("Client constructs with no errors"),
),
}
}

View File

@@ -395,9 +395,15 @@ async fn main() -> anyhow::Result<()> {
None => None,
};
let mut http_client = reqwest::Client::builder();
if let Some(ssl_ca_cert) = ssl_ca_cert {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client.build()?;
let mut trimmed = cli.api.to_string();
trimmed.pop();
let vps_client = mgmt_api::Client::new(trimmed, cli.jwt.as_deref(), ssl_ca_cert)?;
let vps_client = mgmt_api::Client::new(http_client, trimmed, cli.jwt.as_deref());
match cli.command {
Command::NodeRegister {

View File

@@ -7,7 +7,7 @@ use http_utils::error::HttpErrorBody;
use pageserver_api::models::*;
use pageserver_api::shard::TenantShardId;
pub use reqwest::Body as ReqwestBody;
use reqwest::{Certificate, IntoUrl, Method, StatusCode, Url};
use reqwest::{IntoUrl, Method, StatusCode, Url};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -39,8 +39,8 @@ pub enum Error {
#[error("Cancelled")]
Cancelled,
#[error("create client: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
CreateClient(reqwest::Error),
#[error("request timed out: {0}")]
Timeout(String),
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -72,24 +72,7 @@ pub enum ForceAwaitLogicalSize {
}
impl Client {
pub fn new(
mgmt_api_endpoint: String,
jwt: Option<&str>,
ssl_ca_cert: Option<Certificate>,
) -> Result<Self> {
let mut http_client = reqwest::Client::builder();
if let Some(ssl_ca_cert) = ssl_ca_cert {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client.build().map_err(Error::CreateClient)?;
Ok(Self::from_client(http_client, mgmt_api_endpoint, jwt))
}
pub fn from_client(
client: reqwest::Client,
mgmt_api_endpoint: String,
jwt: Option<&str>,
) -> Self {
pub fn new(client: reqwest::Client, mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
Self {
mgmt_api_endpoint,
authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),

View File

@@ -34,10 +34,10 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
));
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(

View File

@@ -75,10 +75,10 @@ async fn main_impl(
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
));
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(

View File

@@ -123,10 +123,10 @@ async fn main_impl(
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
));
if let Some(engine_str) = &args.set_io_engine {
mgmt_api_client.put_io_engine(engine_str).await?;

View File

@@ -81,10 +81,10 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
));
if let Some(engine_str) = &args.set_io_engine {
mgmt_api_client.put_io_engine(engine_str).await?;

View File

@@ -38,10 +38,10 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
None, // TODO: support ssl_ca_file for https APIs in pagebench.
)?);
));
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(

View File

@@ -38,9 +38,8 @@ pub enum Error {
#[error("Cancelled")]
Cancelled,
/// Failed to create client.
#[error("create client: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
CreateClient(reqwest::Error),
#[error("request timed out: {0}")]
Timeout(String),
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -8,7 +8,6 @@ use futures::StreamExt;
use futures::stream::FuturesUnordered;
use pageserver_api::controller_api::{NodeAvailability, SkSchedulingPolicy};
use pageserver_api::models::PageserverUtilization;
use reqwest::Certificate;
use safekeeper_api::models::SafekeeperUtilization;
use safekeeper_client::mgmt_api;
use thiserror::Error;
@@ -27,8 +26,8 @@ struct HeartbeaterTask<Server, State> {
max_offline_interval: Duration,
max_warming_up_interval: Duration,
http_client: reqwest::Client,
jwt_token: Option<String>,
ssl_ca_cert: Option<Certificate>,
}
#[derive(Debug, Clone)]
@@ -76,8 +75,8 @@ where
HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
{
pub(crate) fn new(
http_client: reqwest::Client,
jwt_token: Option<String>,
ssl_ca_cert: Option<Certificate>,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
cancel: CancellationToken,
@@ -86,8 +85,8 @@ where
tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest<Server, State>>();
let mut heartbeater = HeartbeaterTask::new(
receiver,
http_client,
jwt_token,
ssl_ca_cert,
max_offline_interval,
max_warming_up_interval,
cancel,
@@ -122,8 +121,8 @@ where
{
fn new(
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
http_client: reqwest::Client,
jwt_token: Option<String>,
ssl_ca_cert: Option<Certificate>,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
cancel: CancellationToken,
@@ -134,8 +133,8 @@ where
state: HashMap::new(),
max_offline_interval,
max_warming_up_interval,
http_client,
jwt_token,
ssl_ca_cert,
}
}
async fn run(&mut self) {
@@ -178,7 +177,7 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
let mut heartbeat_futs = FuturesUnordered::new();
for (node_id, node) in &*pageservers {
heartbeat_futs.push({
let ssl_ca_cert = self.ssl_ca_cert.clone();
let http_client = self.http_client.clone();
let jwt_token = self.jwt_token.clone();
let cancel = self.cancel.clone();
@@ -193,8 +192,8 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
let response = node_clone
.with_client_retries(
|client| async move { client.get_utilization().await },
&http_client,
&jwt_token,
&ssl_ca_cert,
3,
3,
Duration::from_secs(1),
@@ -329,19 +328,19 @@ impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, Safe
continue;
}
heartbeat_futs.push({
let http_client = self.http_client.clone();
let jwt_token = self
.jwt_token
.as_ref()
.map(|t| SecretString::from(t.to_owned()));
let ssl_ca_cert = self.ssl_ca_cert.clone();
let cancel = self.cancel.clone();
async move {
let response = sk
.with_client_retries(
|client| async move { client.get_utilization().await },
&http_client,
&jwt_token,
&ssl_ca_cert,
3,
3,
Duration::from_secs(1),

View File

@@ -656,11 +656,10 @@ async fn handle_tenant_timeline_passthrough(
let _timer = latency.start_timer(labels.clone());
let client = mgmt_api::Client::new(
service.get_http_client().clone(),
node.base_url(),
service.get_config().pageserver_jwt_token.as_deref(),
service.get_config().ssl_ca_cert.clone(),
)
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
);
let resp = client.get_raw(path).await.map_err(|e|
// We return 503 here because if we can't successfully send a request to the pageserver,
// either we aren't available or the pageserver is unavailable.

View File

@@ -7,7 +7,7 @@ use pageserver_api::controller_api::{
};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use reqwest::{Certificate, StatusCode};
use reqwest::StatusCode;
use serde::Serialize;
use tokio_util::sync::CancellationToken;
use utils::backoff;
@@ -280,8 +280,8 @@ impl Node {
pub(crate) async fn with_client_retries<T, O, F>(
&self,
mut op: O,
http_client: &reqwest::Client,
jwt: &Option<String>,
ssl_ca_cert: &Option<Certificate>,
warn_threshold: u32,
max_retries: u32,
timeout: Duration,
@@ -300,24 +300,13 @@ impl Node {
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
ApiError(_, _) => true,
Cancelled => true,
CreateClient(_) => true,
Timeout(_) => false,
}
}
// TODO: refactor PageserverClient and with_client_retires (#11113).
let mut http_client = reqwest::ClientBuilder::new().timeout(timeout);
if let Some(ssl_ca_cert) = ssl_ca_cert.as_ref() {
http_client = http_client.add_root_certificate(ssl_ca_cert.clone())
}
let http_client = match http_client.build() {
Ok(http_client) => http_client,
Err(err) => return Some(Err(mgmt_api::Error::CreateClient(err))),
};
backoff::retry(
|| {
let client = PageserverClient::from_client(
let client = PageserverClient::new(
self.get_id(),
http_client.clone(),
self.base_url(),
@@ -326,11 +315,14 @@ impl Node {
let node_cancel_fut = self.cancel.cancelled();
let op_fut = op(client);
let op_fut = tokio::time::timeout(timeout, op(client));
async {
tokio::select! {
r = op_fut=> {r},
r = op_fut => match r {
Ok(r) => r,
Err(e) => Err(mgmt_api::Error::Timeout(format!("{e}"))),
},
_ = node_cancel_fut => {
Err(mgmt_api::Error::Cancelled)
}}

View File

@@ -8,7 +8,7 @@ use pageserver_api::models::{
use pageserver_api::shard::TenantShardId;
use pageserver_client::BlockUnblock;
use pageserver_client::mgmt_api::{Client, Result};
use reqwest::{Certificate, StatusCode};
use reqwest::StatusCode;
use utils::id::{NodeId, TenantId, TimelineId};
/// Thin wrapper around [`pageserver_client::mgmt_api::Client`]. It allows the storage
@@ -47,25 +47,13 @@ macro_rules! measured_request {
impl PageserverClient {
pub(crate) fn new(
node_id: NodeId,
mgmt_api_endpoint: String,
jwt: Option<&str>,
ssl_ca_cert: Option<Certificate>,
) -> Result<Self> {
Ok(Self {
inner: Client::new(mgmt_api_endpoint, jwt, ssl_ca_cert)?,
node_id_label: node_id.0.to_string(),
})
}
pub(crate) fn from_client(
node_id: NodeId,
raw_client: reqwest::Client,
mgmt_api_endpoint: String,
jwt: Option<&str>,
) -> Self {
Self {
inner: Client::from_client(raw_client, mgmt_api_endpoint, jwt),
inner: Client::new(raw_client, mgmt_api_endpoint, jwt),
node_id_label: node_id.0.to_string(),
}
}

View File

@@ -86,6 +86,9 @@ pub(super) struct Reconciler {
/// Access to persistent storage for updating generation numbers
pub(crate) persistence: Arc<Persistence>,
/// HTTP client with proper CA certs.
pub(crate) http_client: reqwest::Client,
}
pub(crate) struct ReconcilerConfigBuilder {
@@ -298,8 +301,8 @@ impl Reconciler {
.location_config(tenant_shard_id, config.clone(), flush_ms, lazy)
.await
},
&self.http_client,
&self.service_config.pageserver_jwt_token,
&self.service_config.ssl_ca_cert,
1,
3,
timeout,
@@ -419,10 +422,10 @@ impl Reconciler {
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
node.base_url(),
self.service_config.pageserver_jwt_token.as_deref(),
self.service_config.ssl_ca_cert.clone(),
)?;
);
client
.wait_lsn(
@@ -443,10 +446,10 @@ impl Reconciler {
) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
node.base_url(),
self.service_config.pageserver_jwt_token.as_deref(),
self.service_config.ssl_ca_cert.clone(),
)?;
);
let timelines = client.timeline_list(&tenant_shard_id).await?;
Ok(timelines
@@ -483,8 +486,8 @@ impl Reconciler {
)
.await
},
&self.http_client,
&self.service_config.pageserver_jwt_token,
&self.service_config.ssl_ca_cert,
1,
3,
request_download_timeout * 2,
@@ -778,8 +781,8 @@ impl Reconciler {
let observed_conf = match attached_node
.with_client_retries(
|client| async move { client.get_location_config(tenant_shard_id).await },
&self.http_client,
&self.service_config.pageserver_jwt_token,
&self.service_config.ssl_ca_cert,
1,
1,
Duration::from_secs(5),
@@ -1127,8 +1130,8 @@ impl Reconciler {
match origin
.with_client_retries(
|client| async move { client.get_location_config(tenant_shard_id).await },
&self.http_client,
&self.service_config.pageserver_jwt_token,
&self.service_config.ssl_ca_cert,
1,
3,
Duration::from_secs(5),

View File

@@ -1,7 +1,7 @@
use std::time::Duration;
use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
use reqwest::{Certificate, StatusCode};
use reqwest::StatusCode;
use safekeeper_client::mgmt_api;
use tokio_util::sync::CancellationToken;
use utils::backoff;
@@ -94,8 +94,8 @@ impl Safekeeper {
pub(crate) async fn with_client_retries<T, O, F>(
&self,
mut op: O,
http_client: &reqwest::Client,
jwt: &Option<SecretString>,
ssl_ca_cert: &Option<Certificate>,
warn_threshold: u32,
max_retries: u32,
timeout: Duration,
@@ -114,17 +114,10 @@ impl Safekeeper {
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
ApiError(_, _) => true,
Cancelled => true,
CreateClient(_) => true,
Timeout(_) => false,
}
}
// TODO: refactor SafekeeperClient and with_client_retires (#11113).
let mut http_client = reqwest::Client::builder().timeout(timeout);
if let Some(ssl_ca_cert) = ssl_ca_cert.as_ref() {
http_client = http_client.add_root_certificate(ssl_ca_cert.clone());
}
let http_client = http_client.build().map_err(mgmt_api::Error::CreateClient)?;
backoff::retry(
|| {
let client = SafekeeperClient::new(
@@ -136,11 +129,14 @@ impl Safekeeper {
let node_cancel_fut = self.cancel.cancelled();
let op_fut = op(client);
let op_fut = tokio::time::timeout(timeout, op(client));
async {
tokio::select! {
r = op_fut=> {r},
r = op_fut => match r {
Ok(r) => r,
Err(e) => Err(mgmt_api::Error::Timeout(format!("{e}"))),
},
_ = node_cancel_fut => {
Err(mgmt_api::Error::Cancelled)
}}

View File

@@ -267,7 +267,7 @@ fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError {
ApiError::Conflict(format!("{node} {status}: {status} {msg}"))
}
mgmt_api::Error::Cancelled => ApiError::ShuttingDown,
mgmt_api::Error::CreateClient(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
mgmt_api::Error::Timeout(e) => ApiError::Timeout(e.into()),
}
}
@@ -524,6 +524,9 @@ pub struct Service {
/// This waits for initial reconciliation with pageservers to complete. Until this barrier
/// passes, it isn't safe to do any actions that mutate tenants.
pub(crate) startup_complete: Barrier,
/// HTTP client with proper CA certs.
http_client: reqwest::Client,
}
impl From<ReconcileWaitError> for ApiError {
@@ -667,6 +670,10 @@ impl Service {
&self.config
}
pub fn get_http_client(&self) -> &reqwest::Client {
&self.http_client
}
/// Called once on startup, this function attempts to contact all pageservers to build an up-to-date
/// view of the world, and determine which pageservers are responsive.
#[instrument(skip_all)]
@@ -965,8 +972,8 @@ impl Service {
let response = node
.with_client_retries(
|client| async move { client.list_location_config().await },
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
1,
5,
timeout,
@@ -1064,20 +1071,12 @@ impl Service {
break;
}
let client = match PageserverClient::new(
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
self.config.ssl_ca_cert.clone(),
) {
Ok(client) => client,
Err(e) => {
tracing::error!(
"Failed to create client to detach unknown shard {tenant_shard_id} on pageserver {node_id}: {e}"
);
continue;
}
};
);
match client
.location_config(
tenant_shard_id,
@@ -1655,17 +1654,36 @@ impl Service {
let cancel = CancellationToken::new();
let reconcilers_cancel = cancel.child_token();
let mut http_client = reqwest::Client::builder();
// We intentionally disable the connection pool, so every request will create its own TCP connection.
// It's especially important for heartbeaters to notice more network problems.
//
// TODO: It makes sense to use this client only in heartbeaters and create a second one with
// connection pooling for everything else. But reqwest::Client may create a connection without
// ever using it (it uses hyper's Client under the hood):
// https://github.com/hyperium/hyper-util/blob/d51318df3461d40e5f5e5ca163cb3905ac960209/src/client/legacy/client.rs#L415
//
// Because of a bug in hyper0::Connection::graceful_shutdown such connections hang during
// graceful server shutdown: https://github.com/hyperium/hyper/issues/2730
//
// The bug has been fixed in hyper v1, so keep alive may be enabled only after we migrate to hyper1.
http_client = http_client.pool_max_idle_per_host(0);
if let Some(ssl_ca_cert) = &config.ssl_ca_cert {
http_client = http_client.add_root_certificate(ssl_ca_cert.clone());
}
let http_client = http_client.build()?;
let heartbeater_ps = Heartbeater::new(
http_client.clone(),
config.pageserver_jwt_token.clone(),
config.ssl_ca_cert.clone(),
config.max_offline_interval,
config.max_warming_up_interval,
cancel.clone(),
);
let heartbeater_sk = Heartbeater::new(
http_client.clone(),
config.safekeeper_jwt_token.clone(),
config.ssl_ca_cert.clone(),
config.max_offline_interval,
config.max_warming_up_interval,
cancel.clone(),
@@ -1708,6 +1726,7 @@ impl Service {
reconcilers_gate: Gate::default(),
tenant_op_locks: Default::default(),
node_op_locks: Default::default(),
http_client,
});
let result_task_this = this.clone();
@@ -2013,8 +2032,8 @@ impl Service {
let configs = match node
.with_client_retries(
|client| async move { client.list_location_config().await },
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
1,
5,
SHORT_RECONCILE_TIMEOUT,
@@ -2092,8 +2111,8 @@ impl Service {
.location_config(tenant_shard_id, config, None, false)
.await
},
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
1,
5,
SHORT_RECONCILE_TIMEOUT,
@@ -3235,11 +3254,10 @@ impl Service {
for tenant_shard_id in shard_ids {
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
self.config.ssl_ca_cert.clone(),
)
.map_err(|e| passthrough_api_error(&node, e))?;
);
tracing::info!("Doing time travel recovery for shard {tenant_shard_id}",);
@@ -3298,11 +3316,10 @@ impl Service {
for (tenant_shard_id, node) in targets {
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
self.config.ssl_ca_cert.clone(),
)
.map_err(|e| passthrough_api_error(&node, e))?;
);
futs.push(async move {
let result = client
.tenant_secondary_download(tenant_shard_id, wait)
@@ -3427,8 +3444,8 @@ impl Service {
.tenant_delete(TenantShardId::unsharded(tenant_id))
.await
},
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
1,
3,
RECONCILE_TIMEOUT,
@@ -3580,8 +3597,8 @@ impl Service {
async fn create_one(
tenant_shard_id: TenantShardId,
locations: ShardMutationLocations,
http_client: reqwest::Client,
jwt: Option<String>,
ssl_ca_cert: Option<Certificate>,
create_req: TimelineCreateRequest,
) -> Result<TimelineInfo, ApiError> {
let latest = locations.latest.node;
@@ -3594,8 +3611,7 @@ impl Service {
);
let client =
PageserverClient::new(latest.get_id(), latest.base_url(), jwt.as_deref(), ssl_ca_cert.clone())
.map_err(|e| passthrough_api_error(&latest, e))?;
PageserverClient::new(latest.get_id(), http_client.clone(), latest.base_url(), jwt.as_deref());
let timeline_info = client
.timeline_create(tenant_shard_id, &create_req)
@@ -3616,11 +3632,10 @@ impl Service {
let client = PageserverClient::new(
location.node.get_id(),
http_client.clone(),
location.node.base_url(),
jwt.as_deref(),
ssl_ca_cert.clone(),
)
.map_err(|e| passthrough_api_error(&location.node, e))?;
);
let res = client
.timeline_create(tenant_shard_id, &create_req)
@@ -3648,8 +3663,8 @@ impl Service {
let timeline_info = create_one(
shard_zero_tid,
shard_zero_locations,
self.http_client.clone(),
self.config.pageserver_jwt_token.clone(),
self.config.ssl_ca_cert.clone(),
create_req.clone(),
)
.await?;
@@ -3678,8 +3693,8 @@ impl Service {
Box::pin(create_one(
tenant_shard_id,
mutation_locations,
self.http_client.clone(),
jwt.clone(),
self.config.ssl_ca_cert.clone(),
create_req,
))
},
@@ -3762,16 +3777,15 @@ impl Service {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
node: Node,
http_client: reqwest::Client,
jwt: Option<String>,
ssl_ca_cert: Option<Certificate>,
req: TimelineArchivalConfigRequest,
) -> Result<(), ApiError> {
tracing::info!(
"Setting archival config of timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
);
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref(), ssl_ca_cert)
.map_err(|e| passthrough_api_error(&node, e))?;
let client = PageserverClient::new(node.get_id(), http_client, node.base_url(), jwt.as_deref());
client
.timeline_archival_config(tenant_shard_id, timeline_id, &req)
@@ -3793,8 +3807,8 @@ impl Service {
tenant_shard_id,
timeline_id,
node,
self.http_client.clone(),
self.config.pageserver_jwt_token.clone(),
self.config.ssl_ca_cert.clone(),
req.clone(),
))
})
@@ -3831,16 +3845,15 @@ impl Service {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
node: Node,
http_client: reqwest::Client,
jwt: Option<String>,
ssl_ca_cert: Option<Certificate>,
behavior: Option<DetachBehavior>,
) -> Result<(ShardNumber, models::detach_ancestor::AncestorDetached), ApiError> {
tracing::info!(
"Detaching timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
);
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref(), ssl_ca_cert)
.map_err(|e| passthrough_api_error(&node, e))?;
let client = PageserverClient::new(node.get_id(), http_client, node.base_url(), jwt.as_deref());
client
.timeline_detach_ancestor(tenant_shard_id, timeline_id, behavior)
@@ -3879,8 +3892,8 @@ impl Service {
tenant_shard_id,
timeline_id,
node,
self.http_client.clone(),
self.config.pageserver_jwt_token.clone(),
self.config.ssl_ca_cert.clone(),
behavior,
))
})
@@ -3933,17 +3946,16 @@ impl Service {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
node: Node,
http_client: reqwest::Client,
jwt: Option<String>,
ssl_ca_cert: Option<Certificate>,
dir: BlockUnblock,
) -> Result<(), ApiError> {
let client = PageserverClient::new(
node.get_id(),
http_client,
node.base_url(),
jwt.as_deref(),
ssl_ca_cert,
)
.map_err(|e| passthrough_api_error(&node, e))?;
);
client
.timeline_block_unblock_gc(tenant_shard_id, timeline_id, dir)
@@ -3962,8 +3974,8 @@ impl Service {
tenant_shard_id,
timeline_id,
node,
self.http_client.clone(),
self.config.pageserver_jwt_token.clone(),
self.config.ssl_ca_cert.clone(),
dir,
))
})
@@ -4091,8 +4103,8 @@ impl Service {
let r = node
.with_client_retries(
|client| op(tenant_shard_id, client),
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
warn_threshold,
max_retries,
timeout,
@@ -4316,15 +4328,14 @@ impl Service {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
node: Node,
http_client: reqwest::Client,
jwt: Option<String>,
ssl_ca_cert: Option<Certificate>,
) -> Result<StatusCode, ApiError> {
tracing::info!(
"Deleting timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
);
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref(), ssl_ca_cert)
.map_err(|e| passthrough_api_error(&node, e))?;
let client = PageserverClient::new(node.get_id(), http_client, node.base_url(), jwt.as_deref());
let res = client
.timeline_delete(tenant_shard_id, timeline_id)
.await;
@@ -4350,8 +4361,8 @@ impl Service {
tenant_shard_id,
timeline_id,
node,
self.http_client.clone(),
self.config.pageserver_jwt_token.clone(),
self.config.ssl_ca_cert.clone(),
))
})
.await?;
@@ -4373,8 +4384,8 @@ impl Service {
shard_zero_tid,
timeline_id,
shard_zero_locations.latest.node,
self.http_client.clone(),
self.config.pageserver_jwt_token.clone(),
self.config.ssl_ca_cert.clone(),
)
.await?;
Ok(shard_zero_status)
@@ -4809,8 +4820,8 @@ impl Service {
client.location_config(child_id, config, None, false).await
},
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
1,
10,
Duration::from_secs(5),
@@ -5412,11 +5423,10 @@ impl Service {
} = target;
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
self.config.ssl_ca_cert.clone(),
)
.map_err(|e| passthrough_api_error(node, e))?;
);
let response = client
.tenant_shard_split(
*parent_id,
@@ -5900,11 +5910,10 @@ impl Service {
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
self.config.ssl_ca_cert.clone(),
)
.map_err(|e| passthrough_api_error(&node, e))?;
);
let scan_result = client
.tenant_scan_remote_storage(tenant_id)
@@ -7138,6 +7147,7 @@ impl Service {
units,
gate_guard,
&self.reconcilers_cancel,
self.http_client.clone(),
)
}
@@ -7545,8 +7555,8 @@ impl Service {
match attached_node
.with_client_retries(
|client| async move { client.tenant_heatmap_upload(tenant_shard_id).await },
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
3,
10,
SHORT_RECONCILE_TIMEOUT,
@@ -7582,8 +7592,8 @@ impl Service {
)
.await
},
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
3,
10,
SHORT_RECONCILE_TIMEOUT,
@@ -7856,8 +7866,8 @@ impl Service {
futures.push(async move {
node.with_client_retries(
|client| async move { client.top_tenant_shards(request.clone()).await },
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
3,
3,
Duration::from_secs(5),
@@ -7976,8 +7986,8 @@ impl Service {
match node
.with_client_retries(
|client| async move { client.tenant_secondary_status(tenant_shard_id).await },
&self.http_client,
&self.config.pageserver_jwt_token,
&self.config.ssl_ca_cert,
1,
3,
Duration::from_millis(250),

View File

@@ -338,7 +338,6 @@ impl SafekeeperReconciler {
.safekeeper_jwt_token
.clone()
.map(SecretString::from);
let ssl_ca_cert = self.service.config.ssl_ca_cert.clone();
loop {
let res = req
.safekeeper
@@ -347,8 +346,8 @@ impl SafekeeperReconciler {
let closure = &closure;
async move { closure(client).await }
},
self.service.get_http_client(),
&jwt,
&ssl_ca_cert,
3,
10,
Duration::from_secs(10),

View File

@@ -78,8 +78,8 @@ impl Service {
for sk in timeline_persistence.sk_set.iter() {
let sk_id = NodeId(*sk as u64);
let safekeepers = safekeepers.clone();
let http_client = self.http_client.clone();
let jwt = jwt.clone();
let ssl_ca_cert = self.config.ssl_ca_cert.clone();
let req = req.clone();
joinset.spawn(async move {
// Unwrap is fine as we already would have returned error above
@@ -90,8 +90,8 @@ impl Service {
let req = req.clone();
async move { client.create_timeline(&req).await }
},
&http_client,
&jwt,
&ssl_ca_cert,
3,
3,
SK_CREATE_TIMELINE_RECONCILE_TIMEOUT,

View File

@@ -1588,6 +1588,7 @@ impl TenantShard {
units: ReconcileUnits,
gate_guard: GateGuard,
cancel: &CancellationToken,
http_client: reqwest::Client,
) -> Option<ReconcilerWaiter> {
// Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
// doing our sequence's work.
@@ -1633,6 +1634,7 @@ impl TenantShard {
cancel: reconciler_cancel.clone(),
persistence: persistence.clone(),
compute_notify_failure: false,
http_client,
};
let reconcile_seq = self.sequence;

View File

@@ -1599,6 +1599,12 @@ def test_storage_controller_heartbeats(
env.storage_controller.allowed_errors.append(
".*Call to node.*management API.*failed.*failpoint.*"
)
# The server starts listening to the socket before sending re-attach request,
# but it starts serving HTTP only when re-attach is completed.
# If re-attach is slow (last scenario), storcon's heartbeat requests will time out.
env.storage_controller.allowed_errors.append(
".*Call to node.*management API.*failed.* Timeout.*"
)
# Initially we have two online pageservers
nodes = env.storage_controller.node_list()