Compare commits


3 Commits

Author             SHA1         Message                                                            Date
Dmitrii Kovalkov   c53b4545c8   rename cancel -> cancel_new_retries                                2025-07-29 12:11:47 +04:00
Dmitrii Kovalkov   e48ac9ed76   Merge branch 'main' into diko/safekeeper_migrate_from_down_sk      2025-07-23 15:15:17 +04:00
Dmitrii Kovalkov   961835add6   storcon: do not retry sk migration ops if the quorum is reached    2025-07-23 13:38:32 +04:00
4 changed files with 68 additions and 11 deletions

View File

@@ -351,7 +351,7 @@ impl Node {
warn_threshold: u32,
max_retries: u32,
timeout: Duration,
- cancel: &CancellationToken,
+ cancel_new_retries: &CancellationToken,
) -> Option<mgmt_api::Result<T>>
where
O: FnMut(PageserverClient) -> F,
@@ -402,7 +402,7 @@ impl Node {
self.id,
self.base_url(),
),
- cancel,
+ cancel_new_retries,
)
.await
}

View File

@@ -110,7 +110,7 @@ impl Safekeeper {
warn_threshold: u32,
max_retries: u32,
timeout: Duration,
- cancel: &CancellationToken,
+ cancel_new_retries: &CancellationToken,
) -> mgmt_api::Result<T>
where
O: FnMut(SafekeeperClient) -> F,
@@ -161,7 +161,7 @@ impl Safekeeper {
self.id,
self.base_url(),
),
- cancel,
+ cancel_new_retries,
)
.await
.unwrap_or(Err(mgmt_api::Error::Cancelled))

View File

@@ -123,12 +123,17 @@ impl Service {
/// Perform an operation on a list of safekeepers in parallel with retries.
///
+ /// If desired_success_count is set, the remaining operations will be cancelled
+ /// when the desired number of successful responses is reached.
+ ///
/// Return the results of the operation on each safekeeper in the input order.
async fn tenant_timeline_safekeeper_op<T, O, F>(
&self,
safekeepers: &[Safekeeper],
op: O,
+ max_retries: u32,
timeout: Duration,
+ desired_success_count: Option<usize>,
) -> Result<Vec<mgmt_api::Result<T>>, ApiError>
where
O: FnMut(SafekeeperClient) -> F + Send + 'static,
@@ -136,6 +141,7 @@ impl Service {
F: std::future::Future<Output = mgmt_api::Result<T>> + Send + 'static,
T: Sync + Send + 'static,
{
+ let warn_threshold = std::cmp::min(3, max_retries);
let jwt = self
.config
.safekeeper_jwt_token
@@ -143,23 +149,26 @@ impl Service {
.map(SecretString::from);
let mut joinset = JoinSet::new();
+ let cancel_new_retries = CancellationToken::new();
for (idx, sk) in safekeepers.iter().enumerate() {
let sk = sk.clone();
let http_client = self.http_client.clone();
let jwt = jwt.clone();
let op = op.clone();
+ let cancel_new_retries = cancel_new_retries.clone();
joinset.spawn(async move {
let res = sk
.with_client_retries(
op,
&http_client,
&jwt,
- 3,
- 3,
+ warn_threshold,
+ max_retries,
+ // TODO(diko): This is a wrong timeout.
+ // It should be scaled to the retry count.
timeout,
- &CancellationToken::new(),
+ &cancel_new_retries,
)
.await;
(idx, res)
@@ -184,6 +193,7 @@ impl Service {
// Wait until all tasks finish or timeout is hit, whichever occurs
// first.
let mut result_count = 0;
+ let mut success_count = 0;
loop {
if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await
{
@@ -198,6 +208,15 @@ impl Service {
// Only print errors, as there is no Debug trait for T.
res.as_ref().map(|_| ()),
);
+ if res.is_ok() {
+     success_count += 1;
+     if desired_success_count == Some(success_count) {
+         // We reached the desired number of successful responses, cancel new retries for
+         // the remaining safekeepers.
+         // It does not cancel already started requests, we will still wait for them.
+         cancel_new_retries.cancel();
+     }
+ }
results[idx] = res;
result_count += 1;
}
@@ -247,14 +266,14 @@ impl Service {
);
}
+ let quorum_size = target_sk_count / 2 + 1;
+ let max_retries = 3;
let results = self
- .tenant_timeline_safekeeper_op(safekeepers, op, timeout)
+ .tenant_timeline_safekeeper_op(safekeepers, op, max_retries, timeout, Some(quorum_size))
.await?;
// Now check if quorum was reached in results.
- let quorum_size = target_sk_count / 2 + 1;
let success_count = results.iter().filter(|res| res.is_ok()).count();
if success_count < quorum_size {
// Failure
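For reference on the arithmetic: quorum_size is a strict majority of the target safekeeper set, computed with integer division, so with a 3-safekeeper set (as in the new test at the bottom of this diff) the call above may finish as soon as 2 safekeepers respond successfully. A tiny illustrative snippet, not taken from the codebase:

fn quorum_size(target_sk_count: usize) -> usize {
    // Integer division: smallest count that is a strict majority.
    target_sk_count / 2 + 1
}

fn main() {
    assert_eq!(quorum_size(3), 2); // tolerates 1 unavailable safekeeper
    assert_eq!(quorum_size(4), 3); // still tolerates only 1
    assert_eq!(quorum_size(5), 3); // tolerates 2
}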
@@ -1018,6 +1037,7 @@ impl Service {
};
const SK_PULL_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
+ let max_retries = 3;
let responses = self
.tenant_timeline_safekeeper_op(
@@ -1026,7 +1046,9 @@ impl Service {
let req = req.clone();
async move { client.pull_timeline(&req).await }
},
+ max_retries,
SK_PULL_TIMELINE_RECONCILE_TIMEOUT,
+ None,
)
.await?;
@@ -1064,6 +1086,9 @@ impl Service {
};
const SK_EXCLUDE_TIMELINE_TIMEOUT: Duration = Duration::from_secs(30);
+ // Do not retry failed requests to speed up the finishing phase.
+ // They will be retried in the reconciler.
+ let max_retries = 0;
let results = self
.tenant_timeline_safekeeper_op(
@@ -1072,7 +1097,9 @@ impl Service {
let req = req.clone();
async move { client.exclude_timeline(tenant_id, timeline_id, &req).await }
},
+ max_retries,
SK_EXCLUDE_TIMELINE_TIMEOUT,
+ None,
)
.await?;

View File

@@ -286,3 +286,33 @@ def test_sk_generation_aware_tombstones(neon_env_builder: NeonEnvBuilder):
assert re.match(r".*Timeline .* deleted.*", exc.value.response.text)
# The timeline should remain deleted.
expect_deleted(second_sk)
+
+
+ def test_migrate_from_unavailable_sk(neon_env_builder: NeonEnvBuilder):
+     """
+     Test that we can migrate from an unavailable safekeeper
+     if the quorum is still alive.
+     """
+     neon_env_builder.num_safekeepers = 4
+     neon_env_builder.storage_controller_config = {
+         "timelines_onto_safekeepers": True,
+         "timeline_safekeeper_count": 3,
+     }
+     env = neon_env_builder.init_start()
+     env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
+
+     mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
+     assert len(mconf["sk_set"]) == 3
+
+     another_sk = [sk.id for sk in env.safekeepers if sk.id not in mconf["sk_set"]][0]
+     unavailable_sk = mconf["sk_set"][0]
+     env.get_safekeeper(unavailable_sk).stop()
+
+     new_sk_set = mconf["sk_set"][1:] + [another_sk]
+     env.storage_controller.migrate_safekeepers(env.initial_tenant, env.initial_timeline, new_sk_set)
+
+     mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
+     assert mconf["sk_set"] == new_sk_set
+     assert mconf["generation"] == 3