storage controller: drop out of blocking compute notification loop if migration origin becomes unavailable (#9147)

## Problem

The live migration code waits forever for the compute notification hook,
on the basis that until it succeeds, the compute is probably using the
old location and we shouldn't detach it.

However, if a pageserver stops or restarts in the background, then this
original location might no longer be available, so there is no point
waiting. Waiting is also actively harmful, because it prevents other
reconciliations happening for the tenant shard, such as during an
upgrade where a stuck "drain" migration might prevent the later "fill"
migration from moving the shard back to its original location.

## Summary of changes

- Refactor the notification wait loop into a function
- Add a checks during the loop, for the origin node's cancellation token
and an explicit HTTP request to the origin node to confirm the shard is
still attached there.

Closes: https://github.com/neondatabase/neon/issues/8901
This commit is contained in:
John Spray
2024-10-01 08:57:22 +01:00
committed by GitHub
parent 65bda19051
commit 651ae44569
3 changed files with 261 additions and 25 deletions

View File

@@ -572,30 +572,7 @@ impl Reconciler {
// During a live migration it is unhelpful to proceed if we couldn't notify compute: if we detach
// the origin without notifying compute, we will render the tenant unavailable.
let mut notify_attempts = 0;
while let Err(e) = self.compute_notify().await {
match e {
NotifyError::Fatal(_) => return Err(ReconcileError::Notify(e)),
NotifyError::ShuttingDown => return Err(ReconcileError::Cancel),
_ => {
tracing::warn!(
"Live migration blocked by compute notification error, retrying: {e}"
);
}
}
exponential_backoff(
notify_attempts,
// Generous waits: control plane operations which might be blocking us usually complete on the order
// of hundreds to thousands of milliseconds, so no point busy polling.
1.0,
10.0,
&self.cancel,
)
.await;
notify_attempts += 1;
}
self.compute_notify_blocking(&origin_ps).await?;
pausable_failpoint!("reconciler-live-migrate-post-notify");
// Downgrade the origin to secondary. If the tenant's policy is PlacementPolicy::Attached(0), then
@@ -869,6 +846,117 @@ impl Reconciler {
Ok(())
}
}
/// Keep trying to notify the compute indefinitely, only dropping out if:
/// - the node `origin` becomes unavailable -> Ok(())
/// - the node `origin` no longer has our tenant shard attached -> Ok(())
/// - our cancellation token fires -> Err(ReconcileError::Cancelled)
///
/// This is used during live migration, where we do not wish to detach
/// an origin location until the compute definitely knows about the new
/// location.
///
/// In cases where the origin node becomes unavailable, we return success, indicating
/// to the caller that they should continue irrespective of whether the compute was notified,
/// because the origin node is unusable anyway. Notification will be retried later via the
/// [`Self::compute_notify_failure`] flag.
async fn compute_notify_blocking(&mut self, origin: &Node) -> Result<(), ReconcileError> {
let mut notify_attempts = 0;
while let Err(e) = self.compute_notify().await {
match e {
NotifyError::Fatal(_) => return Err(ReconcileError::Notify(e)),
NotifyError::ShuttingDown => return Err(ReconcileError::Cancel),
_ => {
tracing::warn!(
"Live migration blocked by compute notification error, retrying: {e}"
);
}
}
// Did the origin pageserver become unavailable?
if !origin.is_available() {
tracing::info!("Giving up on compute notification because {origin} is unavailable");
break;
}
// Does the origin pageserver still host the shard we are interested in? We should only
// continue waiting for compute notification to be acked if the old location is still usable.
let tenant_shard_id = self.tenant_shard_id;
match origin
.with_client_retries(
|client| async move { client.get_location_config(tenant_shard_id).await },
&self.service_config.jwt_token,
1,
3,
Duration::from_secs(5),
&self.cancel,
)
.await
{
Some(Ok(Some(location_conf))) => {
if matches!(
location_conf.mode,
LocationConfigMode::AttachedMulti
| LocationConfigMode::AttachedSingle
| LocationConfigMode::AttachedStale
) {
tracing::debug!(
"Still attached to {origin}, will wait & retry compute notification"
);
} else {
tracing::info!(
"Giving up on compute notification because {origin} is in state {:?}",
location_conf.mode
);
return Ok(());
}
// Fall through
}
Some(Ok(None)) => {
tracing::info!(
"No longer attached to {origin}, giving up on compute notification"
);
return Ok(());
}
Some(Err(e)) => {
match e {
mgmt_api::Error::Cancelled => {
tracing::info!(
"Giving up on compute notification because {origin} is unavailable"
);
return Ok(());
}
mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _) => {
tracing::info!(
"No longer attached to {origin}, giving up on compute notification"
);
return Ok(());
}
e => {
// Other API errors are unexpected here.
tracing::warn!("Unexpected error checking location on {origin}: {e}");
// Fall through, we will retry compute notification.
}
}
}
None => return Err(ReconcileError::Cancel),
};
exponential_backoff(
notify_attempts,
// Generous waits: control plane operations which might be blocking us usually complete on the order
// of hundreds to thousands of milliseconds, so no point busy polling.
1.0,
10.0,
&self.cancel,
)
.await;
notify_attempts += 1;
}
Ok(())
}
}
/// We tweak the externally-set TenantConfig while configuring

View File

