mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 01:42:55 +00:00
Merge branch 'hotfix/release/2025-03-14-storcon-optimizations' into rc/release/2025-03-14--storcon-hotfix
This commit is contained in:
@@ -3837,7 +3837,8 @@ impl Service {
|
||||
|
||||
/// Helper for concurrently calling a pageserver API on a number of shards, such as timeline creation.
|
||||
///
|
||||
/// On success, the returned vector contains exactly the same number of elements as the input `locations`.
|
||||
/// On success, the returned vector contains exactly the same number of elements as the input `locations`
|
||||
/// and returned element at index `i` is the result for `req_fn(op(locations[i])`.
|
||||
async fn tenant_for_shards<F, R>(
|
||||
&self,
|
||||
locations: Vec<(TenantShardId, Node)>,
|
||||
@@ -3853,18 +3854,23 @@ impl Service {
|
||||
let mut futs = FuturesUnordered::new();
|
||||
let mut results = Vec::with_capacity(locations.len());
|
||||
|
||||
for (tenant_shard_id, node) in locations {
|
||||
futs.push(req_fn(tenant_shard_id, node));
|
||||
for (idx, (tenant_shard_id, node)) in locations.into_iter().enumerate() {
|
||||
let fut = req_fn(tenant_shard_id, node);
|
||||
futs.push(async move { (idx, fut.await) });
|
||||
}
|
||||
|
||||
while let Some(r) = futs.next().await {
|
||||
results.push(r?);
|
||||
while let Some((idx, r)) = futs.next().await {
|
||||
results.push((idx, r?));
|
||||
}
|
||||
|
||||
Ok(results)
|
||||
results.sort_by_key(|(idx, _)| *idx);
|
||||
Ok(results.into_iter().map(|(_, r)| r).collect())
|
||||
}
|
||||
|
||||
/// Concurrently invoke a pageserver API call on many shards at once
|
||||
/// Concurrently invoke a pageserver API call on many shards at once.
|
||||
///
|
||||
/// The returned Vec has the same length as the `locations` Vec,
|
||||
/// and returned element at index `i` is the result for `op(locations[i])`.
|
||||
pub(crate) async fn tenant_for_shards_api<T, O, F>(
|
||||
&self,
|
||||
locations: Vec<(TenantShardId, Node)>,
|
||||
@@ -3881,26 +3887,28 @@ impl Service {
|
||||
let mut futs = FuturesUnordered::new();
|
||||
let mut results = Vec::with_capacity(locations.len());
|
||||
|
||||
for (tenant_shard_id, node) in locations {
|
||||
for (idx, (tenant_shard_id, node)) in locations.into_iter().enumerate() {
|
||||
futs.push(async move {
|
||||
node.with_client_retries(
|
||||
|client| op(tenant_shard_id, client),
|
||||
&self.config.pageserver_jwt_token,
|
||||
warn_threshold,
|
||||
max_retries,
|
||||
timeout,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
let r = node
|
||||
.with_client_retries(
|
||||
|client| op(tenant_shard_id, client),
|
||||
&self.config.pageserver_jwt_token,
|
||||
warn_threshold,
|
||||
max_retries,
|
||||
timeout,
|
||||
cancel,
|
||||
)
|
||||
.await;
|
||||
(idx, r)
|
||||
});
|
||||
}
|
||||
|
||||
while let Some(r) = futs.next().await {
|
||||
let r = r.unwrap_or(Err(mgmt_api::Error::Cancelled));
|
||||
results.push(r);
|
||||
while let Some((idx, r)) = futs.next().await {
|
||||
results.push((idx, r.unwrap_or(Err(mgmt_api::Error::Cancelled))));
|
||||
}
|
||||
|
||||
results
|
||||
results.sort_by_key(|(idx, _)| *idx);
|
||||
results.into_iter().map(|(_, r)| r).collect()
|
||||
}
|
||||
|
||||
/// Helper for safely working with the shards in a tenant remotely on pageservers, for example
|
||||
|
||||
Reference in New Issue
Block a user