diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 789f4da255..f33408a89b 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -4268,7 +4268,8 @@ impl Service { /// Helper for concurrently calling a pageserver API on a number of shards, such as timeline creation. /// - /// On success, the returned vector contains exactly the same number of elements as the input `locations`. + /// On success, the returned vector contains exactly the same number of elements as the input `locations` + /// and returned element at index `i` is the result for `req_fn(op(locations[i])`. async fn tenant_for_shards( &self, locations: Vec<(TenantShardId, Node)>, @@ -4284,18 +4285,23 @@ impl Service { let mut futs = FuturesUnordered::new(); let mut results = Vec::with_capacity(locations.len()); - for (tenant_shard_id, node) in locations { - futs.push(req_fn(tenant_shard_id, node)); + for (idx, (tenant_shard_id, node)) in locations.into_iter().enumerate() { + let fut = req_fn(tenant_shard_id, node); + futs.push(async move { (idx, fut.await) }); } - while let Some(r) = futs.next().await { - results.push(r?); + while let Some((idx, r)) = futs.next().await { + results.push((idx, r?)); } - Ok(results) + results.sort_by_key(|(idx, _)| *idx); + Ok(results.into_iter().map(|(_, r)| r).collect()) } - /// Concurrently invoke a pageserver API call on many shards at once + /// Concurrently invoke a pageserver API call on many shards at once. + /// + /// The returned Vec has the same length as the `locations` Vec, + /// and returned element at index `i` is the result for `op(locations[i])`. pub(crate) async fn tenant_for_shards_api( &self, locations: Vec<(TenantShardId, Node)>, @@ -4312,27 +4318,29 @@ impl Service { let mut futs = FuturesUnordered::new(); let mut results = Vec::with_capacity(locations.len()); - for (tenant_shard_id, node) in locations { + for (idx, (tenant_shard_id, node)) in locations.into_iter().enumerate() { futs.push(async move { - node.with_client_retries( - |client| op(tenant_shard_id, client), - &self.config.pageserver_jwt_token, - &self.config.ssl_ca_cert, - warn_threshold, - max_retries, - timeout, - cancel, - ) - .await + let r = node + .with_client_retries( + |client| op(tenant_shard_id, client), + &self.config.pageserver_jwt_token, + &self.config.ssl_ca_cert, + warn_threshold, + max_retries, + timeout, + cancel, + ) + .await; + (idx, r) }); } - while let Some(r) = futs.next().await { - let r = r.unwrap_or(Err(mgmt_api::Error::Cancelled)); - results.push(r); + while let Some((idx, r)) = futs.next().await { + results.push((idx, r.unwrap_or(Err(mgmt_api::Error::Cancelled)))); } - results + results.sort_by_key(|(idx, _)| *idx); + results.into_iter().map(|(_, r)| r).collect() } /// Helper for safely working with the shards in a tenant remotely on pageservers, for example