From 961835add6ff52095a3a1ca47eed7645b428d062 Mon Sep 17 00:00:00 2001
From: Dmitrii Kovalkov
Date: Wed, 23 Jul 2025 13:38:32 +0400
Subject: [PATCH] storcon: do not retry sk migration ops if the quorum is reached

---
 .../src/service/safekeeper_service.rs         | 38 +++++++++++++++----
 test_runner/fixtures/neon_fixtures.py         | 11 ++++++
 .../regress/test_safekeeper_migration.py      | 30 +++++++++++++++
 3 files changed, 72 insertions(+), 7 deletions(-)

diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs
index bc77a1a6b8..dede1e5904 100644
--- a/storage_controller/src/service/safekeeper_service.rs
+++ b/storage_controller/src/service/safekeeper_service.rs
@@ -123,12 +123,17 @@ impl Service {
     /// Perform an operation on a list of safekeepers in parallel with retries.
     ///
+    /// If desired_success_count is set, the remaining operations will be cancelled
+    /// when the desired number of successful responses is reached.
+    ///
     /// Return the results of the operation on each safekeeper in the input order.
     async fn tenant_timeline_safekeeper_op<T, O, F>(
         &self,
         safekeepers: &[Safekeeper],
         op: O,
+        max_retries: u32,
         timeout: Duration,
+        desired_success_count: Option<usize>,
     ) -> Result<Vec<mgmt_api::Result<T>>, ApiError>
     where
         O: FnMut(SafekeeperClient) -> F + Send + 'static,
@@ -136,6 +141,7 @@ impl Service {
         F: std::future::Future<Output = mgmt_api::Result<T>> + Send + 'static,
         T: Sync + Send + 'static,
     {
+        let warn_threshold = std::cmp::min(3, max_retries);
         let jwt = self
             .config
             .safekeeper_jwt_token
@@ -143,23 +149,26 @@ impl Service {
             .map(SecretString::from);

         let mut joinset = JoinSet::new();
+        let cancel = CancellationToken::new();
+
         for (idx, sk) in safekeepers.iter().enumerate() {
             let sk = sk.clone();
             let http_client = self.http_client.clone();
             let jwt = jwt.clone();
             let op = op.clone();
+            let cancel = cancel.clone();
             joinset.spawn(async move {
                 let res = sk
                     .with_client_retries(
                         op,
                         &http_client,
                         &jwt,
-                        3,
-                        3,
+                        warn_threshold,
+                        max_retries,
                         // TODO(diko): This is a wrong timeout.
                         // It should be scaled to the retry count.
                         timeout,
-                        &CancellationToken::new(),
+                        &cancel,
                     )
                     .await;
                 (idx, res)
@@ -184,6 +193,7 @@ impl Service {
         // Wait until all tasks finish or timeout is hit, whichever occurs
         // first.
         let mut result_count = 0;
+        let mut success_count = 0;
         loop {
             if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await
             {
@@ -198,6 +208,12 @@ impl Service {
                    // Only print errors, as there is no Debug trait for T.
                    res.as_ref().map(|_| ()),
                );
+                if res.is_ok() {
+                    success_count += 1;
+                    if desired_success_count == Some(success_count) {
+                        cancel.cancel();
+                    }
+                }
                results[idx] = res;
                result_count += 1;
            }
@@ -247,14 +263,14 @@ impl Service {
            );
        }

+        let quorum_size = target_sk_count / 2 + 1;
+        let max_retries = 3;
+
         let results = self
-            .tenant_timeline_safekeeper_op(safekeepers, op, timeout)
+            .tenant_timeline_safekeeper_op(safekeepers, op, max_retries, timeout, Some(quorum_size))
             .await?;

         // Now check if quorum was reached in results.
-
-        let quorum_size = target_sk_count / 2 + 1;
-
         let success_count = results.iter().filter(|res| res.is_ok()).count();
         if success_count < quorum_size {
             // Failure
@@ -1020,6 +1036,7 @@ impl Service {
         };

         const SK_PULL_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
+        let max_retries = 3;

         let responses = self
             .tenant_timeline_safekeeper_op(
@@ -1028,7 +1045,9 @@ impl Service {
                |client| {
                    let req = req.clone();
                    async move { client.pull_timeline(&req).await }
                },
+                max_retries,
                SK_PULL_TIMELINE_RECONCILE_TIMEOUT,
+                None,
            )
            .await?;
@@ -1066,6 +1085,9 @@ impl Service {
         };

         const SK_EXCLUDE_TIMELINE_TIMEOUT: Duration = Duration::from_secs(30);
+        // Do not retry failed requests to speed up the finishing phase.
+        // They will be retried in the reconciler.
+        let max_retries = 0;

         let results = self
             .tenant_timeline_safekeeper_op(
@@ -1074,7 +1096,9 @@ impl Service {
                |client| {
                    let req = req.clone();
                    async move { client.exclude_timeline(tenant_id, timeline_id, &req).await }
                },
+                max_retries,
                SK_EXCLUDE_TIMELINE_TIMEOUT,
+                None,
            )
            .await?;
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 88919fe888..774074cfd1 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -1540,6 +1540,17 @@ class NeonEnv:

         raise RuntimeError(f"Pageserver with ID {id} not found")

+    def get_safekeeper(self, id: int) -> Safekeeper:
+        """
+        Look up a safekeeper by its ID.
+        """
+
+        for sk in self.safekeepers:
+            if sk.id == id:
+                return sk
+
+        raise RuntimeError(f"Safekeeper with ID {id} not found")
+
     def get_tenant_pageserver(self, tenant_id: TenantId | TenantShardId):
         """
         Get the NeonPageserver where this tenant shard is currently attached, according
diff --git a/test_runner/regress/test_safekeeper_migration.py b/test_runner/regress/test_safekeeper_migration.py
index 371bec0c62..af0457270f 100644
--- a/test_runner/regress/test_safekeeper_migration.py
+++ b/test_runner/regress/test_safekeeper_migration.py
@@ -196,3 +196,33 @@ def test_safekeeper_migration_common_set_failpoints(neon_env_builder: NeonEnvBui
     assert (
         f"timeline {env.initial_tenant}/{env.initial_timeline} deleted" in exc.value.response.text
     )
+
+
+def test_migrate_from_unavailable_sk(neon_env_builder: NeonEnvBuilder):
+    """
+    Test that we can migrate from an unavailable safekeeper
+    if the quorum is still alive.
+    """
+    neon_env_builder.num_safekeepers = 4
+    neon_env_builder.storage_controller_config = {
+        "timelines_onto_safekeepers": True,
+        "timeline_safekeeper_count": 3,
+    }
+    env = neon_env_builder.init_start()
+    env.pageserver.allowed_errors.extend(ALLOWED_PAGESERVER_ERRORS)
+
+    mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
+    assert len(mconf["sk_set"]) == 3
+
+    another_sk = [sk.id for sk in env.safekeepers if sk.id not in mconf["sk_set"]][0]
+
+    unavailable_sk = mconf["sk_set"][0]
+    env.get_safekeeper(unavailable_sk).stop()
+
+    new_sk_set = mconf["sk_set"][1:] + [another_sk]
+
+    env.storage_controller.migrate_safekeepers(env.initial_tenant, env.initial_timeline, new_sk_set)
+
+    mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
+    assert mconf["sk_set"] == new_sk_set
+    assert mconf["generation"] == 3
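
Note: the early-exit behaviour the patch adds to tenant_timeline_safekeeper_op boils down to a small pattern: fan the request out with a JoinSet, count successes as tasks finish, and trip a shared CancellationToken once the desired number of successes (the quorum) is reached so the remaining requests stop retrying. The sketch below is a standalone illustration of that pattern under assumed names and types, not the storage controller code: run_with_quorum, the u64 node ids, the String error type, and the Cargo dependencies (tokio with "macros"/"rt-multi-thread"/"time", tokio-util 0.7) are all made up for the example.

// Standalone sketch of quorum-based early cancellation; everything here is
// illustrative and not part of the patch above.
use std::time::Duration;

use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;

/// Run `op` against every node in parallel and collect the results in input
/// order. If `desired_success_count` is set, cancel the remaining requests as
/// soon as that many of them have succeeded.
async fn run_with_quorum<T, F, Fut>(
    nodes: &[u64],
    op: F,
    desired_success_count: Option<usize>,
) -> Vec<Option<Result<T, String>>>
where
    F: Fn(u64, CancellationToken) -> Fut,
    Fut: std::future::Future<Output = Result<T, String>> + Send + 'static,
    T: Send + 'static,
{
    let cancel = CancellationToken::new();
    let mut joinset = JoinSet::new();

    for (idx, node) in nodes.iter().enumerate() {
        // Each operation gets a clone of the shared token so it can observe
        // cancellation (e.g. stop retrying) once the quorum is reached.
        let fut = op(*node, cancel.clone());
        joinset.spawn(async move { (idx, fut.await) });
    }

    // One slot per node; `None` marks a task that never produced a result.
    let mut results: Vec<Option<Result<T, String>>> = Vec::new();
    results.resize_with(nodes.len(), || None);

    let mut success_count = 0;
    while let Some(joined) = joinset.join_next().await {
        let Ok((idx, res)) = joined else { continue };
        if res.is_ok() {
            success_count += 1;
            if desired_success_count == Some(success_count) {
                // Quorum reached: tell the still-running operations to give up.
                cancel.cancel();
            }
        }
        results[idx] = Some(res);
    }

    results
}

#[tokio::main]
async fn main() {
    // Node 4 stands in for an unavailable safekeeper: it only returns once it
    // is cancelled. With a desired success count of 2, the call completes once
    // two healthy nodes have answered instead of hanging on node 4.
    let results = run_with_quorum(
        &[1, 2, 3, 4],
        |node, cancel| async move {
            if node == 4 {
                cancel.cancelled().await;
                Err(format!("node {node}: cancelled"))
            } else {
                tokio::time::sleep(Duration::from_millis(10 * node)).await;
                Ok(format!("node {node}: ok"))
            }
        },
        Some(2),
    )
    .await;

    for (idx, res) in results.iter().enumerate() {
        println!("node {}: {:?}", idx + 1, res);
    }
}

Cancelling through a shared token rather than aborting the JoinSet lets in-flight requests wind down cleanly, which mirrors how the patch threads &cancel into with_client_retries so a retry loop can observe the token between attempts.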