Compare commits

...

4 Commits

Author SHA1 Message Date
Vlad Lazar
96bd74697a storage_controller: remove node activation reconciliation 2025-01-07 17:32:53 +01:00
Vlad Lazar
8e77e10aa3 storage_controller: don't drop observed state on unavailable detach
The observed state removal may race with the inline updates of the
observed state done from `Service::node_activate_reconcile`.

This was intended to work as follows:
1. Detaches while the node is unavailable remove the entry from the
   observed state.
2. `Service::node_activate_reconcile` diffs the locations returned
   by the pageserver with the observed state and detaches in-line
   when required.

This commit removes step (1) and lets background reconciliations
deal with the mismatch between the intent and observed state.
2025-01-07 17:04:47 +01:00
Vlad Lazar
83e90d3b65 tests: add repro for node flap detach race 2025-01-07 16:39:24 +01:00
Vlad Lazar
d3fa0f6b9e storage_controller: rename failpoint and make it pausable
The same failpoint is used for a new test by a follow up commit
and that needs a pausable failpoint.
2025-01-07 16:30:32 +01:00
4 changed files with 149 additions and 152 deletions

View File

@@ -14,7 +14,6 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
use utils::backoff::exponential_backoff;
use utils::failpoint_support;
use utils::generation::Generation;
use utils::id::{NodeId, TimelineId};
use utils::lsn::Lsn;
@@ -212,11 +211,11 @@ impl Reconciler {
lazy: bool,
) -> Result<(), ReconcileError> {
if !node.is_available() && config.mode == LocationConfigMode::Detached {
// Attempts to detach from offline nodes may be imitated without doing I/O: a node which is offline
// will get fully reconciled wrt the shard's intent state when it is reactivated, irrespective of
// what we put into `observed`, in [`crate::service::Service::node_activate_reconcile`]
tracing::info!("Node {node} is unavailable during detach: proceeding anyway, it will be detached on next activation");
self.observed.locations.remove(&node.get_id());
// Skip if we cannot detach right now due to the node being unavailable. It will be
// handled by the background reconciliation loop.
tracing::info!(
"Node {node} is unavailable during detach: proceeding anyway, it will be detached via background reconciliation"
);
return Ok(());
}
@@ -749,6 +748,8 @@ impl Reconciler {
};
if increment_generation {
pausable_failpoint!("reconciler-pre-increment-generation");
let generation = self
.persistence
.increment_generation(self.tenant_shard_id, node.get_id())
@@ -824,7 +825,7 @@ impl Reconciler {
.handle_detach(self.tenant_shard_id, self.shard.stripe_size);
}
failpoint_support::sleep_millis_async!("sleep-on-reconcile-epilogue");
pausable_failpoint!("reconciler-epilogue");
Ok(())
}

View File

@@ -83,6 +83,7 @@ use utils::{
generation::Generation,
http::error::ApiError,
id::{NodeId, TenantId, TimelineId},
pausable_failpoint,
sync::gate::Gate,
};
@@ -1024,10 +1025,12 @@ impl Service {
)
.await;
pausable_failpoint!("heartbeat-pre-node-state-configure");
// This is the code path for geniune availability transitions (i.e node
// goes unavailable and/or comes back online).
let res = self
.node_state_configure(node_id, Some(new_availability), None, &node_lock)
.node_state_configure(node_id, Some(new_availability), None)
.await;
match res {
@@ -1314,7 +1317,7 @@ impl Service {
if count_min != count_max {
// Aborting the split in the database and dropping the child shards is sufficient: the reconciliation in
// [`Self::startup_reconcile`] will implicitly drop the child shards on remote pageservers, or they'll
// be dropped later in [`Self::node_activate_reconcile`] if it isn't available right now.
// be dropped later via background reconciliation if it isn't available right now.
tracing::info!("Aborting shard split {tenant_id} {count_min:?} -> {count_max:?}");
let abort_status = persistence.abort_shard_split(tenant_id, count_max).await?;
@@ -1724,118 +1727,6 @@ impl Service {
}
}
// When the availability state of a node transitions to active, we must do a full reconciliation
// of LocationConfigs on that node. This is because while a node was offline:
// - we might have proceeded through startup_reconcile without checking for extraneous LocationConfigs on this node
// - aborting a tenant shard split might have left rogue child shards behind on this node.
//
// This function must complete _before_ setting a `Node` to Active: once it is set to Active, other
// Reconcilers might communicate with the node, and these must not overlap with the work we do in
// this function.
//
// The reconciliation logic in here is very similar to what [`Self::startup_reconcile`] does, but
// for written for a single node rather than as a batch job for all nodes.
#[tracing::instrument(skip_all, fields(node_id=%node.get_id()))]
async fn node_activate_reconcile(
&self,
mut node: Node,
_lock: &TracingExclusiveGuard<NodeOperations>,
) -> Result<(), ApiError> {
// This Node is a mutable local copy: we will set it active so that we can use its
// API client to reconcile with the node. The Node in [`Self::nodes`] will get updated
// later.
node.set_availability(NodeAvailability::Active(PageserverUtilization::full()));
let configs = match node
.with_client_retries(
|client| async move { client.list_location_config().await },
&self.config.jwt_token,
1,
5,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await
{
None => {
// We're shutting down (the Node's cancellation token can't have fired, because
// we're the only scope that has a reference to it, and we didn't fire it).
return Err(ApiError::ShuttingDown);
}
Some(Err(e)) => {
// This node didn't succeed listing its locations: it may not proceed to active state
// as it is apparently unavailable.
return Err(ApiError::PreconditionFailed(
format!("Failed to query node location configs, cannot activate ({e})").into(),
));
}
Some(Ok(configs)) => configs,
};
tracing::info!("Loaded {} LocationConfigs", configs.tenant_shards.len());
let mut cleanup = Vec::new();
{
let mut locked = self.inner.write().unwrap();
for (tenant_shard_id, observed_loc) in configs.tenant_shards {
let Some(tenant_shard) = locked.tenants.get_mut(&tenant_shard_id) else {
cleanup.push(tenant_shard_id);
continue;
};
tenant_shard
.observed
.locations
.insert(node.get_id(), ObservedStateLocation { conf: observed_loc });
}
}
for tenant_shard_id in cleanup {
tracing::info!("Detaching {tenant_shard_id}");
match node
.with_client_retries(
|client| async move {
let config = LocationConfig {
mode: LocationConfigMode::Detached,
generation: None,
secondary_conf: None,
shard_number: tenant_shard_id.shard_number.0,
shard_count: tenant_shard_id.shard_count.literal(),
shard_stripe_size: 0,
tenant_conf: models::TenantConfig::default(),
};
client
.location_config(tenant_shard_id, config, None, false)
.await
},
&self.config.jwt_token,
1,
5,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await
{
None => {
// We're shutting down (the Node's cancellation token can't have fired, because
// we're the only scope that has a reference to it, and we didn't fire it).
return Err(ApiError::ShuttingDown);
}
Some(Err(e)) => {
// Do not let the node proceed to Active state if it is not responsive to requests
// to detach. This could happen if e.g. a shutdown bug in the pageserver is preventing
// detach completing: we should not let this node back into the set of nodes considered
// okay for scheduling.
return Err(ApiError::Conflict(format!(
"Node {node} failed to detach {tenant_shard_id}: {e}"
)));
}
Some(Ok(_)) => {}
};
}
Ok(())
}
pub(crate) async fn re_attach(
&self,
reattach_req: ReAttachRequest,
@@ -1923,9 +1814,8 @@ impl Service {
}
}
// We consider a node Active once we have composed a re-attach response, but we
// do not call [`Self::node_activate_reconcile`]: the handling of the re-attach response
// implicitly synchronizes the LocationConfigs on the node.
// We consider a node Active once we have composed a re-attach response:
// the handling of the re-attach response implicitly synchronizes the LocationConfigs on the node.
//
// Setting a node active unblocks any Reconcilers that might write to the location config API,
// but those requests will not be accepted by the node until it has finished processing
@@ -2492,6 +2382,7 @@ impl Service {
// Persist updates
// Ordering: write to the database before applying changes in-memory, so that
// we will not appear time-travel backwards on a restart.
let mut schedule_context = ScheduleContext::default();
for ShardUpdate {
tenant_shard_id,
@@ -5507,7 +5398,6 @@ impl Service {
node_id: NodeId,
availability: Option<NodeAvailability>,
scheduling: Option<NodeSchedulingPolicy>,
node_lock: &TracingExclusiveGuard<NodeOperations>,
) -> Result<AvailabilityTransition, ApiError> {
if let Some(scheduling) = scheduling {
// Scheduling is a persistent part of Node: we must write updates to the database before
@@ -5515,32 +5405,15 @@ impl Service {
self.persistence.update_node(node_id, scheduling).await?;
}
// If we're activating a node, then before setting it active we must reconcile any shard locations
// on that node, in case it is out of sync, e.g. due to being unavailable during controller startup,
// by calling [`Self::node_activate_reconcile`]
//
// The transition we calculate here remains valid later in the function because we hold the op lock on the node:
// nothing else can mutate its availability while we run.
let availability_transition = if let Some(input_availability) = availability.as_ref() {
let (activate_node, availability_transition) = {
let locked = self.inner.read().unwrap();
let Some(node) = locked.nodes.get(&node_id) else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Node {} not registered", node_id).into(),
));
};
(
node.clone(),
node.get_availability_transition(input_availability),
)
let locked = self.inner.read().unwrap();
let Some(node) = locked.nodes.get(&node_id) else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Node {} not registered", node_id).into(),
));
};
if matches!(availability_transition, AvailabilityTransition::ToActive) {
self.node_activate_reconcile(activate_node, node_lock)
.await?;
}
availability_transition
node.get_availability_transition(input_availability)
} else {
AvailabilityTransition::Unchanged
};
@@ -5730,7 +5603,7 @@ impl Service {
trace_exclusive_lock(&self.node_op_locks, node_id, NodeOperations::Configure).await;
let transition = self
.node_state_configure(node_id, availability, scheduling, &node_lock)
.node_state_configure(node_id, availability, scheduling)
.await?;
self.handle_node_availability_transition(node_id, transition, &node_lock)
.await

View File

@@ -2521,6 +2521,7 @@ class NeonPageserver(PgProtocol, LogUtils):
self,
extra_env_vars: dict[str, str] | None = None,
timeout_in_seconds: int | None = None,
await_active: bool = True,
) -> Self:
"""
Start the page server.
@@ -2547,8 +2548,10 @@ class NeonPageserver(PgProtocol, LogUtils):
)
self.running = True
if self.env.storage_controller.running and self.env.storage_controller.node_registered(
self.id
if (
await_active
and self.env.storage_controller.running
and self.env.storage_controller.node_registered(self.id)
):
self.env.storage_controller.poll_node_status(
self.id, PageserverAvailability.ACTIVE, None, max_attempts=200, backoff=0.1

View File

@@ -17,6 +17,7 @@ from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
DEFAULT_AZ_ID,
LogCursor,
NeonEnv,
NeonEnvBuilder,
NeonPageserver,
@@ -2406,7 +2407,14 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
env.storage_controller.tenant_create(tid)
env.storage_controller.reconcile_until_idle()
env.storage_controller.configure_failpoints(("sleep-on-reconcile-epilogue", "return(10000)"))
env.storage_controller.configure_failpoints(("reconciler-epilogue", "pause"))
def unpause_failpoint():
time.sleep(2)
env.storage_controller.configure_failpoints(("reconciler-epilogue", "off"))
thread = threading.Thread(target=unpause_failpoint)
thread.start()
# Make a change to the tenant config to trigger a slow reconcile
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
@@ -2421,6 +2429,8 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
observed_state = env.storage_controller.step_down()
log.info(f"Storage controller stepped down with {observed_state=}")
thread.join()
# Validate that we waited for the slow reconcile to complete
# and updated the observed state in the storcon before stepping down.
node_id = str(env.pageserver.id)
@@ -3294,3 +3304,113 @@ def test_storage_controller_detached_stopped(
# Confirm the detach happened
assert env.pageserver.http_client().tenant_list_locations()["tenant_shards"] == []
@run_only_on_default_postgres("Postgres version makes no difference here")
def test_storage_controller_node_flap_detach_race(
neon_env_builder: NeonEnvBuilder,
):
"""
Reproducer for https://github.com/neondatabase/neon/issues/10253.
When a node's availability flaps, the reconciliations spawned by the node
going offline may race with the reconciliation done when then node comes
back online.
"""
neon_env_builder.num_pageservers = 4
env = neon_env_builder.init_configs()
env.start()
tenant_id = TenantId.generate()
env.storage_controller.tenant_create(
tenant_id,
shard_count=2,
)
env.storage_controller.reconcile_until_idle()
stopped_nodes = [s["node_id"] for s in env.storage_controller.locate(tenant_id)]
def has_hit_failpoint(failpoint: str, offset: LogCursor | None = None) -> LogCursor:
res = env.storage_controller.log_contains(f"at failpoint {failpoint}", offset=offset)
assert res
return res[1]
# Stop the nodes which host attached shards.
# This will trigger reconciliations which pause before incrmenenting the generation,
# and, more importantly, updating the `generation_pageserver` of the shards.
env.storage_controller.configure_failpoints(("reconciler-pre-increment-generation", "pause"))
for node_id in stopped_nodes:
env.get_pageserver(node_id).stop(immediate=True)
def failure_handled() -> LogCursor:
stop_offset = None
for node_id in stopped_nodes:
res = env.storage_controller.log_contains(f"node {node_id} going offline")
assert res
stop_offset = res[1]
assert stop_offset
return stop_offset
offset = wait_until(failure_handled)
# Now restart the nodes and make them pause before marking themselves as available
# or running the activation reconciliation.
env.storage_controller.configure_failpoints(("heartbeat-pre-node-state-configure", "pause"))
for node_id in stopped_nodes:
env.get_pageserver(node_id).start(await_active=False)
offset = wait_until(
lambda: has_hit_failpoint("heartbeat-pre-node-state-configure", offset=offset)
)
# The nodes have restarted and are waiting to perform activaction reconciliation.
# Unpause the initial reconciliation triggered by the nodes going offline.
# It will attempt to detach from the old location, but notice that the old location
# is not yet available, and then stop before processing the results of the reconciliation.
env.storage_controller.configure_failpoints(("reconciler-epilogue", "pause"))
env.storage_controller.configure_failpoints(("reconciler-pre-increment-generation", "off"))
offset = wait_until(lambda: has_hit_failpoint("reconciler-epilogue", offset=offset))
# Let the nodes perform activation reconciliation while still holding up processing the result
# from the initial reconcile triggered by going offline.
env.storage_controller.configure_failpoints(("heartbeat-pre-node-state-configure", "off"))
def activate_reconciliation_done():
for node_id in stopped_nodes:
assert env.storage_controller.log_contains(
f"Node {node_id} transition to active", offset=offset
)
wait_until(activate_reconciliation_done)
# Finally, allow the initial reconcile to finish up.
env.storage_controller.configure_failpoints(("reconciler-epilogue", "off"))
# Give things a chance to settle and validate that no stale locations exist
env.storage_controller.reconcile_until_idle()
def validate_locations():
shard_locations = defaultdict(list)
for ps in env.pageservers:
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
for loc in locations:
shard_locations[loc[0]].append(
{"generation": loc[1]["generation"], "mode": loc[1]["mode"], "node": ps.id}
)
log.info(f"Shard locations: {shard_locations}")
attached_locations = {
k: list(filter(lambda loc: loc["mode"] == "AttachedSingle", v))
for k, v in shard_locations.items()
}
for shard, locs in attached_locations.items():
assert len(locs) == 1, f"{shard} has {len(locs)} attached locations"
wait_until(validate_locations, timeout=10)