mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 12:02:55 +00:00
storcon: fix case where we might fail to send compute notifications after two opposite migrations (#9435)
## Problem If we migrate A->B, then B->A, and the notification of A->B fails, then we might have retained state that makes us think "A" is the last state we sent to the compute hook, whereas when we migrate B->A we should really be sending a fresh notification in case our earlier failed notification has actually mutated the remote compute config. Closes: #9417 ## Summary of changes - Add a reproducer for the bug (`test_storage_controller_compute_hook_revert`) - Refactor compute hook code to represent remote state with `ComputeRemoteState` which stores a boolean for whether the compute has fully applied the change as well as the request that the compute accepted. - The actual bug fix: after sending a compute notification, if we got a 423 response then update our ComputeRemoteState to reflect that we have mutated the remote state. This way, when we later try and notify for our historic location, we will properly see that as a change and send the notification. Co-authored-by: Vlad Lazar <vlad@neon.tech>
This commit is contained in:
@@ -28,7 +28,7 @@ struct UnshardedComputeHookTenant {
|
||||
node_id: NodeId,
|
||||
|
||||
// Must hold this lock to send a notification.
|
||||
send_lock: Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>,
|
||||
send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
|
||||
}
|
||||
struct ShardedComputeHookTenant {
|
||||
stripe_size: ShardStripeSize,
|
||||
@@ -38,7 +38,22 @@ struct ShardedComputeHookTenant {
|
||||
// Must hold this lock to send a notification. The contents represent
|
||||
// the last successfully sent notification, and are used to coalesce multiple
|
||||
// updates by only sending when there is a chance since our last successful send.
|
||||
send_lock: Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>,
|
||||
send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
|
||||
}
|
||||
|
||||
/// Represents our knowledge of the compute's state: we can update this when we get a
|
||||
/// response from a notify API call, which tells us what has been applied.
|
||||
///
|
||||
/// Should be wrapped in an Option<>, as we cannot always know the remote state.
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
struct ComputeRemoteState {
|
||||
// The request body which was acked by the compute
|
||||
request: ComputeHookNotifyRequest,
|
||||
|
||||
// Whether the cplane indicated that the state was applied to running computes, or just
|
||||
// persisted. In the Neon control plane, this is the difference between a 423 response (meaning
|
||||
// persisted but not applied), and a 2xx response (both persisted and applied)
|
||||
applied: bool,
|
||||
}
|
||||
|
||||
enum ComputeHookTenant {
|
||||
@@ -64,7 +79,7 @@ impl ComputeHookTenant {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>> {
|
||||
fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>> {
|
||||
match self {
|
||||
Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock,
|
||||
Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock,
|
||||
@@ -188,11 +203,11 @@ enum MaybeSendResult {
|
||||
Transmit(
|
||||
(
|
||||
ComputeHookNotifyRequest,
|
||||
tokio::sync::OwnedMutexGuard<Option<ComputeHookNotifyRequest>>,
|
||||
tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState>>,
|
||||
),
|
||||
),
|
||||
// Something requires sending, but you must wait for a current sender then call again
|
||||
AwaitLock(Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>),
|
||||
AwaitLock(Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>),
|
||||
// Nothing requires sending
|
||||
Noop,
|
||||
}
|
||||
@@ -201,7 +216,7 @@ impl ComputeHookTenant {
|
||||
fn maybe_send(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeHookNotifyRequest>>>,
|
||||
lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState>>>,
|
||||
) -> MaybeSendResult {
|
||||
let locked = match lock {
|
||||
Some(already_locked) => already_locked,
|
||||
@@ -257,11 +272,22 @@ impl ComputeHookTenant {
|
||||
tracing::info!("Tenant isn't yet ready to emit a notification");
|
||||
MaybeSendResult::Noop
|
||||
}
|
||||
Some(request) if Some(&request) == locked.as_ref() => {
|
||||
// No change from the last value successfully sent
|
||||
Some(request)
|
||||
if Some(&request) == locked.as_ref().map(|s| &s.request)
|
||||
&& locked.as_ref().map(|s| s.applied).unwrap_or(false) =>
|
||||
{
|
||||
tracing::info!(
|
||||
"Skipping notification because remote state already matches ({:?})",
|
||||
&request
|
||||
);
|
||||
// No change from the last value successfully sent, and our state indicates that the last
|
||||
// value sent was fully applied on the control plane side.
|
||||
MaybeSendResult::Noop
|
||||
}
|
||||
Some(request) => MaybeSendResult::Transmit((request, locked)),
|
||||
Some(request) => {
|
||||
// Our request differs from the last one sent, or the last one sent was not fully applied on the compute side
|
||||
MaybeSendResult::Transmit((request, locked))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -550,10 +576,28 @@ impl ComputeHook {
|
||||
})
|
||||
};
|
||||
|
||||
if result.is_ok() {
|
||||
// Before dropping the send lock, stash the request we just sent so that
|
||||
// subsequent callers can avoid redundantly re-sending the same thing.
|
||||
*send_lock_guard = Some(request);
|
||||
match result {
|
||||
Ok(_) => {
|
||||
// Before dropping the send lock, stash the request we just sent so that
|
||||
// subsequent callers can avoid redundantly re-sending the same thing.
|
||||
*send_lock_guard = Some(ComputeRemoteState {
|
||||
request,
|
||||
applied: true,
|
||||
});
|
||||
}
|
||||
Err(NotifyError::Busy) => {
|
||||
// Busy result means that the server responded and has stored the new configuration,
|
||||
// but was not able to fully apply it to the compute
|
||||
*send_lock_guard = Some(ComputeRemoteState {
|
||||
request,
|
||||
applied: false,
|
||||
});
|
||||
}
|
||||
Err(_) => {
|
||||
// General error case: we can no longer know the remote state, so clear it. This will result in
|
||||
// the logic in maybe_send recognizing that we should call the hook again.
|
||||
*send_lock_guard = None;
|
||||
}
|
||||
}
|
||||
result
|
||||
}
|
||||
@@ -707,7 +751,10 @@ pub(crate) mod tests {
|
||||
assert!(request.stripe_size.is_none());
|
||||
|
||||
// Simulate successful send
|
||||
*guard = Some(request);
|
||||
*guard = Some(ComputeRemoteState {
|
||||
request,
|
||||
applied: true,
|
||||
});
|
||||
drop(guard);
|
||||
|
||||
// Try asking again: this should be a no-op
|
||||
@@ -750,7 +797,10 @@ pub(crate) mod tests {
|
||||
assert_eq!(request.stripe_size, Some(ShardStripeSize(32768)));
|
||||
|
||||
// Simulate successful send
|
||||
*guard = Some(request);
|
||||
*guard = Some(ComputeRemoteState {
|
||||
request,
|
||||
applied: true,
|
||||
});
|
||||
drop(guard);
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -576,6 +576,14 @@ def test_storage_controller_compute_hook(
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
|
||||
NOTIFY_BLOCKED_LOG = ".*Live migration blocked.*"
|
||||
NOTIFY_FAILURE_LOGS = [
|
||||
".*Failed to notify compute.*",
|
||||
".*Reconcile error.*Cancelled",
|
||||
".*Reconcile error.*Control plane tenant busy",
|
||||
]
|
||||
|
||||
|
||||
def test_storage_controller_stuck_compute_hook(
|
||||
httpserver: HTTPServer,
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
@@ -620,15 +628,8 @@ def test_storage_controller_stuck_compute_hook(
|
||||
dest_pageserver = env.get_pageserver(dest_ps_id)
|
||||
shard_0_id = TenantShardId(tenant_id, 0, 0)
|
||||
|
||||
NOTIFY_BLOCKED_LOG = ".*Live migration blocked.*"
|
||||
env.storage_controller.allowed_errors.extend(
|
||||
[
|
||||
NOTIFY_BLOCKED_LOG,
|
||||
".*Failed to notify compute.*",
|
||||
".*Reconcile error.*Cancelled",
|
||||
".*Reconcile error.*Control plane tenant busy",
|
||||
]
|
||||
)
|
||||
env.storage_controller.allowed_errors.append(NOTIFY_BLOCKED_LOG)
|
||||
env.storage_controller.allowed_errors.extend(NOTIFY_FAILURE_LOGS)
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
|
||||
# We expect the controller to hit the 423 (locked) and retry. Migration shouldn't complete until that
|
||||
@@ -719,6 +720,114 @@ def test_storage_controller_stuck_compute_hook(
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
|
||||
@run_only_on_default_postgres("this test doesn't start an endpoint")
|
||||
def test_storage_controller_compute_hook_revert(
|
||||
httpserver: HTTPServer,
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
httpserver_listen_address,
|
||||
):
|
||||
"""
|
||||
'revert' in the sense of a migration which gets reversed shortly after, as may happen during
|
||||
a rolling upgrade.
|
||||
|
||||
This is a reproducer for https://github.com/neondatabase/neon/issues/9417
|
||||
|
||||
The buggy behavior was that when the compute hook gave us errors, we assumed our last successfully
|
||||
sent state was still in effect, so when migrating back to the original pageserver we didn't bother
|
||||
notifying of that. This is wrong because even a failed request might mutate the state on the server.
|
||||
"""
|
||||
|
||||
# We will run two pageserver to migrate and check that the storage controller sends notifications
|
||||
# when migrating.
|
||||
neon_env_builder.num_pageservers = 2
|
||||
(host, port) = httpserver_listen_address
|
||||
neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify"
|
||||
|
||||
# Set up fake HTTP notify endpoint
|
||||
notifications = []
|
||||
|
||||
handle_params = {"status": 200}
|
||||
|
||||
def handler(request: Request):
|
||||
status = handle_params["status"]
|
||||
log.info(f"Notify request[{status}]: {request}")
|
||||
notifications.append(request.json)
|
||||
return Response(status=status)
|
||||
|
||||
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
|
||||
|
||||
# Start running
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
|
||||
tenant_id = env.initial_tenant
|
||||
tenant_shard_id = TenantShardId(tenant_id, 0, 0)
|
||||
|
||||
pageserver_a = env.get_tenant_pageserver(tenant_id)
|
||||
pageserver_b = [p for p in env.pageservers if p.id != pageserver_a.id][0]
|
||||
|
||||
def notified_ps(ps_id: int) -> None:
|
||||
latest = notifications[-1]
|
||||
log.info(f"Waiting for {ps_id}, have {latest}")
|
||||
assert latest is not None
|
||||
assert latest["shards"] is not None
|
||||
assert latest["shards"][0]["node_id"] == ps_id
|
||||
|
||||
wait_until(30, 1, lambda: notified_ps(pageserver_a.id))
|
||||
|
||||
env.storage_controller.allowed_errors.append(NOTIFY_BLOCKED_LOG)
|
||||
env.storage_controller.allowed_errors.extend(NOTIFY_FAILURE_LOGS)
|
||||
|
||||
# Migrate A -> B, and make notifications fail while this is happening
|
||||
handle_params["status"] = 423
|
||||
|
||||
with pytest.raises(StorageControllerApiException, match="Timeout waiting for shard"):
|
||||
# We expect the controller to give us an error because its reconciliation timed out
|
||||
# waiting for the compute hook.
|
||||
env.storage_controller.tenant_shard_migrate(tenant_shard_id, pageserver_b.id)
|
||||
|
||||
# Although the migration API failed, the hook should still see pageserver B (it remembers what
|
||||
# was posted even when returning an error code)
|
||||
wait_until(30, 1, lambda: notified_ps(pageserver_b.id))
|
||||
|
||||
# Although the migration API failed, the tenant should still have moved to the right pageserver
|
||||
assert len(pageserver_b.http_client().tenant_list()) == 1
|
||||
|
||||
# Before we clear the failure on the migration hook, we need the controller to give up
|
||||
# trying to notify about B -- the bug case we're reproducing is when the controller
|
||||
# _never_ successfully notified for B, then tries to notify for A.
|
||||
#
|
||||
# The controller will give up notifying if the origin of a migration becomes unavailable.
|
||||
pageserver_a.stop()
|
||||
|
||||
# Preempt heartbeats for a faster test
|
||||
env.storage_controller.node_configure(pageserver_a.id, {"availability": "Offline"})
|
||||
|
||||
def logged_giving_up():
|
||||
env.storage_controller.assert_log_contains(".*Giving up on compute notification.*")
|
||||
|
||||
wait_until(30, 1, logged_giving_up)
|
||||
|
||||
pageserver_a.start()
|
||||
|
||||
# Preempt heartbeats for determinism
|
||||
env.storage_controller.node_configure(pageserver_a.id, {"availability": "Active"})
|
||||
# Starting node will prompt a reconcile to clean up old AttachedStale location, for a deterministic test
|
||||
# we want that complete before we start our migration. Tolerate failure because our compute hook is
|
||||
# still configured to fail
|
||||
try:
|
||||
env.storage_controller.reconcile_all()
|
||||
except StorageControllerApiException as e:
|
||||
# This exception _might_ be raised: it depends if our reconcile_all hit the on-node-activation
|
||||
# Reconciler lifetime or ran after it already completed.
|
||||
log.info(f"Expected error from reconcile_all: {e}")
|
||||
|
||||
# Migrate B -> A, with a working compute hook: the controller should notify the hook because the
|
||||
# last update it made that was acked (423) by the compute was for node B.
|
||||
handle_params["status"] = 200
|
||||
env.storage_controller.tenant_shard_migrate(tenant_shard_id, pageserver_a.id)
|
||||
|
||||
wait_until(30, 1, lambda: notified_ps(pageserver_a.id))
|
||||
|
||||
|
||||
def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Verify that occasional-use debug APIs work as expected. This is a lightweight test
|
||||
|
||||
Reference in New Issue
Block a user