@@ -4974,7 +4974,12 @@ impl Service {
{
let mut nodes_mut = (**nodes).clone();
nodes_mut.remove(&node_id);
if let Some(mut removed_node) = nodes_mut.remove(&node_id) {
// Ensure that any reconciler holding an Arc<> to this node will
// drop out when trying to RPC to it (setting Offline state sets the
// cancellation token on the Node object).
removed_node.set_availability(NodeAvailability::Offline);
}
*nodes = Arc::new(nodes_mut);
}
}

View File

@@ -567,6 +567,149 @@ def test_storage_controller_compute_hook(
env.storage_controller.consistency_check()
def test_storage_controller_stuck_compute_hook(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address,
):
"""
Test the migration process's behavior when the compute hook does not enable it to proceed
"""
neon_env_builder.num_pageservers = 2
(host, port) = httpserver_listen_address
neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify"
handle_params = {"status": 200}
notifications = []
def handler(request: Request):
status = handle_params["status"]
log.info(f"Notify request[{status}]: {request}")
notifications.append(request.json)
return Response(status=status)
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
# Initial notification from tenant creation
assert len(notifications) == 1
expect: Dict[str, Union[List[Dict[str, int]], str, None, int]] = {
"tenant_id": str(env.initial_tenant),
"stripe_size": None,
"shards": [{"node_id": int(env.pageservers[0].id), "shard_number": 0}],
}
assert notifications[0] == expect
# Do a migration while the compute hook is returning 423 status
tenant_id = env.initial_tenant
origin_pageserver = env.get_tenant_pageserver(tenant_id)
dest_ps_id = [p.id for p in env.pageservers if p.id != origin_pageserver.id][0]
dest_pageserver = env.get_pageserver(dest_ps_id)
shard_0_id = TenantShardId(tenant_id, 0, 0)
NOTIFY_BLOCKED_LOG = ".*Live migration blocked.*"
env.storage_controller.allowed_errors.extend(
[
NOTIFY_BLOCKED_LOG,
".*Failed to notify compute.*",
".*Reconcile error.*Cancelled",
".*Reconcile error.*Control plane tenant busy",
]
)
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# We expect the controller to hit the 423 (locked) and retry. Migration shouldn't complete until that
# status is cleared.
handle_params["status"] = 423
migrate_fut = executor.submit(
env.storage_controller.tenant_shard_migrate, shard_0_id, dest_ps_id
)
def logged_stuck():
env.storage_controller.assert_log_contains(NOTIFY_BLOCKED_LOG)
wait_until(10, 0.25, logged_stuck)
contains_r = env.storage_controller.log_contains(NOTIFY_BLOCKED_LOG)
assert contains_r is not None # Appease mypy
(_, log_cursor) = contains_r
assert migrate_fut.running()
# Permit the compute hook to proceed
handle_params["status"] = 200
migrate_fut.result(timeout=10)
# Advance log cursor past the last 'stuck' message (we already waited for one, but
# there could be more than one)
while True:
contains_r = env.storage_controller.log_contains(NOTIFY_BLOCKED_LOG, offset=log_cursor)
if contains_r is None:
break
else:
(_, log_cursor) = contains_r
# Now, do a migration in the opposite direction
handle_params["status"] = 423
migrate_fut = executor.submit(
env.storage_controller.tenant_shard_migrate, shard_0_id, origin_pageserver.id
)
def logged_stuck_again():
env.storage_controller.assert_log_contains(NOTIFY_BLOCKED_LOG, offset=log_cursor)
wait_until(10, 0.25, logged_stuck_again)
assert migrate_fut.running()
# This time, the compute hook remains stuck, but we mark the origin node offline: this should
# also allow the migration to complete -- we only wait for the compute hook as long as we think
# the old location is still usable for computes.
# This is a regression test for issue https://github.com/neondatabase/neon/issues/8901
dest_pageserver.stop()
env.storage_controller.node_configure(dest_ps_id, {"availability": "Offline"})
try:
migrate_fut.result(timeout=10)
except StorageControllerApiException as e:
# The reconciler will fail because it can't detach from the origin: the important
# thing is that it finishes, rather than getting stuck in the compute notify loop.
assert "Reconcile error" in str(e)
# A later background reconciliation will clean up and leave things in a neat state, even
# while the compute hook is still blocked
try:
env.storage_controller.reconcile_all()
except StorageControllerApiException as e:
# We expect that the reconciler will do its work, but be unable to fully succeed
# because it can't send a compute notification. It will complete, but leave
# the internal flag set for "retry compute notification later"
assert "Control plane tenant busy" in str(e)
# Confirm that we are AttachedSingle on the node we last called the migrate API for
loc = origin_pageserver.http_client().tenant_get_location(shard_0_id)
assert loc["mode"] == "AttachedSingle"
# When the origin node comes back, it should get cleaned up
dest_pageserver.start()
try:
env.storage_controller.reconcile_all()
except StorageControllerApiException as e:
# Compute hook is still blocked: reconciler will configure PS but not fully succeed
assert "Control plane tenant busy" in str(e)
with pytest.raises(PageserverApiException, match="Tenant shard not found"):
dest_pageserver.http_client().tenant_get_location(shard_0_id)
# Once the compute hook is unblocked, we should be able to get into a totally
# quiescent state again
handle_params["status"] = 200
env.storage_controller.reconcile_until_idle()
env.storage_controller.consistency_check()
def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder):
"""
Verify that occasional-use debug APIs work as expected. This is a lightweight test