From 04370b48b30a2ba63a2e17fbc0405ec9e403ff50 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Fri, 14 Mar 2025 12:21:16 +0100
Subject: [PATCH] fix(storcon): optimization validation makes decisions based on wrong SecondaryProgress (#11229)

# Refs

- fixes https://github.com/neondatabase/neon/issues/11228

# Problem High-Level

When storcon validates whether a `ScheduleOptimizationAction` should be applied, it retrieves the `tenant_secondary_status` to determine whether a secondary is ready for the optimization.

When collecting results, it associates secondary statuses with the wrong optimization actions in the batch of optimizations that we're validating. The result is that we make the decision for shard/location X based on the SecondaryStatus of an arbitrary secondary location Y in the current batch of optimizations.

A possible symptom is an early cutover, as seen in this engineering investigation:

- https://github.com/neondatabase/cloud/issues/25734

# Problem Code-Level

This code in `optimize_all_validate`

https://github.com/neondatabase/neon/blob/97e2e27f682003bcc8ac1c9e625bc3675f394264/storage_controller/src/service.rs#L7012-L7029

zips `want_secondary_status` with the Vec returned from `tenant_for_shards_api`. However, the Vec returned from `tenant_for_shards_api` is not in input order: it uses `FuturesUnordered` internally, which yields results in completion order.

# Solution

Sort the Vec into input order before returning it. `optimize_all_validate` was the only caller affected by this problem. (A standalone sketch of the pitfall and of this fix follows at the end of this message.)

While at it, also future-proof the similar-looking function `tenant_for_shards`. None of its callers care about the order, but this type of function signature is easy to use incorrectly.

# Future Work

- Avoid the additional iteration, map, and allocation.
- Change the API to leverage `AsyncFn` (async closures).
- And/or invert `tenant_for_shards_api` into a `Future` ext trait / iterator adaptor; a sketch of this direction follows below.
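To make the ordering pitfall concrete, here is a minimal, self-contained sketch. It is illustrative only, not code from this patch: it assumes the `tokio` and `futures` crates as dependencies, and the sleep durations are hypothetical stand-ins for per-shard pageserver calls.

```rust
use std::time::Duration;

use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    // Hypothetical per-call latencies; input order is [30, 10, 20].
    let delays_ms = [30u64, 10, 20];

    // Buggy shape: `FuturesUnordered` yields results in *completion* order,
    // so zipping this Vec against the input order misassociates results.
    let mut futs = FuturesUnordered::new();
    for d in delays_ms {
        futs.push(async move {
            tokio::time::sleep(Duration::from_millis(d)).await;
            d
        });
    }
    let completion_order: Vec<u64> = futs.collect().await;
    println!("completion order: {completion_order:?}"); // typically [10, 20, 30]

    // Fixed shape (the pattern this patch applies): tag each future with its
    // input index, then sort by that index before returning the results.
    let mut futs = FuturesUnordered::new();
    for (idx, d) in delays_ms.into_iter().enumerate() {
        futs.push(async move {
            tokio::time::sleep(Duration::from_millis(d)).await;
            (idx, d)
        });
    }
    let mut results: Vec<(usize, u64)> = futs.collect().await;
    results.sort_by_key(|(idx, _)| *idx);
    let input_order: Vec<u64> = results.into_iter().map(|(_, r)| r).collect();
    assert_eq!(input_order, delays_ms); // results now line up with inputs
}
```

The tag-and-sort shape in the second half is the same one the diff below applies to `tenant_for_shards` and `tenant_for_shards_api`.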
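Likewise, a hypothetical sketch of the `AsyncFn` direction from # Future Work. The helper name `map_in_order` and its signature are invented for illustration (they are not an API in this repo), and `AsyncFn` plus async closures require Rust 1.85+:

```rust
use futures::stream::{FuturesUnordered, StreamExt};

/// Hypothetical helper: run `op` concurrently over all `inputs`, returning
/// results in input order so callers can never observe completion order.
async fn map_in_order<T, R>(inputs: Vec<T>, op: impl AsyncFn(T) -> R) -> Vec<R> {
    let op = &op; // shared borrow, so every concurrent future can call it
    let mut futs = FuturesUnordered::new();
    for (idx, input) in inputs.into_iter().enumerate() {
        futs.push(async move { (idx, op(input).await) });
    }
    let mut results: Vec<(usize, R)> = futs.collect().await;
    results.sort_by_key(|(idx, _)| *idx);
    results.into_iter().map(|(_, r)| r).collect()
}

#[tokio::main]
async fn main() {
    // Async closures keep call sites concise.
    let doubled = map_in_order(vec![1, 2, 3], async |x: i32| { x * 2 }).await;
    assert_eq!(doubled, vec![2, 4, 6]);
}
```

Baking the index tagging into one generic helper removes the possibility of a caller ever seeing completion order, which is the misuse this patch fixes.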
---
 storage_controller/src/service.rs | 52 ++++++++++++++++++-------------
 1 file changed, 30 insertions(+), 22 deletions(-)

diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 789f4da255..f33408a89b 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -4268,7 +4268,8 @@ impl Service {
 
     /// Helper for concurrently calling a pageserver API on a number of shards, such as timeline creation.
     ///
-    /// On success, the returned vector contains exactly the same number of elements as the input `locations`.
+    /// On success, the returned vector contains exactly the same number of elements as the input `locations`,
+    /// and the returned element at index `i` is the result of `req_fn(locations[i])`.
     async fn tenant_for_shards(
         &self,
         locations: Vec<(TenantShardId, Node)>,
@@ -4284,18 +4285,23 @@ impl Service {
         let mut futs = FuturesUnordered::new();
         let mut results = Vec::with_capacity(locations.len());
 
-        for (tenant_shard_id, node) in locations {
-            futs.push(req_fn(tenant_shard_id, node));
+        for (idx, (tenant_shard_id, node)) in locations.into_iter().enumerate() {
+            let fut = req_fn(tenant_shard_id, node);
+            futs.push(async move { (idx, fut.await) });
         }
 
-        while let Some(r) = futs.next().await {
-            results.push(r?);
+        while let Some((idx, r)) = futs.next().await {
+            results.push((idx, r?));
         }
 
-        Ok(results)
+        results.sort_by_key(|(idx, _)| *idx);
+        Ok(results.into_iter().map(|(_, r)| r).collect())
     }
 
-    /// Concurrently invoke a pageserver API call on many shards at once
+    /// Concurrently invoke a pageserver API call on many shards at once.
+    ///
+    /// The returned Vec has the same length as the `locations` Vec,
+    /// and the returned element at index `i` is the result of `op(locations[i])`.
     pub(crate) async fn tenant_for_shards_api(
         &self,
         locations: Vec<(TenantShardId, Node)>,
@@ -4312,27 +4318,29 @@ impl Service {
         let mut futs = FuturesUnordered::new();
         let mut results = Vec::with_capacity(locations.len());
 
-        for (tenant_shard_id, node) in locations {
+        for (idx, (tenant_shard_id, node)) in locations.into_iter().enumerate() {
             futs.push(async move {
-                node.with_client_retries(
-                    |client| op(tenant_shard_id, client),
-                    &self.config.pageserver_jwt_token,
-                    &self.config.ssl_ca_cert,
-                    warn_threshold,
-                    max_retries,
-                    timeout,
-                    cancel,
-                )
-                .await
+                let r = node
+                    .with_client_retries(
+                        |client| op(tenant_shard_id, client),
+                        &self.config.pageserver_jwt_token,
+                        &self.config.ssl_ca_cert,
+                        warn_threshold,
+                        max_retries,
+                        timeout,
+                        cancel,
+                    )
+                    .await;
+                (idx, r)
             });
         }
 
-        while let Some(r) = futs.next().await {
-            let r = r.unwrap_or(Err(mgmt_api::Error::Cancelled));
-            results.push(r);
+        while let Some((idx, r)) = futs.next().await {
+            results.push((idx, r.unwrap_or(Err(mgmt_api::Error::Cancelled))));
         }
 
-        results
+        results.sort_by_key(|(idx, _)| *idx);
+        results.into_iter().map(|(_, r)| r).collect()
     }
 
     /// Helper for safely working with the shards in a tenant remotely on pageservers, for example