storcon: do not retry sk migration ops if the quorum is reached

Author: Dmitrii Kovalkov
Date:   2025-07-23 13:38:32 +04:00
Parent: fc242afcc2
Commit: 961835add6

3 changed files with 72 additions and 7 deletions


@@ -123,12 +123,17 @@ impl Service {
     /// Perform an operation on a list of safekeepers in parallel with retries.
     ///
+    /// If desired_success_count is set, the remaining operations will be cancelled
+    /// when the desired number of successful responses is reached.
+    ///
     /// Return the results of the operation on each safekeeper in the input order.
     async fn tenant_timeline_safekeeper_op<T, O, F>(
         &self,
         safekeepers: &[Safekeeper],
         op: O,
+        max_retries: u32,
         timeout: Duration,
+        desired_success_count: Option<usize>,
     ) -> Result<Vec<mgmt_api::Result<T>>, ApiError>
     where
         O: FnMut(SafekeeperClient) -> F + Send + 'static,
@@ -136,6 +141,7 @@ impl Service {
         F: std::future::Future<Output = mgmt_api::Result<T>> + Send + 'static,
         T: Sync + Send + 'static,
     {
+        let warn_threshold = std::cmp::min(3, max_retries);
         let jwt = self
             .config
             .safekeeper_jwt_token
@@ -143,23 +149,26 @@ impl Service {
             .map(SecretString::from);
         let mut joinset = JoinSet::new();
+        let cancel = CancellationToken::new();
         for (idx, sk) in safekeepers.iter().enumerate() {
             let sk = sk.clone();
             let http_client = self.http_client.clone();
             let jwt = jwt.clone();
             let op = op.clone();
+            let cancel = cancel.clone();
             joinset.spawn(async move {
                 let res = sk
                     .with_client_retries(
                         op,
                         &http_client,
                         &jwt,
-                        3,
-                        3,
+                        warn_threshold,
+                        max_retries,
+                        // TODO(diko): This is a wrong timeout.
+                        // It should be scaled to the retry count.
                         timeout,
-                        &CancellationToken::new(),
+                        &cancel,
                     )
                     .await;
                 (idx, res)
@@ -184,6 +193,7 @@ impl Service {
         // Wait until all tasks finish or timeout is hit, whichever occurs
         // first.
         let mut result_count = 0;
+        let mut success_count = 0;
         loop {
             if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await
             {
@@ -198,6 +208,12 @@ impl Service {
                     // Only print errors, as there is no Debug trait for T.
                     res.as_ref().map(|_| ()),
                 );
+                if res.is_ok() {
+                    success_count += 1;
+                    if desired_success_count == Some(success_count) {
+                        cancel.cancel();
+                    }
+                }
                 results[idx] = res;
                 result_count += 1;
             }
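The hunk above is the heart of the change: every per-safekeeper task shares one CancellationToken, and the collector cancels it as soon as desired_success_count successes have been observed, so requests to the remaining safekeepers are abandoned instead of retried (which is also why the shared token is passed to with_client_retries above instead of a fresh CancellationToken::new()). Below is a minimal, self-contained sketch of that pattern, illustrative only and not the storcon code, assuming the tokio and tokio-util crates:

    use std::time::Duration;

    use tokio::task::JoinSet;
    use tokio_util::sync::CancellationToken;

    #[tokio::main]
    async fn main() {
        let cancel = CancellationToken::new();
        let mut joinset = JoinSet::new();
        for idx in 0..5usize {
            let cancel = cancel.clone();
            joinset.spawn(async move {
                tokio::select! {
                    // Simulated request: finishes after an index-dependent delay.
                    _ = tokio::time::sleep(Duration::from_millis(50 * (idx as u64 + 1))) => (idx, true),
                    // Give up early once the desired success count has been reached.
                    _ = cancel.cancelled() => (idx, false),
                }
            });
        }

        let desired_success_count = Some(2);
        let mut success_count = 0;
        while let Some(joined) = joinset.join_next().await {
            let (idx, ok) = joined.expect("task panicked");
            if ok {
                success_count += 1;
                if desired_success_count == Some(success_count) {
                    // Enough successes: cancel the remaining in-flight tasks.
                    cancel.cancel();
                }
            }
            println!("task {idx}: ok={ok}, successes so far: {success_count}");
        }
    }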
@@ -247,14 +263,14 @@ impl Service {
             );
         }

+        let quorum_size = target_sk_count / 2 + 1;
+        let max_retries = 3;
         let results = self
-            .tenant_timeline_safekeeper_op(safekeepers, op, timeout)
+            .tenant_timeline_safekeeper_op(safekeepers, op, max_retries, timeout, Some(quorum_size))
             .await?;

         // Now check if quorum was reached in results.
-        let quorum_size = target_sk_count / 2 + 1;
         let success_count = results.iter().filter(|res| res.is_ok()).count();
         if success_count < quorum_size {
             // Failure
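For reference, quorum_size in the hunk above is a simple majority of the target safekeeper set, so with timeline_safekeeper_count = 3 (as in the test below) only 2 successful responses are needed before the remaining requests are cancelled. A standalone illustration of the arithmetic, not project code:

    fn quorum_size(target_sk_count: usize) -> usize {
        // Simple majority: more than half of the target safekeeper set.
        target_sk_count / 2 + 1
    }

    fn main() {
        for n in [1, 3, 4, 5] {
            // e.g. 3 safekeepers -> quorum of 2, 5 -> quorum of 3.
            println!("{n} safekeepers -> quorum of {}", quorum_size(n));
        }
    }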
@@ -1020,6 +1036,7 @@ impl Service {
         };

         const SK_PULL_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
+        let max_retries = 3;
         let responses = self
             .tenant_timeline_safekeeper_op(
@@ -1028,7 +1045,9 @@ impl Service {
                     let req = req.clone();
                     async move { client.pull_timeline(&req).await }
                 },
+                max_retries,
                 SK_PULL_TIMELINE_RECONCILE_TIMEOUT,
+                None,
             )
             .await?;
@@ -1066,6 +1085,9 @@ impl Service {
         };

         const SK_EXCLUDE_TIMELINE_TIMEOUT: Duration = Duration::from_secs(30);
+        // Do not retry failed requests to speed up the finishing phase.
+        // They will be retried in the reconciler.
+        let max_retries = 0;
         let results = self
             .tenant_timeline_safekeeper_op(
@@ -1074,7 +1096,9 @@ impl Service {
                     let req = req.clone();
                     async move { client.exclude_timeline(tenant_id, timeline_id, &req).await }
                 },
+                max_retries,
                 SK_EXCLUDE_TIMELINE_TIMEOUT,
+                None,
             )
             .await?;


@@ -1540,6 +1540,17 @@ class NeonEnv:
         raise RuntimeError(f"Pageserver with ID {id} not found")

+    def get_safekeeper(self, id: int) -> Safekeeper:
+        """
+        Look up a safekeeper by its ID.
+        """
+        for sk in self.safekeepers:
+            if sk.id == id:
+                return sk
+        raise RuntimeError(f"Safekeeper with ID {id} not found")
+
     def get_tenant_pageserver(self, tenant_id: TenantId | TenantShardId):
         """
         Get the NeonPageserver where this tenant shard is currently attached, according


@@ -196,3 +196,33 @@ def test_safekeeper_migration_common_set_failpoints(neon_env_builder: NeonEnvBui
     assert (
         f"timeline {env.initial_tenant}/{env.initial_timeline} deleted" in exc.value.response.text
     )
+
+
+def test_migrate_from_unavailable_sk(neon_env_builder: NeonEnvBuilder):
+    """
+    Test that we can migrate from an unavailable safekeeper
+    if the quorum is still alive.
+    """
+    neon_env_builder.num_safekeepers = 4
+    neon_env_builder.storage_controller_config = {
+        "timelines_onto_safekeepers": True,
+        "timeline_safekeeper_count": 3,
+    }
+
+    env = neon_env_builder.init_start()
+    env.pageserver.allowed_errors.extend(ALLOWED_PAGESERVER_ERRORS)
+
+    mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
+    assert len(mconf["sk_set"]) == 3
+
+    another_sk = [sk.id for sk in env.safekeepers if sk.id not in mconf["sk_set"]][0]
+    unavailable_sk = mconf["sk_set"][0]
+    env.get_safekeeper(unavailable_sk).stop()
+
+    new_sk_set = mconf["sk_set"][1:] + [another_sk]
+    env.storage_controller.migrate_safekeepers(env.initial_tenant, env.initial_timeline, new_sk_set)
+
+    mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
+    assert mconf["sk_set"] == new_sk_set
+    assert mconf["generation"] == 3