diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index a4d1030488..635b1858ec 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -303,6 +303,13 @@ enum Command { #[arg(long, required = true, value_delimiter = ',')] new_sk_set: Vec<NodeId>, }, + /// Abort ongoing safekeeper migration. + TimelineSafekeeperMigrateAbort { + #[arg(long)] + tenant_id: TenantId, + #[arg(long)] + timeline_id: TimelineId, + }, } #[derive(Parser)] @@ -1396,6 +1403,17 @@ async fn main() -> anyhow::Result<()> { ) .await?; } + Command::TimelineSafekeeperMigrateAbort { + tenant_id, + timeline_id, + } => { + let path = + format!("v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate_abort"); + + storcon_client + .dispatch::<(), ()>(Method::POST, path, None) + .await?; + } } Ok(()) diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index ff73719adb..b40da4fd65 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -644,6 +644,7 @@ async fn handle_tenant_timeline_safekeeper_migrate( req: Request<Body>, ) -> Result<Response<Body>, ApiError> { let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + // TODO(diko): it's not PS operation, there should be a different permission scope. check_permissions(&req, Scope::PageServerApi)?; maybe_rate_limit(&req, tenant_id).await; @@ -665,6 +666,23 @@ async fn handle_tenant_timeline_safekeeper_migrate( json_response(StatusCode::OK, ()) } +async fn handle_tenant_timeline_safekeeper_migrate_abort( + service: Arc<Service>, + req: Request<Body>, +) -> Result<Response<Body>, ApiError> { + let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?; + // TODO(diko): it's not PS operation, there should be a different permission scope. 
+ check_permissions(&req, Scope::PageServerApi)?; + maybe_rate_limit(&req, tenant_id).await; + + service + .tenant_timeline_safekeeper_migrate_abort(tenant_id, timeline_id) + .await?; + + json_response(StatusCode::OK, ()) +} + async fn handle_tenant_timeline_lsn_lease( service: Arc<Service>, req: Request<Body>, ) -> Result<Response<Body>, ApiError> { @@ -2611,6 +2629,16 @@ pub fn make_router( ) }, ) + .post( + "/v1/tenant/:tenant_id/timeline/:timeline_id/safekeeper_migrate_abort", + |r| { + tenant_service_handler( + r, + handle_tenant_timeline_safekeeper_migrate_abort, + RequestName("v1_tenant_timeline_safekeeper_migrate_abort"), + ) + }, + ) // LSN lease passthrough to all shards .post( "/v1/tenant/:tenant_id/timeline/:timeline_id/lsn_lease", diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs index fab1342d5d..689d341b6a 100644 --- a/storage_controller/src/service/safekeeper_service.rs +++ b/storage_controller/src/service/safekeeper_service.rs @@ -1230,10 +1230,7 @@ impl Service { } // It it is the same new_sk_set, we can continue the migration (retry). } else { - let prev_finished = timeline.cplane_notified_generation == timeline.generation - && timeline.sk_set_notified_generation == timeline.generation; - - if !prev_finished { + if !is_migration_finished(&timeline) { // The previous migration is committed, but the finish step failed. // Safekeepers/cplane might not know about the last membership configuration. // Retry the finish step to ensure smooth migration. @@ -1545,6 +1542,8 @@ impl Service { timeline_id: TimelineId, timeline: &TimelinePersistence, ) -> Result<(), ApiError> { + tracing::info!(generation=?timeline.generation, sk_set=?timeline.sk_set, new_sk_set=?timeline.new_sk_set, "retrying finish safekeeper migration"); + if timeline.new_sk_set.is_some() { // Logical error, should never happen. 
return Err(ApiError::InternalServerError(anyhow::anyhow!( @@ -1624,4 +1623,120 @@ impl Service { Ok(wal_positions[quorum_size - 1]) } + + /// Abort ongoing safekeeper migration. + pub(crate) async fn tenant_timeline_safekeeper_migrate_abort( + self: &Arc<Self>, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> Result<(), ApiError> { + // TODO(diko): per-tenant lock is too wide. Consider introducing per-timeline locks. + let _tenant_lock = trace_shared_lock( + &self.tenant_op_locks, + tenant_id, + TenantOperations::TimelineSafekeeperMigrate, + ) + .await; + + // Fetch current timeline configuration from the configuration storage. + let timeline = self + .persistence + .get_timeline(tenant_id, timeline_id) + .await?; + + let Some(timeline) = timeline else { + return Err(ApiError::NotFound( + anyhow::anyhow!( + "timeline {tenant_id}/{timeline_id} doesn't exist in timelines table" + ) + .into(), + )); + }; + + let mut generation = SafekeeperGeneration::new(timeline.generation as u32); + + let Some(new_sk_set) = &timeline.new_sk_set else { + // No new_sk_set -> no active migration that we can abort. + tracing::info!("timeline has no active migration"); + + if !is_migration_finished(&timeline) { + // The last migration is committed, but the finish step failed. + // Safekeepers/cplane might not know about the last membership configuration. + // Retry the finish step to make the timeline state clean. + self.finish_safekeeper_migration_retry(tenant_id, timeline_id, &timeline) + .await?; + } + return Ok(()); + }; + + tracing::info!(sk_set=?timeline.sk_set, ?new_sk_set, ?generation, "aborting timeline migration"); + + let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?; + let new_safekeepers = self.get_safekeepers(new_sk_set)?; + + let cur_sk_member_set = + Self::make_member_set(&cur_safekeepers).map_err(ApiError::InternalServerError)?; + + // Increment current generation and remove new_sk_set from the timeline to abort the migration. 
+ generation = generation.next(); + + let mconf = membership::Configuration { + generation, + members: cur_sk_member_set, + new_members: None, + }; + + // Exclude safekeepers which were added during the current migration. + let cur_ids: HashSet<NodeId> = cur_safekeepers.iter().map(|sk| sk.get_id()).collect(); + let exclude_safekeepers = new_safekeepers + .into_iter() + .filter(|sk| !cur_ids.contains(&sk.get_id())) + .collect::<Vec<_>>(); + + let exclude_requests = exclude_safekeepers + .iter() + .map(|sk| TimelinePendingOpPersistence { + sk_id: sk.skp.id, + tenant_id: tenant_id.to_string(), + timeline_id: timeline_id.to_string(), + generation: generation.into_inner() as i32, + op_kind: SafekeeperTimelineOpKind::Exclude, + }) + .collect::<Vec<_>>(); + + let cur_sk_set = cur_safekeepers + .iter() + .map(|sk| sk.get_id()) + .collect::<Vec<_>>(); + + // Persist new mconf and exclude requests. + self.persistence + .update_timeline_membership( + tenant_id, + timeline_id, + generation, + &cur_sk_set, + None, + &exclude_requests, + ) + .await?; + + // At this point we have already committed the abort, but still need to notify + // cplane/safekeepers with the new mconf. That's what finish_safekeeper_migration does. 
+ self.finish_safekeeper_migration( + tenant_id, + timeline_id, + &cur_safekeepers, + &mconf, + &exclude_safekeepers, + ) + .await?; + + Ok(()) + } +} + +fn is_migration_finished(timeline: &TimelinePersistence) -> bool { + timeline.cplane_notified_generation == timeline.generation + && timeline.sk_set_notified_generation == timeline.generation } diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index c3dfc78218..41213d374a 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2323,6 +2323,19 @@ class NeonStorageController(MetricsGetter, LogUtils): response.raise_for_status() log.info(f"migrate_safekeepers success: {response.json()}") + def abort_safekeeper_migration( + self, + tenant_id: TenantId, + timeline_id: TimelineId, + ): + response = self.request( + "POST", + f"{self.api}/v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate_abort", + headers=self.headers(TokenScope.PAGE_SERVER_API), + ) + response.raise_for_status() + log.info(f"abort_safekeeper_migration success: {response.json()}") + def locate(self, tenant_id: TenantId) -> list[dict[str, Any]]: """ :return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr": str, "listen_http_port": int} diff --git a/test_runner/regress/test_safekeeper_migration.py b/test_runner/regress/test_safekeeper_migration.py index 97a6ece446..ba067b97de 100644 --- a/test_runner/regress/test_safekeeper_migration.py +++ b/test_runner/regress/test_safekeeper_migration.py @@ -460,3 +460,91 @@ def test_pull_from_most_advanced_sk(neon_env_builder: NeonEnvBuilder): ep.start(safekeeper_generation=5, safekeepers=new_sk_set2) assert ep.safe_psql("SELECT * FROM t") == [(0,), (1,)] + + +def test_abort_safekeeper_migration(neon_env_builder: NeonEnvBuilder): + """ + Test that safekeeper migration can be aborted. + 1. Insert failpoints and ensure the abort successfully reverts the timeline state. 
+ 2. Check that endpoint is operational after the abort. + """ + neon_env_builder.num_safekeepers = 2 + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": True, + "timeline_safekeeper_count": 1, + } + env = neon_env_builder.init_start() + env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS) + + mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline) + assert len(mconf["sk_set"]) == 1 + cur_sk = mconf["sk_set"][0] + cur_gen = 1 + + ep = env.endpoints.create("main", tenant_id=env.initial_tenant) + ep.start(safekeeper_generation=1, safekeepers=mconf["sk_set"]) + ep.safe_psql("CREATE EXTENSION neon_test_utils;") + ep.safe_psql("CREATE TABLE t(a int)") + ep.safe_psql("INSERT INTO t VALUES (1)") + + another_sk = [sk.id for sk in env.safekeepers if sk.id != cur_sk][0] + + failpoints = [ + "sk-migration-after-step-3", + "sk-migration-after-step-4", + "sk-migration-after-step-5", + "sk-migration-after-step-7", + ] + + for fp in failpoints: + env.storage_controller.configure_failpoints((fp, "return(1)")) + + with pytest.raises(StorageControllerApiException, match=f"failpoint {fp}"): + env.storage_controller.migrate_safekeepers( + env.initial_tenant, env.initial_timeline, [another_sk] + ) + cur_gen += 1 + + env.storage_controller.configure_failpoints((fp, "off")) + + # We should have a joint mconf after the failure. + mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline) + assert mconf["generation"] == cur_gen + assert mconf["sk_set"] == [cur_sk] + assert mconf["new_sk_set"] == [another_sk] + + env.storage_controller.abort_safekeeper_migration(env.initial_tenant, env.initial_timeline) + cur_gen += 1 + + # Abort should revert the timeline to the previous sk_set and increment the generation. 
+ mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline) + assert mconf["generation"] == cur_gen + assert mconf["sk_set"] == [cur_sk] + assert mconf["new_sk_set"] is None + + assert ep.safe_psql("SHOW neon.safekeepers")[0][0].startswith(f"g#{cur_gen}:") + ep.safe_psql(f"INSERT INTO t VALUES ({cur_gen})") + + # After step-8 the final mconf is committed and the migration is not abortable anymore. + # So the abort should not abort anything. + env.storage_controller.configure_failpoints(("sk-migration-after-step-8", "return(1)")) + + with pytest.raises(StorageControllerApiException, match="failpoint sk-migration-after-step-8"): + env.storage_controller.migrate_safekeepers( + env.initial_tenant, env.initial_timeline, [another_sk] + ) + cur_gen += 2 + + env.storage_controller.configure_failpoints(("sk-migration-after-step-8", "off")) + + env.storage_controller.abort_safekeeper_migration(env.initial_tenant, env.initial_timeline) + + # The migration is fully committed, no abort should have been performed. + mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline) + assert mconf["generation"] == cur_gen + assert mconf["sk_set"] == [another_sk] + assert mconf["new_sk_set"] is None + + ep.safe_psql(f"INSERT INTO t VALUES ({cur_gen})") + ep.clear_buffers() + assert ep.safe_psql("SELECT * FROM t") == [(i + 1,) for i in range(cur_gen) if i % 2 == 0]