diff --git a/storage_controller/src/reconciler.rs b/storage_controller/src/reconciler.rs
index 9f6f385dc9..b03a6dae04 100644
--- a/storage_controller/src/reconciler.rs
+++ b/storage_controller/src/reconciler.rs
@@ -686,6 +686,8 @@ impl Reconciler {
                 .await?,
         );
 
+        pausable_failpoint!("reconciler-live-migrate-post-generation-inc");
+
         let dest_conf = build_location_config(
             &self.shard,
             &self.config,
@@ -760,7 +762,9 @@ impl Reconciler {
         Ok(())
     }
 
-    async fn maybe_refresh_observed(&mut self) -> Result<(), ReconcileError> {
+    /// Returns true if the observed state of the attached location was refreshed
+    /// and false otherwise.
+    async fn maybe_refresh_observed(&mut self) -> Result<bool, ReconcileError> {
         // If the attached node has uncertain state, read it from the pageserver before proceeding: this
         // is important to avoid spurious generation increments.
         //
@@ -770,7 +774,7 @@ impl Reconciler {
 
         let Some(attached_node) = self.intent.attached.as_ref() else {
             // Nothing to do
-            return Ok(());
+            return Ok(false);
         };
 
         if matches!(
@@ -815,7 +819,7 @@ impl Reconciler {
             }
         }
 
-        Ok(())
+        Ok(true)
     }
 
     /// Reconciling a tenant makes API calls to pageservers until the observed state
@@ -831,7 +835,7 @@ impl Reconciler {
     /// state where it still requires later reconciliation.
     pub(crate) async fn reconcile(&mut self) -> Result<(), ReconcileError> {
         // Prepare: if we have uncertain `observed` state for our would-be attachement location, then refresh it
-        self.maybe_refresh_observed().await?;
+        let refreshed = self.maybe_refresh_observed().await?;
 
         // Special case: live migration
         self.maybe_live_migrate().await?;
@@ -855,8 +859,14 @@ impl Reconciler {
         );
         match self.observed.locations.get(&node.get_id()) {
             Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
-                // Nothing to do
-                tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
+                if refreshed {
+                    tracing::info!(
+                        node_id=%node.get_id(), "Observed configuration correct after refresh. Notifying compute.");
+                    self.compute_notify().await?;
+                } else {
+                    // Nothing to do
+                    tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.");
+                }
             }
             observed => {
                 // In all cases other than a matching observed configuration, we will
diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py
index f61778e4c5..fcc2e7006f 100644
--- a/test_runner/regress/test_compatibility.py
+++ b/test_runner/regress/test_compatibility.py
@@ -249,6 +249,7 @@ def test_forward_compatibility(
     top_output_dir: Path,
     pg_version: PgVersion,
    compatibility_snapshot_dir: Path,
+    compute_reconfigure_listener: ComputeReconfigure,
 ):
     """
     Test that the old binaries can read new data
@@ -257,6 +258,7 @@
         os.environ.get("ALLOW_FORWARD_COMPATIBILITY_BREAKAGE", "false").lower() == "true"
     )
 
+    neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
     neon_env_builder.test_may_use_compatibility_snapshot_binaries = True
 
     try:
diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py
index 35a75ca607..b9344f2fb4 100644
--- a/test_runner/regress/test_storage_controller.py
+++ b/test_runner/regress/test_storage_controller.py
@@ -4176,3 +4176,121 @@ def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder,
         )
     else:
         assert initial_ps.http_client().tenant_list_locations()["tenant_shards"] == []
+
+
+@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
+def test_storage_controller_migrate_with_pageserver_restart(
+    neon_env_builder: NeonEnvBuilder, make_httpserver
+):
+    """
+    Test that live migrations which fail right after incrementing the generation
+    due to the destination going offline eventually send a compute notification
+    after the destination re-attaches.
+    """
+    neon_env_builder.num_pageservers = 2
+
+    neon_env_builder.storage_controller_config = {
+        # Disable transitions to offline
+        "max_offline": "600s",
+        "use_local_compute_notifications": False,
+    }
+
+    neon_env_builder.control_plane_hooks_api = (
+        f"http://{make_httpserver.host}:{make_httpserver.port}/"
+    )
+
+    notifications = []
+
+    def notify(request: Request):
+        log.info(f"Received notify-attach: {request}")
+        notifications.append(request.json)
+
+    make_httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(notify)
+
+    env = neon_env_builder.init_start()
+
+    env.storage_controller.allowed_errors.extend(
+        [
+            ".*Call to node.*management API failed.*",
+            ".*Call to node.*management API still failed.*",
+            ".*Reconcile error.*",
+            ".*request.*PUT.*migrate.*",
+        ]
+    )
+
+    env.storage_controller.tenant_policy_update(env.initial_tenant, {"placement": {"Attached": 1}})
+    env.storage_controller.reconcile_until_idle()
+
+    initial_desc = env.storage_controller.tenant_describe(env.initial_tenant)["shards"][0]
+    log.info(f"{initial_desc=}")
+    primary = env.get_pageserver(initial_desc["node_attached"])
+    secondary = env.get_pageserver(initial_desc["node_secondary"][0])
+
+    # Pause the migration after incrementing the generation in the database
+    env.storage_controller.configure_failpoints(
+        ("reconciler-live-migrate-post-generation-inc", "pause")
+    )
+
+    tenant_shard_id = TenantShardId(env.initial_tenant, 0, 0)
+
+    try:
+        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
+            migrate_fut = executor.submit(
+                env.storage_controller.tenant_shard_migrate,
+                tenant_shard_id,
+                secondary.id,
+                config=StorageControllerMigrationConfig(prewarm=False, override_scheduler=True),
+            )
+
+            def has_hit_migration_failpoint():
+                expr = "at failpoint reconciler-live-migrate-post-generation-inc"
+                log.info(expr)
+                assert env.storage_controller.log_contains(expr)
+
+            wait_until(has_hit_migration_failpoint)
+
+            secondary.stop()
+
+            # Eventually migration completes
+            env.storage_controller.configure_failpoints(
+                ("reconciler-live-migrate-post-generation-inc", "off")
+            )
+            try:
+                migrate_fut.result()
+            except StorageControllerApiException as err:
+                log.info(f"Migration failed: {err}")
+    except:
+        env.storage_controller.configure_failpoints(
+            ("reconciler-live-migrate-post-generation-inc", "off")
+        )
+        raise
+
+    def process_migration_result():
+        dump = env.storage_controller.tenant_shard_dump()
+        observed = dump[0]["observed"]["locations"]
+
+        log.info(f"{observed=} primary={primary.id} secondary={secondary.id}")
+
+        assert observed[str(primary.id)]["conf"]["mode"] == "AttachedStale"
+        assert observed[str(secondary.id)]["conf"] is None
+
+    wait_until(process_migration_result)
+
+    # Start and wait for re-attach to be processed
+    secondary.start()
+    env.storage_controller.poll_node_status(
+        secondary.id,
+        desired_availability=PageserverAvailability.ACTIVE,
+        desired_scheduling_policy=None,
+        max_attempts=10,
+        backoff=1,
+    )
+
+    env.storage_controller.reconcile_until_idle()
+
+    assert notifications[-1] == {
+        "tenant_id": str(env.initial_tenant),
+        "stripe_size": None,
+        "shards": [{"node_id": int(secondary.id), "shard_number": 0}],
+        "preferred_az": DEFAULT_AZ_ID,
+    }