diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 5b1cdb9805..1df837e1e6 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -35,6 +35,9 @@ use tokio::{spawn, sync::watch, task::JoinHandle, time}; use tokio_util::sync::CancellationToken; use tracing::{Instrument, debug, error, info, instrument, warn}; use url::Url; +use utils::backoff::{ + DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff_duration, +}; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; use utils::measured_stream::MeasuredReader; @@ -1557,6 +1560,41 @@ impl ComputeNode { Ok(lsn) } + fn sync_safekeepers_with_retries(&self, storage_auth_token: Option) -> Result { + let max_retries = 5; + let mut attempts = 0; + loop { + let result = self.sync_safekeepers(storage_auth_token.clone()); + match &result { + Ok(_) => { + if attempts > 0 { + tracing::info!("sync_safekeepers succeeded after {attempts} retries"); + } + return result; + } + Err(e) if attempts < max_retries => { + tracing::info!( + "sync_safekeepers failed, will retry (attempt {attempts}): {e:#}" + ); + } + Err(err) => { + tracing::warn!( + "sync_safekeepers still failed after {attempts} retries, giving up: {err:?}" + ); + return result; + } + } + // sleep and retry + let backoff = exponential_backoff_duration( + attempts, + DEFAULT_BASE_BACKOFF_SECONDS, + DEFAULT_MAX_BACKOFF_SECONDS, + ); + std::thread::sleep(backoff); + attempts += 1; + } + } + /// Do all the preparations like PGDATA directory creation, configuration, /// safekeepers sync, basebackup, etc. #[instrument(skip_all)] @@ -1592,7 +1630,7 @@ impl ComputeNode { lsn } else { info!("starting safekeepers syncing"); - self.sync_safekeepers(pspec.storage_auth_token.clone()) + self.sync_safekeepers_with_retries(pspec.storage_auth_token.clone()) .with_context(|| "failed to sync safekeepers")? }; info!("safekeepers synced at LSN {}", lsn); diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index 43232db950..4febc7656e 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -612,19 +612,25 @@ pub async fn handle_request( } } + let max_term = statuses + .iter() + .map(|(status, _)| status.acceptor_state.term) + .max() + .unwrap(); + // Find the most advanced safekeeper let (status, i) = statuses .into_iter() .max_by_key(|(status, _)| { ( status.acceptor_state.epoch, + status.flush_lsn, /* BEGIN_HADRON */ // We need to pull from the SK with the highest term. // This is because another compute may come online and vote the same highest term again on the other two SKs. // Then, there will be 2 computes running on the same term. status.acceptor_state.term, /* END_HADRON */ - status.flush_lsn, status.commit_lsn, ) }) @@ -634,6 +640,22 @@ pub async fn handle_request( assert!(status.tenant_id == request.tenant_id); assert!(status.timeline_id == request.timeline_id); + // TODO(diko): This is hadron only check to make sure that we pull the timeline + // from the safekeeper with the highest term during timeline restore. + // We could avoid returning the error by calling bump_term after pull_timeline. + // However, this is not a big deal because we retry the pull_timeline requests. + // The check should be removed together with removing custom hadron logic for + // safekeeper restore. + if wait_for_peer_timeline_status && status.acceptor_state.term != max_term { + return Err(ApiError::PreconditionFailed( + format!( + "choosen safekeeper {} has term {}, but the most advanced term is {}", + safekeeper_host, status.acceptor_state.term, max_term + ) + .into(), + )); + } + match pull_timeline( status, safekeeper_host, diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index e083a49428..25ac8e5bd3 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -195,12 +195,14 @@ impl StateSK { to: Configuration, ) -> Result { let result = self.state_mut().membership_switch(to).await?; + let flush_lsn = self.flush_lsn(); + let last_log_term = self.state().acceptor_state.get_last_log_term(flush_lsn); Ok(TimelineMembershipSwitchResponse { previous_conf: result.previous_conf, current_conf: result.current_conf, - last_log_term: self.state().acceptor_state.term, - flush_lsn: self.flush_lsn(), + last_log_term, + flush_lsn, }) } diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs index a60ebb85c6..fab1342d5d 100644 --- a/storage_controller/src/service/safekeeper_service.rs +++ b/storage_controller/src/service/safekeeper_service.rs @@ -24,12 +24,12 @@ use pageserver_api::controller_api::{ }; use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo}; use safekeeper_api::PgVersionId; +use safekeeper_api::Term; use safekeeper_api::membership::{self, MemberSet, SafekeeperGeneration}; use safekeeper_api::models::{ PullTimelineRequest, TimelineLocateResponse, TimelineMembershipSwitchRequest, TimelineMembershipSwitchResponse, }; -use safekeeper_api::{INITIAL_TERM, Term}; use safekeeper_client::mgmt_api; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; @@ -1298,13 +1298,7 @@ impl Service { ) .await?; - let mut sync_position = (INITIAL_TERM, Lsn::INVALID); - for res in results.into_iter().flatten() { - let sk_position = (res.last_log_term, res.flush_lsn); - if sync_position < sk_position { - sync_position = sk_position; - } - } + let sync_position = Self::get_sync_position(&results)?; tracing::info!( %generation, @@ -1598,4 +1592,36 @@ impl Service { Ok(()) } + + /// Get membership switch responses from all safekeepers and return the sync position. + /// + /// Sync position is a position equal or greater than the commit position. + /// It is guaranteed that all WAL entries with (last_log_term, flush_lsn) + /// greater than the sync position are not committed (= not on a quorum). + /// + /// Returns error if there is no quorum of successful responses. + fn get_sync_position( + responses: &[mgmt_api::Result], + ) -> Result<(Term, Lsn), ApiError> { + let quorum_size = responses.len() / 2 + 1; + + let mut wal_positions = responses + .iter() + .flatten() + .map(|res| (res.last_log_term, res.flush_lsn)) + .collect::>(); + + // Should be already checked if the responses are from tenant_timeline_set_membership_quorum. + if wal_positions.len() < quorum_size { + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "not enough successful responses to get sync position: {}/{}", + wal_positions.len(), + quorum_size, + ))); + } + + wal_positions.sort(); + + Ok(wal_positions[quorum_size - 1]) + } } diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 11d54f7831..c3dfc78218 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2313,6 +2313,7 @@ class NeonStorageController(MetricsGetter, LogUtils): timeline_id: TimelineId, new_sk_set: list[int], ): + log.info(f"migrate_safekeepers({tenant_id}, {timeline_id}, {new_sk_set})") response = self.request( "POST", f"{self.api}/v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate", diff --git a/test_runner/regress/test_safekeeper_migration.py b/test_runner/regress/test_safekeeper_migration.py index 2ceeea37a5..97a6ece446 100644 --- a/test_runner/regress/test_safekeeper_migration.py +++ b/test_runner/regress/test_safekeeper_migration.py @@ -286,3 +286,177 @@ def test_sk_generation_aware_tombstones(neon_env_builder: NeonEnvBuilder): assert re.match(r".*Timeline .* deleted.*", exc.value.response.text) # The timeline should remain deleted. expect_deleted(second_sk) + + +def test_safekeeper_migration_stale_timeline(neon_env_builder: NeonEnvBuilder): + """ + Test that safekeeper migration handles stale timeline correctly by migrating to + a safekeeper with a stale timeline. + 1. Check that we are waiting for the stale timeline to catch up with the commit lsn. + The migration might fail if there is no compute to advance the WAL. + 2. Check that we rely on last_log_term (and not the current term) when waiting for the + sync_position on step 7. + 3. Check that migration succeeds if the compute is running. + """ + neon_env_builder.num_safekeepers = 2 + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": True, + "timeline_safekeeper_count": 1, + } + env = neon_env_builder.init_start() + env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS) + env.storage_controller.allowed_errors.append(".*not enough successful .* to reach quorum.*") + + mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline) + + active_sk = env.get_safekeeper(mconf["sk_set"][0]) + other_sk = [sk for sk in env.safekeepers if sk.id != active_sk.id][0] + + ep = env.endpoints.create("main", tenant_id=env.initial_tenant) + ep.start(safekeeper_generation=1, safekeepers=[active_sk.id]) + ep.safe_psql("CREATE TABLE t(a int)") + ep.safe_psql("INSERT INTO t VALUES (0)") + + # Pull the timeline to other_sk, so other_sk now has a "stale" timeline on it. + other_sk.pull_timeline([active_sk], env.initial_tenant, env.initial_timeline) + + # Advance the WAL on active_sk. + ep.safe_psql("INSERT INTO t VALUES (1)") + + # The test is more tricky if we have the same last_log_term but different term/flush_lsn. + # Stop the active_sk during the endpoint shutdown because otherwise compute_ctl runs + # sync_safekeepers and advances last_log_term on active_sk. + active_sk.stop() + ep.stop(mode="immediate") + active_sk.start() + + active_sk_status = active_sk.http_client().timeline_status( + env.initial_tenant, env.initial_timeline + ) + other_sk_status = other_sk.http_client().timeline_status( + env.initial_tenant, env.initial_timeline + ) + + # other_sk should have the same last_log_term, but a stale flush_lsn. + assert active_sk_status.last_log_term == other_sk_status.last_log_term + assert active_sk_status.flush_lsn > other_sk_status.flush_lsn + + commit_lsn = active_sk_status.flush_lsn + + # Bump the term on other_sk to make it higher than active_sk. + # This is to make sure we don't use current term instead of last_log_term in the algorithm. + other_sk.http_client().term_bump( + env.initial_tenant, env.initial_timeline, active_sk_status.term + 100 + ) + + # TODO(diko): now it fails because the timeline on other_sk is stale and there is no compute + # to catch up it with active_sk. It might be fixed in https://databricks.atlassian.net/browse/LKB-946 + # if we delete stale timelines before starting the migration. + # But the rest of the test is still valid: we should not lose committed WAL after the migration. + with pytest.raises( + StorageControllerApiException, match="not enough successful .* to reach quorum" + ): + env.storage_controller.migrate_safekeepers( + env.initial_tenant, env.initial_timeline, [other_sk.id] + ) + + mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline) + assert mconf["new_sk_set"] == [other_sk.id] + assert mconf["sk_set"] == [active_sk.id] + assert mconf["generation"] == 2 + + # Start the endpoint, so it advances the WAL on other_sk. + ep.start(safekeeper_generation=2, safekeepers=[active_sk.id, other_sk.id]) + # Now the migration should succeed. + env.storage_controller.migrate_safekeepers( + env.initial_tenant, env.initial_timeline, [other_sk.id] + ) + + # Check that we didn't lose committed WAL. + assert ( + other_sk.http_client().timeline_status(env.initial_tenant, env.initial_timeline).flush_lsn + >= commit_lsn + ) + assert ep.safe_psql("SELECT * FROM t") == [(0,), (1,)] + + +def test_pull_from_most_advanced_sk(neon_env_builder: NeonEnvBuilder): + """ + Test that we pull the timeline from the most advanced safekeeper during the + migration and do not lose committed WAL. + """ + neon_env_builder.num_safekeepers = 4 + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": True, + "timeline_safekeeper_count": 3, + } + env = neon_env_builder.init_start() + env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS) + + mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline) + + sk_set = mconf["sk_set"] + assert len(sk_set) == 3 + + other_sk = [sk.id for sk in env.safekeepers if sk.id not in sk_set][0] + + ep = env.endpoints.create("main", tenant_id=env.initial_tenant) + ep.start(safekeeper_generation=1, safekeepers=sk_set) + ep.safe_psql("CREATE TABLE t(a int)") + ep.safe_psql("INSERT INTO t VALUES (0)") + + # Stop one sk, so we have a lagging WAL on it. + env.get_safekeeper(sk_set[0]).stop() + # Advance the WAL on the other sks. + ep.safe_psql("INSERT INTO t VALUES (1)") + + # Stop other sks to make sure compute_ctl doesn't advance the last_log_term on them during shutdown. + for sk_id in sk_set[1:]: + env.get_safekeeper(sk_id).stop() + ep.stop(mode="immediate") + for sk_id in sk_set: + env.get_safekeeper(sk_id).start() + + # Bump the term on the lagging sk to make sure we don't use it to choose the most advanced sk. + env.get_safekeeper(sk_set[0]).http_client().term_bump( + env.initial_tenant, env.initial_timeline, 100 + ) + + def get_commit_lsn(sk_set: list[int]): + flush_lsns = [] + last_log_terms = [] + for sk_id in sk_set: + sk = env.get_safekeeper(sk_id) + status = sk.http_client().timeline_status(env.initial_tenant, env.initial_timeline) + flush_lsns.append(status.flush_lsn) + last_log_terms.append(status.last_log_term) + + # In this test we assume that all sks have the same last_log_term. + assert len(set(last_log_terms)) == 1 + + flush_lsns.sort(reverse=True) + commit_lsn = flush_lsns[len(sk_set) // 2] + + log.info(f"sk_set: {sk_set}, flush_lsns: {flush_lsns}, commit_lsn: {commit_lsn}") + return commit_lsn + + commit_lsn_before_migration = get_commit_lsn(sk_set) + + # Make two migrations, so the lagging sk stays in the sk_set, but other sks are replaced. + new_sk_set1 = [sk_set[0], sk_set[1], other_sk] # remove sk_set[2], add other_sk + new_sk_set2 = [sk_set[0], other_sk, sk_set[2]] # remove sk_set[1], add sk_set[2] back + env.storage_controller.migrate_safekeepers( + env.initial_tenant, env.initial_timeline, new_sk_set1 + ) + env.storage_controller.migrate_safekeepers( + env.initial_tenant, env.initial_timeline, new_sk_set2 + ) + + commit_lsn_after_migration = get_commit_lsn(new_sk_set2) + + # We should not lose committed WAL. + # If we have choosen the lagging sk to pull the timeline from, this might fail. + assert commit_lsn_before_migration <= commit_lsn_after_migration + + ep.start(safekeeper_generation=5, safekeepers=new_sk_set2) + assert ep.safe_psql("SELECT * FROM t") == [(0,), (1,)]