storage_controller: remove node activation reconciliation
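
With this change, the storage controller no longer runs a dedicated reconciliation pass when a node's availability transitions to Active. The cases that pass covered (stale LocationConfigs on a node that was offline during controller startup, rogue child shards left behind by an aborted shard split) are instead left to the background reconciliation loop, and the re-attach path already synchronizes locations implicitly.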
@@ -211,9 +211,8 @@ impl Reconciler {
         lazy: bool,
     ) -> Result<(), ReconcileError> {
         if !node.is_available() && config.mode == LocationConfigMode::Detached {
-            // [`crate::service::Service::node_activate_reconcile`] will update the observed state
-            // when the node comes back online. At that point, the intent and observed states will
-            // be mismatched and a background reconciliation will detach.
+            // Skip if we cannot detach right now due to the node being unavailable. It will be
+            // handled by the background reconciliation loop.
             tracing::info!(
                 "Node {node} is unavailable during detach: proceeding anyway, it will be detached via background reconciliation"
             );
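
The replacement comment relies on the controller's usual convergence mechanism: when a detach cannot be delivered because the node is down, the intent state (detached) and the observed state (attached) stay mismatched, and the background reconciliation loop retries the detach once the node is reachable. A minimal sketch of that idea, using hypothetical Location/ShardState types rather than the storage controller's real ones:

    // Hypothetical, simplified model of intent/observed convergence.
    #[derive(Clone, Copy, PartialEq, Debug)]
    enum Location {
        Attached,
        Detached,
    }

    struct ShardState {
        intent: Location,
        observed: Location,
    }

    impl ShardState {
        // The background loop only needs to see that the two states diverge.
        fn needs_reconcile(&self) -> bool {
            self.intent != self.observed
        }
    }

    fn main() {
        // The node was unavailable during detach: intent moved on, observed did not.
        let shard = ShardState {
            intent: Location::Detached,
            observed: Location::Attached,
        };
        // A later background pass notices the mismatch and issues the detach.
        assert!(shard.needs_reconcile());
    }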
@@ -1030,7 +1030,7 @@ impl Service {
                 // This is the code path for geniune availability transitions (i.e node
                 // goes unavailable and/or comes back online).
                 let res = self
-                    .node_state_configure(node_id, Some(new_availability), None, &node_lock)
+                    .node_state_configure(node_id, Some(new_availability), None)
                     .await;

                 match res {
@@ -1317,7 +1317,7 @@ impl Service {
         if count_min != count_max {
             // Aborting the split in the database and dropping the child shards is sufficient: the reconciliation in
             // [`Self::startup_reconcile`] will implicitly drop the child shards on remote pageservers, or they'll
-            // be dropped later in [`Self::node_activate_reconcile`] if it isn't available right now.
+            // be dropped later via background reconciliation if it isn't available right now.
             tracing::info!("Aborting shard split {tenant_id} {count_min:?} -> {count_max:?}");
             let abort_status = persistence.abort_shard_split(tenant_id, count_max).await?;

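
The rewritten comment keeps the same argument: persisting the abort and dropping the child shards from the database is sufficient, because any child shard still materialized on a pageserver diverges from the database's view and gets dropped by a later reconciliation pass. A hedged illustration of why such leftovers are detectable, with a simplified ShardCount stand-in (the count_min/count_max values here are purely illustrative):

    // Simplified stand-in for the shard count notion used above: after an
    // aborted split, leftover child shards report the attempted (higher)
    // shard count, not the one recorded in the database.
    #[derive(Clone, Copy, PartialEq, PartialOrd, Debug)]
    struct ShardCount(u8);

    fn is_rogue_child(db_count: ShardCount, on_node_count: ShardCount) -> bool {
        on_node_count > db_count
    }

    fn main() {
        let count_min = ShardCount(4); // what the database records after the abort
        let count_max = ShardCount(8); // the split target that was abandoned
        // Any location still reporting the child count will be dropped by a
        // later reconciliation pass.
        assert!(is_rogue_child(count_min, count_max));
    }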
@@ -1727,118 +1727,6 @@ impl Service {
         }
     }

-    // When the availability state of a node transitions to active, we must do a full reconciliation
-    // of LocationConfigs on that node. This is because while a node was offline:
-    // - we might have proceeded through startup_reconcile without checking for extraneous LocationConfigs on this node
-    // - aborting a tenant shard split might have left rogue child shards behind on this node.
-    //
-    // This function must complete _before_ setting a `Node` to Active: once it is set to Active, other
-    // Reconcilers might communicate with the node, and these must not overlap with the work we do in
-    // this function.
-    //
-    // The reconciliation logic in here is very similar to what [`Self::startup_reconcile`] does, but
-    // for written for a single node rather than as a batch job for all nodes.
-    #[tracing::instrument(skip_all, fields(node_id=%node.get_id()))]
-    async fn node_activate_reconcile(
-        &self,
-        mut node: Node,
-        _lock: &TracingExclusiveGuard<NodeOperations>,
-    ) -> Result<(), ApiError> {
-        // This Node is a mutable local copy: we will set it active so that we can use its
-        // API client to reconcile with the node. The Node in [`Self::nodes`] will get updated
-        // later.
-        node.set_availability(NodeAvailability::Active(PageserverUtilization::full()));
-
-        let configs = match node
-            .with_client_retries(
-                |client| async move { client.list_location_config().await },
-                &self.config.jwt_token,
-                1,
-                5,
-                SHORT_RECONCILE_TIMEOUT,
-                &self.cancel,
-            )
-            .await
-        {
-            None => {
-                // We're shutting down (the Node's cancellation token can't have fired, because
-                // we're the only scope that has a reference to it, and we didn't fire it).
-                return Err(ApiError::ShuttingDown);
-            }
-            Some(Err(e)) => {
-                // This node didn't succeed listing its locations: it may not proceed to active state
-                // as it is apparently unavailable.
-                return Err(ApiError::PreconditionFailed(
-                    format!("Failed to query node location configs, cannot activate ({e})").into(),
-                ));
-            }
-            Some(Ok(configs)) => configs,
-        };
-        tracing::info!("Loaded {} LocationConfigs", configs.tenant_shards.len());
-
-        let mut cleanup = Vec::new();
-        {
-            let mut locked = self.inner.write().unwrap();
-
-            for (tenant_shard_id, observed_loc) in configs.tenant_shards {
-                let Some(tenant_shard) = locked.tenants.get_mut(&tenant_shard_id) else {
-                    cleanup.push(tenant_shard_id);
-                    continue;
-                };
-                tenant_shard
-                    .observed
-                    .locations
-                    .insert(node.get_id(), ObservedStateLocation { conf: observed_loc });
-            }
-        }
-
-        for tenant_shard_id in cleanup {
-            tracing::info!("Detaching {tenant_shard_id}");
-            match node
-                .with_client_retries(
-                    |client| async move {
-                        let config = LocationConfig {
-                            mode: LocationConfigMode::Detached,
-                            generation: None,
-                            secondary_conf: None,
-                            shard_number: tenant_shard_id.shard_number.0,
-                            shard_count: tenant_shard_id.shard_count.literal(),
-                            shard_stripe_size: 0,
-                            tenant_conf: models::TenantConfig::default(),
-                        };
-                        client
-                            .location_config(tenant_shard_id, config, None, false)
-                            .await
-                    },
-                    &self.config.jwt_token,
-                    1,
-                    5,
-                    SHORT_RECONCILE_TIMEOUT,
-                    &self.cancel,
-                )
-                .await
-            {
-                None => {
-                    // We're shutting down (the Node's cancellation token can't have fired, because
-                    // we're the only scope that has a reference to it, and we didn't fire it).
-                    return Err(ApiError::ShuttingDown);
-                }
-                Some(Err(e)) => {
-                    // Do not let the node proceed to Active state if it is not responsive to requests
-                    // to detach. This could happen if e.g. a shutdown bug in the pageserver is preventing
-                    // detach completing: we should not let this node back into the set of nodes considered
-                    // okay for scheduling.
-                    return Err(ApiError::Conflict(format!(
-                        "Node {node} failed to detach {tenant_shard_id}: {e}"
-                    )));
-                }
-                Some(Ok(_)) => {}
-            };
-        }
-
-        Ok(())
-    }
-
     pub(crate) async fn re_attach(
         &self,
         reattach_req: ReAttachRequest,
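
For reference, the deleted function swept a node synchronously before it was allowed to go Active: list its LocationConfigs, fold them into the controller's observed state, and detach any shard the controller no longer tracks. Its outline, reduced to a self-contained sketch; NodeClient, MockNode, and activate_sweep are invented stand-ins, not the controller's real API:

    use std::collections::{HashMap, HashSet};

    // Invented stand-ins for the controller's real types; this mirrors the
    // removed node_activate_reconcile in outline only.
    type TenantShardId = u64;

    trait NodeClient {
        fn list_locations(&self) -> Result<Vec<TenantShardId>, String>;
        fn detach(&self, shard: TenantShardId) -> Result<(), String>;
    }

    // Sweep a node before activation: fold known shards into observed state,
    // detach anything the controller no longer tracks (e.g. split leftovers).
    fn activate_sweep(
        client: &dyn NodeClient,
        known: &mut HashMap<TenantShardId, bool>, // shard -> observed on this node
    ) -> Result<(), String> {
        let tracked: HashSet<TenantShardId> = known.keys().copied().collect();
        for shard in client.list_locations()? {
            if tracked.contains(&shard) {
                known.insert(shard, true); // update observed state
            } else {
                client.detach(shard)?; // rogue shard: detach before going Active
            }
        }
        Ok(())
    }

    struct MockNode;

    impl NodeClient for MockNode {
        fn list_locations(&self) -> Result<Vec<TenantShardId>, String> {
            Ok(vec![1, 99]) // 99 is unknown to the controller
        }
        fn detach(&self, shard: TenantShardId) -> Result<(), String> {
            println!("detaching rogue shard {shard}");
            Ok(())
        }
    }

    fn main() -> Result<(), String> {
        let mut known = HashMap::from([(1, false), (2, false)]);
        activate_sweep(&MockNode, &mut known)
    }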
@@ -1926,9 +1814,8 @@ impl Service {
             }
         }

-        // We consider a node Active once we have composed a re-attach response, but we
-        // do not call [`Self::node_activate_reconcile`]: the handling of the re-attach response
-        // implicitly synchronizes the LocationConfigs on the node.
+        // We consider a node Active once we have composed a re-attach response:
+        // the handling of the re-attach response implicitly synchronizes the LocationConfigs on the node.
         //
         // Setting a node active unblocks any Reconcilers that might write to the location config API,
         // but those requests will not be accepted by the node until it has finished processing
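
The surviving comment explains why the activation sweep was redundant on this path: a re-attach response enumerates exactly the shards a node should hold, so applying it synchronizes the node's locations as a side effect. A toy model of that implicit synchronization (apply_reattach and the bare u64 shard ids are illustrative only):

    use std::collections::HashSet;

    // The re-attach response lists the shards a node should hold, so applying
    // it implicitly detaches everything else.
    fn apply_reattach(on_disk: &mut HashSet<u64>, response: &[u64]) {
        let wanted: HashSet<u64> = response.iter().copied().collect();
        on_disk.retain(|shard| wanted.contains(shard)); // drop unwanted shards
        on_disk.extend(wanted); // attach anything the controller expects
    }

    fn main() {
        let mut on_disk: HashSet<u64> = [1, 2, 99].into_iter().collect();
        apply_reattach(&mut on_disk, &[1, 2, 3]);
        let expected: HashSet<u64> = [1, 2, 3].into_iter().collect();
        assert_eq!(on_disk, expected);
    }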
@@ -5511,7 +5398,6 @@ impl Service {
         node_id: NodeId,
         availability: Option<NodeAvailability>,
         scheduling: Option<NodeSchedulingPolicy>,
-        node_lock: &TracingExclusiveGuard<NodeOperations>,
    ) -> Result<AvailabilityTransition, ApiError> {
        if let Some(scheduling) = scheduling {
            // Scheduling is a persistent part of Node: we must write updates to the database before
@@ -5519,32 +5405,15 @@ impl Service {
             self.persistence.update_node(node_id, scheduling).await?;
         }

-        // If we're activating a node, then before setting it active we must reconcile any shard locations
-        // on that node, in case it is out of sync, e.g. due to being unavailable during controller startup,
-        // by calling [`Self::node_activate_reconcile`]
-        //
-        // The transition we calculate here remains valid later in the function because we hold the op lock on the node:
-        // nothing else can mutate its availability while we run.
         let availability_transition = if let Some(input_availability) = availability.as_ref() {
-            let (activate_node, availability_transition) = {
-                let locked = self.inner.read().unwrap();
-                let Some(node) = locked.nodes.get(&node_id) else {
-                    return Err(ApiError::NotFound(
-                        anyhow::anyhow!("Node {} not registered", node_id).into(),
-                    ));
-                };
-
-                (
-                    node.clone(),
-                    node.get_availability_transition(input_availability),
-                )
+            let locked = self.inner.read().unwrap();
+            let Some(node) = locked.nodes.get(&node_id) else {
+                return Err(ApiError::NotFound(
+                    anyhow::anyhow!("Node {} not registered", node_id).into(),
+                ));
             };

-            if matches!(availability_transition, AvailabilityTransition::ToActive) {
-                self.node_activate_reconcile(activate_node, node_lock)
-                    .await?;
-            }
-            availability_transition
+            node.get_availability_transition(input_availability)
         } else {
             AvailabilityTransition::Unchanged
         };
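
After this hunk, node_state_configure only computes the availability transition; acting on it happens later in handle_node_availability_transition, under the same exclusive node lock, so the computed transition cannot be invalidated in between. A simplified sketch of such a transition calculation, with toy enums standing in for the real NodeAvailability and AvailabilityTransition:

    #[derive(Clone, Copy)]
    enum Availability {
        Active,
        Offline,
    }

    #[derive(Debug, PartialEq)]
    enum Transition {
        ToActive,
        ToOffline,
        Unchanged,
    }

    // Compare the currently recorded availability with the requested one.
    fn get_transition(current: Availability, input: Availability) -> Transition {
        match (current, input) {
            (Availability::Offline, Availability::Active) => Transition::ToActive,
            (Availability::Active, Availability::Offline) => Transition::ToOffline,
            _ => Transition::Unchanged,
        }
    }

    fn main() {
        // A node coming back online yields ToActive; the caller decides what
        // follow-up work that transition requires.
        let t = get_transition(Availability::Offline, Availability::Active);
        assert_eq!(t, Transition::ToActive);
    }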
@@ -5734,7 +5603,7 @@ impl Service {
             trace_exclusive_lock(&self.node_op_locks, node_id, NodeOperations::Configure).await;

         let transition = self
-            .node_state_configure(node_id, availability, scheduling, &node_lock)
+            .node_state_configure(node_id, availability, scheduling)
             .await?;
         self.handle_node_availability_transition(node_id, transition, &node_lock)
             .await
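
The call site keeps a single exclusive lock spanning both steps (compute the transition, then handle it), which is what makes the computed transition trustworthy when it is finally acted on. A toy version of that pattern, with a std Mutex standing in for the controller's tracing lock:

    use std::sync::Mutex;

    struct Node {
        active: bool,
    }

    // Step 1: mutate state and report whether this was a transition to active.
    fn configure(node: &mut Node, make_active: bool) -> bool {
        let was_active = node.active;
        node.active = make_active;
        make_active && !was_active
    }

    // Step 2: act on the transition computed in step 1.
    fn handle_transition(node: &Node, to_active: bool) {
        if to_active {
            println!("node became active: {}", node.active);
        }
    }

    fn main() {
        let lock = Mutex::new(Node { active: false });
        // Hold the lock across both steps so the transition stays valid.
        let mut guard = lock.lock().unwrap();
        let to_active = configure(&mut guard, true);
        handle_transition(&guard, to_active);
    }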