mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-03 19:42:55 +00:00
storage_controller: fix node flap detach race (#10298)
## Problem The observed state removal may race with the inline updates of the observed state done from `Service::node_activate_reconcile`. This was intended to work as follows: 1. Detaches while the node is unavailable remove the entry from the observed state. 2. `Service::node_activate_reconcile` diffs the locations returned by the pageserver with the observed state and detaches in-line when required. ## Summary of changes This PR removes step (1) and lets background reconciliations deal with the mismatch between the intent and observed state. A follow up will attempt to remove `Service::node_activate_reconcile` altogether. Closes https://github.com/neondatabase/neon/issues/10253
This commit is contained in:
@@ -14,7 +14,6 @@ use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::backoff::exponential_backoff;
|
||||
use utils::failpoint_support;
|
||||
use utils::generation::Generation;
|
||||
use utils::id::{NodeId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
@@ -212,11 +211,12 @@ impl Reconciler {
|
||||
lazy: bool,
|
||||
) -> Result<(), ReconcileError> {
|
||||
if !node.is_available() && config.mode == LocationConfigMode::Detached {
|
||||
// Attempts to detach from offline nodes may be imitated without doing I/O: a node which is offline
|
||||
// will get fully reconciled wrt the shard's intent state when it is reactivated, irrespective of
|
||||
// what we put into `observed`, in [`crate::service::Service::node_activate_reconcile`]
|
||||
tracing::info!("Node {node} is unavailable during detach: proceeding anyway, it will be detached on next activation");
|
||||
self.observed.locations.remove(&node.get_id());
|
||||
// [`crate::service::Service::node_activate_reconcile`] will update the observed state
|
||||
// when the node comes back online. At that point, the intent and observed states will
|
||||
// be mismatched and a background reconciliation will detach.
|
||||
tracing::info!(
|
||||
"Node {node} is unavailable during detach: proceeding anyway, it will be detached via background reconciliation"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -749,6 +749,8 @@ impl Reconciler {
|
||||
};
|
||||
|
||||
if increment_generation {
|
||||
pausable_failpoint!("reconciler-pre-increment-generation");
|
||||
|
||||
let generation = self
|
||||
.persistence
|
||||
.increment_generation(self.tenant_shard_id, node.get_id())
|
||||
@@ -824,7 +826,7 @@ impl Reconciler {
|
||||
.handle_detach(self.tenant_shard_id, self.shard.stripe_size);
|
||||
}
|
||||
|
||||
failpoint_support::sleep_millis_async!("sleep-on-reconcile-epilogue");
|
||||
pausable_failpoint!("reconciler-epilogue");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -83,6 +83,7 @@ use utils::{
|
||||
generation::Generation,
|
||||
http::error::ApiError,
|
||||
id::{NodeId, TenantId, TimelineId},
|
||||
pausable_failpoint,
|
||||
sync::gate::Gate,
|
||||
};
|
||||
|
||||
@@ -1024,6 +1025,8 @@ impl Service {
|
||||
)
|
||||
.await;
|
||||
|
||||
pausable_failpoint!("heartbeat-pre-node-state-configure");
|
||||
|
||||
// This is the code path for geniune availability transitions (i.e node
|
||||
// goes unavailable and/or comes back online).
|
||||
let res = self
|
||||
@@ -2492,6 +2495,7 @@ impl Service {
|
||||
// Persist updates
|
||||
// Ordering: write to the database before applying changes in-memory, so that
|
||||
// we will not appear time-travel backwards on a restart.
|
||||
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
for ShardUpdate {
|
||||
tenant_shard_id,
|
||||
|
||||
@@ -2521,6 +2521,7 @@ class NeonPageserver(PgProtocol, LogUtils):
|
||||
self,
|
||||
extra_env_vars: dict[str, str] | None = None,
|
||||
timeout_in_seconds: int | None = None,
|
||||
await_active: bool = True,
|
||||
) -> Self:
|
||||
"""
|
||||
Start the page server.
|
||||
@@ -2547,8 +2548,10 @@ class NeonPageserver(PgProtocol, LogUtils):
|
||||
)
|
||||
self.running = True
|
||||
|
||||
if self.env.storage_controller.running and self.env.storage_controller.node_registered(
|
||||
self.id
|
||||
if (
|
||||
await_active
|
||||
and self.env.storage_controller.running
|
||||
and self.env.storage_controller.node_registered(self.id)
|
||||
):
|
||||
self.env.storage_controller.poll_node_status(
|
||||
self.id, PageserverAvailability.ACTIVE, None, max_attempts=200, backoff=0.1
|
||||
|
||||
@@ -17,6 +17,7 @@ from fixtures.compute_reconfigure import ComputeReconfigure
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
DEFAULT_AZ_ID,
|
||||
LogCursor,
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
NeonPageserver,
|
||||
@@ -2406,7 +2407,14 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
|
||||
env.storage_controller.tenant_create(tid)
|
||||
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
env.storage_controller.configure_failpoints(("sleep-on-reconcile-epilogue", "return(10000)"))
|
||||
env.storage_controller.configure_failpoints(("reconciler-epilogue", "pause"))
|
||||
|
||||
def unpause_failpoint():
|
||||
time.sleep(2)
|
||||
env.storage_controller.configure_failpoints(("reconciler-epilogue", "off"))
|
||||
|
||||
thread = threading.Thread(target=unpause_failpoint)
|
||||
thread.start()
|
||||
|
||||
# Make a change to the tenant config to trigger a slow reconcile
|
||||
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
|
||||
@@ -2421,6 +2429,8 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
|
||||
observed_state = env.storage_controller.step_down()
|
||||
log.info(f"Storage controller stepped down with {observed_state=}")
|
||||
|
||||
thread.join()
|
||||
|
||||
# Validate that we waited for the slow reconcile to complete
|
||||
# and updated the observed state in the storcon before stepping down.
|
||||
node_id = str(env.pageserver.id)
|
||||
@@ -3294,3 +3304,113 @@ def test_storage_controller_detached_stopped(
|
||||
|
||||
# Confirm the detach happened
|
||||
assert env.pageserver.http_client().tenant_list_locations()["tenant_shards"] == []
|
||||
|
||||
|
||||
@run_only_on_default_postgres("Postgres version makes no difference here")
|
||||
def test_storage_controller_node_flap_detach_race(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
Reproducer for https://github.com/neondatabase/neon/issues/10253.
|
||||
|
||||
When a node's availability flaps, the reconciliations spawned by the node
|
||||
going offline may race with the reconciliation done when then node comes
|
||||
back online.
|
||||
"""
|
||||
neon_env_builder.num_pageservers = 4
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
tenant_id = TenantId.generate()
|
||||
env.storage_controller.tenant_create(
|
||||
tenant_id,
|
||||
shard_count=2,
|
||||
)
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
stopped_nodes = [s["node_id"] for s in env.storage_controller.locate(tenant_id)]
|
||||
|
||||
def has_hit_failpoint(failpoint: str, offset: LogCursor | None = None) -> LogCursor:
|
||||
res = env.storage_controller.log_contains(f"at failpoint {failpoint}", offset=offset)
|
||||
assert res
|
||||
return res[1]
|
||||
|
||||
# Stop the nodes which host attached shards.
|
||||
# This will trigger reconciliations which pause before incrmenenting the generation,
|
||||
# and, more importantly, updating the `generation_pageserver` of the shards.
|
||||
env.storage_controller.configure_failpoints(("reconciler-pre-increment-generation", "pause"))
|
||||
for node_id in stopped_nodes:
|
||||
env.get_pageserver(node_id).stop(immediate=True)
|
||||
|
||||
def failure_handled() -> LogCursor:
|
||||
stop_offset = None
|
||||
|
||||
for node_id in stopped_nodes:
|
||||
res = env.storage_controller.log_contains(f"node {node_id} going offline")
|
||||
assert res
|
||||
stop_offset = res[1]
|
||||
|
||||
assert stop_offset
|
||||
return stop_offset
|
||||
|
||||
offset = wait_until(failure_handled)
|
||||
|
||||
# Now restart the nodes and make them pause before marking themselves as available
|
||||
# or running the activation reconciliation.
|
||||
env.storage_controller.configure_failpoints(("heartbeat-pre-node-state-configure", "pause"))
|
||||
|
||||
for node_id in stopped_nodes:
|
||||
env.get_pageserver(node_id).start(await_active=False)
|
||||
|
||||
offset = wait_until(
|
||||
lambda: has_hit_failpoint("heartbeat-pre-node-state-configure", offset=offset)
|
||||
)
|
||||
|
||||
# The nodes have restarted and are waiting to perform activaction reconciliation.
|
||||
# Unpause the initial reconciliation triggered by the nodes going offline.
|
||||
# It will attempt to detach from the old location, but notice that the old location
|
||||
# is not yet available, and then stop before processing the results of the reconciliation.
|
||||
env.storage_controller.configure_failpoints(("reconciler-epilogue", "pause"))
|
||||
env.storage_controller.configure_failpoints(("reconciler-pre-increment-generation", "off"))
|
||||
|
||||
offset = wait_until(lambda: has_hit_failpoint("reconciler-epilogue", offset=offset))
|
||||
|
||||
# Let the nodes perform activation reconciliation while still holding up processing the result
|
||||
# from the initial reconcile triggered by going offline.
|
||||
env.storage_controller.configure_failpoints(("heartbeat-pre-node-state-configure", "off"))
|
||||
|
||||
def activate_reconciliation_done():
|
||||
for node_id in stopped_nodes:
|
||||
assert env.storage_controller.log_contains(
|
||||
f"Node {node_id} transition to active", offset=offset
|
||||
)
|
||||
|
||||
wait_until(activate_reconciliation_done)
|
||||
|
||||
# Finally, allow the initial reconcile to finish up.
|
||||
env.storage_controller.configure_failpoints(("reconciler-epilogue", "off"))
|
||||
|
||||
# Give things a chance to settle and validate that no stale locations exist
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
def validate_locations():
|
||||
shard_locations = defaultdict(list)
|
||||
for ps in env.pageservers:
|
||||
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
|
||||
for loc in locations:
|
||||
shard_locations[loc[0]].append(
|
||||
{"generation": loc[1]["generation"], "mode": loc[1]["mode"], "node": ps.id}
|
||||
)
|
||||
|
||||
log.info(f"Shard locations: {shard_locations}")
|
||||
|
||||
attached_locations = {
|
||||
k: list(filter(lambda loc: loc["mode"] == "AttachedSingle", v))
|
||||
for k, v in shard_locations.items()
|
||||
}
|
||||
|
||||
for shard, locs in attached_locations.items():
|
||||
assert len(locs) == 1, f"{shard} has {len(locs)} attached locations"
|
||||
|
||||
wait_until(validate_locations, timeout=10)
|
||||
|
||||
Reference in New Issue
Block a user