storcon: apply all node status changes before handling transitions (#9281)

## Problem

When a node goes offline, we trigger reconciles to migrate shards away
from it. If multiple nodes go offline at the same time, we handled them in
sequence. Hence, we might migrate shards from the first offline node to the second
offline node and increase the unavailability period.

## Summary of changes

Refactor heartbeat delta handling to:
1. Update in memory state for all nodes first
2. Handle availability transitions one by one (we have full picture for each node after (1))

Closes https://github.com/neondatabase/neon/issues/9126
This commit is contained in:
Vlad Lazar
2024-10-08 17:55:25 +01:00
committed by GitHub
parent baf27ba6a3
commit 618680c299

View File

@@ -966,6 +966,8 @@ impl Service {
let res = self.heartbeater.heartbeat(nodes).await;
if let Ok(deltas) = res {
let mut to_handle = Vec::default();
for (node_id, state) in deltas.0 {
let new_availability = match state {
PageserverState::Available { utilization, .. } => {
@@ -997,14 +999,27 @@ impl Service {
}
};
let node_lock = trace_exclusive_lock(
&self.node_op_locks,
node_id,
NodeOperations::Configure,
)
.await;
// This is the code path for geniune availability transitions (i.e node
// goes unavailable and/or comes back online).
let res = self
.node_configure(node_id, Some(new_availability), None)
.node_state_configure(node_id, Some(new_availability), None, &node_lock)
.await;
match res {
Ok(()) => {}
Ok(transition) => {
// Keep hold of the lock until the availability transitions
// have been handled in
// [`Service::handle_node_availability_transitions`] in order avoid
// racing with [`Service::external_node_configure`].
to_handle.push((node_id, node_lock, transition));
}
Err(ApiError::NotFound(_)) => {
// This should be rare, but legitimate since the heartbeats are done
// on a snapshot of the nodes.
@@ -1014,13 +1029,37 @@ impl Service {
// Transition to active involves reconciling: if a node responds to a heartbeat then
// becomes unavailable again, we may get an error here.
tracing::error!(
"Failed to update node {} after heartbeat round: {}",
"Failed to update node state {} after heartbeat round: {}",
node_id,
err
);
}
}
}
// We collected all the transitions above and now we handle them.
let res = self.handle_node_availability_transitions(to_handle).await;
if let Err(errs) = res {
for (node_id, err) in errs {
match err {
ApiError::NotFound(_) => {
// This should be rare, but legitimate since the heartbeats are done
// on a snapshot of the nodes.
tracing::info!(
"Node {} was not found after heartbeat round",
node_id
);
}
err => {
tracing::error!(
"Failed to handle availability transition for {} after heartbeat round: {}",
node_id,
err
);
}
}
}
}
}
}
}
@@ -5299,15 +5338,17 @@ impl Service {
Ok(())
}
pub(crate) async fn node_configure(
/// Configure in-memory and persistent state of a node as requested
///
/// Note that this function does not trigger any immediate side effects in response
/// to the changes. That part is handled by [`Self::handle_node_availability_transition`].
async fn node_state_configure(
&self,
node_id: NodeId,
availability: Option<NodeAvailability>,
scheduling: Option<NodeSchedulingPolicy>,
) -> Result<(), ApiError> {
let _node_lock =
trace_exclusive_lock(&self.node_op_locks, node_id, NodeOperations::Configure).await;
node_lock: &TracingExclusiveGuard<NodeOperations>,
) -> Result<AvailabilityTransition, ApiError> {
if let Some(scheduling) = scheduling {
// Scheduling is a persistent part of Node: we must write updates to the database before
// applying them in memory
@@ -5336,7 +5377,7 @@ impl Service {
};
if matches!(availability_transition, AvailabilityTransition::ToActive) {
self.node_activate_reconcile(activate_node, &_node_lock)
self.node_activate_reconcile(activate_node, node_lock)
.await?;
}
availability_transition
@@ -5346,7 +5387,7 @@ impl Service {
// Apply changes from the request to our in-memory state for the Node
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
let (nodes, _tenants, scheduler) = locked.parts_mut();
let mut new_nodes = (**nodes).clone();
@@ -5356,8 +5397,8 @@ impl Service {
));
};
if let Some(availability) = availability.as_ref() {
node.set_availability(availability.clone());
if let Some(availability) = availability {
node.set_availability(availability);
}
if let Some(scheduling) = scheduling {
@@ -5368,11 +5409,30 @@ impl Service {
scheduler.node_upsert(node);
let new_nodes = Arc::new(new_nodes);
locked.nodes = new_nodes;
Ok(availability_transition)
}
/// Handle availability transition of one node
///
/// Note that you should first call [`Self::node_state_configure`] to update
/// the in-memory state referencing that node. If you need to handle more than one transition
/// consider using [`Self::handle_node_availability_transitions`].
async fn handle_node_availability_transition(
&self,
node_id: NodeId,
transition: AvailabilityTransition,
_node_lock: &TracingExclusiveGuard<NodeOperations>,
) -> Result<(), ApiError> {
// Modify scheduling state for any Tenants that are affected by a change in the node's availability state.
match availability_transition {
match transition {
AvailabilityTransition::ToOffline => {
tracing::info!("Node {} transition to offline", node_id);
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
let mut tenants_affected: usize = 0;
for (tenant_shard_id, tenant_shard) in tenants {
@@ -5382,14 +5442,14 @@ impl Service {
observed_loc.conf = None;
}
if new_nodes.len() == 1 {
if nodes.len() == 1 {
// Special case for single-node cluster: there is no point trying to reschedule
// any tenant shards: avoid doing so, in order to avoid spewing warnings about
// failures to schedule them.
continue;
}
if !new_nodes
if !nodes
.values()
.any(|n| matches!(n.may_schedule(), MaySchedule::Yes(_)))
{
@@ -5415,10 +5475,7 @@ impl Service {
tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id);
}
Ok(()) => {
if self
.maybe_reconcile_shard(tenant_shard, &new_nodes)
.is_some()
{
if self.maybe_reconcile_shard(tenant_shard, nodes).is_some() {
tenants_affected += 1;
};
}
@@ -5433,9 +5490,13 @@ impl Service {
}
AvailabilityTransition::ToActive => {
tracing::info!("Node {} transition to active", node_id);
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, _scheduler) = locked.parts_mut();
// When a node comes back online, we must reconcile any tenant that has a None observed
// location on the node.
for tenant_shard in locked.tenants.values_mut() {
for tenant_shard in tenants.values_mut() {
// If a reconciliation is already in progress, rely on the previous scheduling
// decision and skip triggering a new reconciliation.
if tenant_shard.reconciler.is_some() {
@@ -5444,7 +5505,7 @@ impl Service {
if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) {
if observed_loc.conf.is_none() {
self.maybe_reconcile_shard(tenant_shard, &new_nodes);
self.maybe_reconcile_shard(tenant_shard, nodes);
}
}
}
@@ -5465,11 +5526,54 @@ impl Service {
}
}
locked.nodes = new_nodes;
Ok(())
}
/// Handle availability transition for multiple nodes
///
/// Note that you should first call [`Self::node_state_configure`] for
/// all nodes being handled here for the handling to use fresh in-memory state.
async fn handle_node_availability_transitions(
&self,
transitions: Vec<(
NodeId,
TracingExclusiveGuard<NodeOperations>,
AvailabilityTransition,
)>,
) -> Result<(), Vec<(NodeId, ApiError)>> {
let mut errors = Vec::default();
for (node_id, node_lock, transition) in transitions {
let res = self
.handle_node_availability_transition(node_id, transition, &node_lock)
.await;
if let Err(err) = res {
errors.push((node_id, err));
}
}
if errors.is_empty() {
Ok(())
} else {
Err(errors)
}
}
pub(crate) async fn node_configure(
&self,
node_id: NodeId,
availability: Option<NodeAvailability>,
scheduling: Option<NodeSchedulingPolicy>,
) -> Result<(), ApiError> {
let node_lock =
trace_exclusive_lock(&self.node_op_locks, node_id, NodeOperations::Configure).await;
let transition = self
.node_state_configure(node_id, availability, scheduling, &node_lock)
.await?;
self.handle_node_availability_transition(node_id, transition, &node_lock)
.await
}
/// Wrapper around [`Self::node_configure`] which only allows changes while there is no ongoing
/// operation for HTTP api.
pub(crate) async fn external_node_configure(