From 651ae44569cf31a47b69b004f5444c79aa12f9cb Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 1 Oct 2024 08:57:22 +0100 Subject: [PATCH] storage controller: drop out of blocking compute notification loop if migration origin becomes unavailable (#9147) ## Problem The live migration code waits forever for the compute notification hook, on the basis that until it succeeds, the compute is probably using the old location and we shouldn't detach it. However, if a pageserver stops or restarts in the background, then this original location might no longer be available, so there is no point waiting. Waiting is also actively harmful, because it prevents other reconciliations happening for the tenant shard, such as during an upgrade where a stuck "drain" migration might prevent the later "fill" migration from moving the shard back to its original location. ## Summary of changes - Refactor the notification wait loop into a function - Add a checks during the loop, for the origin node's cancellation token and an explicit HTTP request to the origin node to confirm the shard is still attached there. Closes: https://github.com/neondatabase/neon/issues/8901 --- storage_controller/src/reconciler.rs | 136 ++++++++++++++--- storage_controller/src/service.rs | 7 +- .../regress/test_storage_controller.py | 143 ++++++++++++++++++ 3 files changed, 261 insertions(+), 25 deletions(-) diff --git a/storage_controller/src/reconciler.rs b/storage_controller/src/reconciler.rs index 2c42da4043..1e7d7adffe 100644 --- a/storage_controller/src/reconciler.rs +++ b/storage_controller/src/reconciler.rs @@ -572,30 +572,7 @@ impl Reconciler { // During a live migration it is unhelpful to proceed if we couldn't notify compute: if we detach // the origin without notifying compute, we will render the tenant unavailable. - let mut notify_attempts = 0; - while let Err(e) = self.compute_notify().await { - match e { - NotifyError::Fatal(_) => return Err(ReconcileError::Notify(e)), - NotifyError::ShuttingDown => return Err(ReconcileError::Cancel), - _ => { - tracing::warn!( - "Live migration blocked by compute notification error, retrying: {e}" - ); - } - } - - exponential_backoff( - notify_attempts, - // Generous waits: control plane operations which might be blocking us usually complete on the order - // of hundreds to thousands of milliseconds, so no point busy polling. - 1.0, - 10.0, - &self.cancel, - ) - .await; - notify_attempts += 1; - } - + self.compute_notify_blocking(&origin_ps).await?; pausable_failpoint!("reconciler-live-migrate-post-notify"); // Downgrade the origin to secondary. If the tenant's policy is PlacementPolicy::Attached(0), then @@ -869,6 +846,117 @@ impl Reconciler { Ok(()) } } + + /// Keep trying to notify the compute indefinitely, only dropping out if: + /// - the node `origin` becomes unavailable -> Ok(()) + /// - the node `origin` no longer has our tenant shard attached -> Ok(()) + /// - our cancellation token fires -> Err(ReconcileError::Cancelled) + /// + /// This is used during live migration, where we do not wish to detach + /// an origin location until the compute definitely knows about the new + /// location. + /// + /// In cases where the origin node becomes unavailable, we return success, indicating + /// to the caller that they should continue irrespective of whether the compute was notified, + /// because the origin node is unusable anyway. Notification will be retried later via the + /// [`Self::compute_notify_failure`] flag. + async fn compute_notify_blocking(&mut self, origin: &Node) -> Result<(), ReconcileError> { + let mut notify_attempts = 0; + while let Err(e) = self.compute_notify().await { + match e { + NotifyError::Fatal(_) => return Err(ReconcileError::Notify(e)), + NotifyError::ShuttingDown => return Err(ReconcileError::Cancel), + _ => { + tracing::warn!( + "Live migration blocked by compute notification error, retrying: {e}" + ); + } + } + + // Did the origin pageserver become unavailable? + if !origin.is_available() { + tracing::info!("Giving up on compute notification because {origin} is unavailable"); + break; + } + + // Does the origin pageserver still host the shard we are interested in? We should only + // continue waiting for compute notification to be acked if the old location is still usable. + let tenant_shard_id = self.tenant_shard_id; + match origin + .with_client_retries( + |client| async move { client.get_location_config(tenant_shard_id).await }, + &self.service_config.jwt_token, + 1, + 3, + Duration::from_secs(5), + &self.cancel, + ) + .await + { + Some(Ok(Some(location_conf))) => { + if matches!( + location_conf.mode, + LocationConfigMode::AttachedMulti + | LocationConfigMode::AttachedSingle + | LocationConfigMode::AttachedStale + ) { + tracing::debug!( + "Still attached to {origin}, will wait & retry compute notification" + ); + } else { + tracing::info!( + "Giving up on compute notification because {origin} is in state {:?}", + location_conf.mode + ); + return Ok(()); + } + // Fall through + } + Some(Ok(None)) => { + tracing::info!( + "No longer attached to {origin}, giving up on compute notification" + ); + return Ok(()); + } + Some(Err(e)) => { + match e { + mgmt_api::Error::Cancelled => { + tracing::info!( + "Giving up on compute notification because {origin} is unavailable" + ); + return Ok(()); + } + mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _) => { + tracing::info!( + "No longer attached to {origin}, giving up on compute notification" + ); + return Ok(()); + } + e => { + // Other API errors are unexpected here. + tracing::warn!("Unexpected error checking location on {origin}: {e}"); + + // Fall through, we will retry compute notification. + } + } + } + None => return Err(ReconcileError::Cancel), + }; + + exponential_backoff( + notify_attempts, + // Generous waits: control plane operations which might be blocking us usually complete on the order + // of hundreds to thousands of milliseconds, so no point busy polling. + 1.0, + 10.0, + &self.cancel, + ) + .await; + notify_attempts += 1; + } + + Ok(()) + } } /// We tweak the externally-set TenantConfig while configuring diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index a5e0129684..851db97310 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -4974,7 +4974,12 @@ impl Service { { let mut nodes_mut = (**nodes).clone(); - nodes_mut.remove(&node_id); + if let Some(mut removed_node) = nodes_mut.remove(&node_id) { + // Ensure that any reconciler holding an Arc<> to this node will + // drop out when trying to RPC to it (setting Offline state sets the + // cancellation token on the Node object). + removed_node.set_availability(NodeAvailability::Offline); + } *nodes = Arc::new(nodes_mut); } } diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 3861f0b822..789623cb27 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -567,6 +567,149 @@ def test_storage_controller_compute_hook( env.storage_controller.consistency_check() +def test_storage_controller_stuck_compute_hook( + httpserver: HTTPServer, + neon_env_builder: NeonEnvBuilder, + httpserver_listen_address, +): + """ + Test the migration process's behavior when the compute hook does not enable it to proceed + """ + + neon_env_builder.num_pageservers = 2 + (host, port) = httpserver_listen_address + neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify" + + handle_params = {"status": 200} + + notifications = [] + + def handler(request: Request): + status = handle_params["status"] + log.info(f"Notify request[{status}]: {request}") + notifications.append(request.json) + return Response(status=status) + + httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler) + + # Start running + env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"}) + + # Initial notification from tenant creation + assert len(notifications) == 1 + expect: Dict[str, Union[List[Dict[str, int]], str, None, int]] = { + "tenant_id": str(env.initial_tenant), + "stripe_size": None, + "shards": [{"node_id": int(env.pageservers[0].id), "shard_number": 0}], + } + assert notifications[0] == expect + + # Do a migration while the compute hook is returning 423 status + tenant_id = env.initial_tenant + origin_pageserver = env.get_tenant_pageserver(tenant_id) + dest_ps_id = [p.id for p in env.pageservers if p.id != origin_pageserver.id][0] + dest_pageserver = env.get_pageserver(dest_ps_id) + shard_0_id = TenantShardId(tenant_id, 0, 0) + + NOTIFY_BLOCKED_LOG = ".*Live migration blocked.*" + env.storage_controller.allowed_errors.extend( + [ + NOTIFY_BLOCKED_LOG, + ".*Failed to notify compute.*", + ".*Reconcile error.*Cancelled", + ".*Reconcile error.*Control plane tenant busy", + ] + ) + + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + # We expect the controller to hit the 423 (locked) and retry. Migration shouldn't complete until that + # status is cleared. + handle_params["status"] = 423 + migrate_fut = executor.submit( + env.storage_controller.tenant_shard_migrate, shard_0_id, dest_ps_id + ) + + def logged_stuck(): + env.storage_controller.assert_log_contains(NOTIFY_BLOCKED_LOG) + + wait_until(10, 0.25, logged_stuck) + contains_r = env.storage_controller.log_contains(NOTIFY_BLOCKED_LOG) + assert contains_r is not None # Appease mypy + (_, log_cursor) = contains_r + assert migrate_fut.running() + + # Permit the compute hook to proceed + handle_params["status"] = 200 + migrate_fut.result(timeout=10) + + # Advance log cursor past the last 'stuck' message (we already waited for one, but + # there could be more than one) + while True: + contains_r = env.storage_controller.log_contains(NOTIFY_BLOCKED_LOG, offset=log_cursor) + if contains_r is None: + break + else: + (_, log_cursor) = contains_r + + # Now, do a migration in the opposite direction + handle_params["status"] = 423 + migrate_fut = executor.submit( + env.storage_controller.tenant_shard_migrate, shard_0_id, origin_pageserver.id + ) + + def logged_stuck_again(): + env.storage_controller.assert_log_contains(NOTIFY_BLOCKED_LOG, offset=log_cursor) + + wait_until(10, 0.25, logged_stuck_again) + assert migrate_fut.running() + + # This time, the compute hook remains stuck, but we mark the origin node offline: this should + # also allow the migration to complete -- we only wait for the compute hook as long as we think + # the old location is still usable for computes. + # This is a regression test for issue https://github.com/neondatabase/neon/issues/8901 + dest_pageserver.stop() + env.storage_controller.node_configure(dest_ps_id, {"availability": "Offline"}) + + try: + migrate_fut.result(timeout=10) + except StorageControllerApiException as e: + # The reconciler will fail because it can't detach from the origin: the important + # thing is that it finishes, rather than getting stuck in the compute notify loop. + assert "Reconcile error" in str(e) + + # A later background reconciliation will clean up and leave things in a neat state, even + # while the compute hook is still blocked + try: + env.storage_controller.reconcile_all() + except StorageControllerApiException as e: + # We expect that the reconciler will do its work, but be unable to fully succeed + # because it can't send a compute notification. It will complete, but leave + # the internal flag set for "retry compute notification later" + assert "Control plane tenant busy" in str(e) + + # Confirm that we are AttachedSingle on the node we last called the migrate API for + loc = origin_pageserver.http_client().tenant_get_location(shard_0_id) + assert loc["mode"] == "AttachedSingle" + + # When the origin node comes back, it should get cleaned up + dest_pageserver.start() + try: + env.storage_controller.reconcile_all() + except StorageControllerApiException as e: + # Compute hook is still blocked: reconciler will configure PS but not fully succeed + assert "Control plane tenant busy" in str(e) + + with pytest.raises(PageserverApiException, match="Tenant shard not found"): + dest_pageserver.http_client().tenant_get_location(shard_0_id) + + # Once the compute hook is unblocked, we should be able to get into a totally + # quiescent state again + handle_params["status"] = 200 + env.storage_controller.reconcile_until_idle() + + env.storage_controller.consistency_check() + + def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder): """ Verify that occasional-use debug APIs work as expected. This is a lightweight test