mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-02 04:50:38 +00:00
storage controller: reconcile nodes on activation
This commit is contained in:
@@ -36,6 +36,7 @@ use pageserver_api::{
|
||||
},
|
||||
};
|
||||
use pageserver_client::mgmt_api;
|
||||
use tokio::sync::OwnedRwLockWriteGuard;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::instrument;
|
||||
use utils::{
|
||||
@@ -690,9 +691,6 @@ impl Service {
|
||||
// Retry until shutdown: we must keep this request object alive until it is properly
|
||||
// processed, as it holds a lock guard that prevents other operations trying to do things
|
||||
// to the tenant while it is in a weird part-split state.
|
||||
//
|
||||
// TODO: if a node goes Offline, we should make abort_tenant_shard_split return Ok, and then
|
||||
// have some logic that insists on a full reconciliation with a node when it goes back to Active.
|
||||
while !self.cancel.is_cancelled() {
|
||||
match self.abort_tenant_shard_split(&op).await {
|
||||
Ok(_) => break,
|
||||
@@ -1055,19 +1053,122 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
// When the availability state of a node transitions to active, we must do a full reconciliation
|
||||
// of LocationConfigs on that node. This is because while a node was offline:
|
||||
// - we might have proceeded through startup_reconcile without checking for extraneous LocationConfigs on this node
|
||||
// - aborting a tenant shard split might have left rogue child shards behind on this node.
|
||||
//
|
||||
// This function must complete _before_ setting a `Node` to Active: once it is set to Active, other
|
||||
// Reconcilers might communicate with the node, and these must not overlap with the work we do in
|
||||
// this function.
|
||||
//
|
||||
// The reconciliation logic in here is very similar to what [`Self::startup_reconcile`] does, but
|
||||
// for written for a single node rather than as a batch job for all nodes.
|
||||
#[tracing::instrument(skip_all, fields(node_id=%node.get_id()))]
|
||||
async fn node_activate_reconcile(
|
||||
&self,
|
||||
mut node: Node,
|
||||
_lock: &OwnedRwLockWriteGuard<()>,
|
||||
) -> Result<(), ApiError> {
|
||||
// This Node is a mutable local copy: we will set it active so that we can use its
|
||||
// API client to reconcile with the node. The Node in [`Self::nodes`] will get updated
|
||||
// later.
|
||||
node.set_availability(NodeAvailability::Active);
|
||||
|
||||
let configs = match node
|
||||
.with_client_retries(
|
||||
|client| async move { client.list_location_config().await },
|
||||
&self.config.jwt_token,
|
||||
1,
|
||||
5,
|
||||
SHORT_RECONCILE_TIMEOUT,
|
||||
&self.cancel,
|
||||
)
|
||||
.await
|
||||
{
|
||||
None => {
|
||||
// We're shutting down (the Node's cancellation token can't have fired, because
|
||||
// we're the only scope that has a reference to it, and we didn't fire it).
|
||||
return Err(ApiError::ShuttingDown);
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
// This node didn't succeed listing its locations: it may not proceed to active state
|
||||
// as it is apparently unavailable.
|
||||
return Err(ApiError::PreconditionFailed(
|
||||
format!("Failed to query node location configs, cannot activate ({e})").into(),
|
||||
));
|
||||
}
|
||||
Some(Ok(configs)) => configs,
|
||||
};
|
||||
tracing::info!("Loaded {} LocationConfigs", configs.tenant_shards.len());
|
||||
|
||||
let mut cleanup = Vec::new();
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
|
||||
for (tenant_shard_id, observed_loc) in configs.tenant_shards {
|
||||
let Some(tenant_state) = locked.tenants.get_mut(&tenant_shard_id) else {
|
||||
cleanup.push(tenant_shard_id);
|
||||
continue;
|
||||
};
|
||||
tenant_state
|
||||
.observed
|
||||
.locations
|
||||
.insert(node.get_id(), ObservedStateLocation { conf: observed_loc });
|
||||
}
|
||||
}
|
||||
|
||||
for tenant_shard_id in cleanup {
|
||||
tracing::info!("Detaching {tenant_shard_id}");
|
||||
match node
|
||||
.with_client_retries(
|
||||
|client| async move {
|
||||
let config = LocationConfig {
|
||||
mode: LocationConfigMode::Detached,
|
||||
generation: None,
|
||||
secondary_conf: None,
|
||||
shard_number: tenant_shard_id.shard_number.0,
|
||||
shard_count: tenant_shard_id.shard_count.literal(),
|
||||
shard_stripe_size: 0,
|
||||
tenant_conf: models::TenantConfig::default(),
|
||||
};
|
||||
client
|
||||
.location_config(tenant_shard_id, config, None, false)
|
||||
.await
|
||||
},
|
||||
&self.config.jwt_token,
|
||||
1,
|
||||
5,
|
||||
SHORT_RECONCILE_TIMEOUT,
|
||||
&self.cancel,
|
||||
)
|
||||
.await
|
||||
{
|
||||
None => {
|
||||
// We're shutting down (the Node's cancellation token can't have fired, because
|
||||
// we're the only scope that has a reference to it, and we didn't fire it).
|
||||
return Err(ApiError::ShuttingDown);
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
// Do not let the node proceed to Active state if it is not responsive to requests
|
||||
// to detach. This could happen if e.g. a shutdown bug in the pageserver is preventing
|
||||
// detach completing: we should not let this node back into the set of nodes considered
|
||||
// okay for scheduling.
|
||||
return Err(ApiError::Conflict(format!(
|
||||
"Node {node} failed to detach {tenant_shard_id}: {e}"
|
||||
)));
|
||||
}
|
||||
Some(Ok(_)) => {}
|
||||
};
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn re_attach(
|
||||
&self,
|
||||
reattach_req: ReAttachRequest,
|
||||
) -> Result<ReAttachResponse, ApiError> {
|
||||
// Take a re-attach as indication that the node is available: this is a precursor to proper
|
||||
// heartbeating in https://github.com/neondatabase/neon/issues/6844
|
||||
self.node_configure(NodeConfigureRequest {
|
||||
node_id: reattach_req.node_id,
|
||||
availability: Some(NodeAvailability::Active),
|
||||
scheduling: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Ordering: we must persist generation number updates before making them visible in the in-memory state
|
||||
let incremented_generations = self.persistence.re_attach(reattach_req.node_id).await?;
|
||||
|
||||
@@ -1079,6 +1180,7 @@ impl Service {
|
||||
|
||||
// Apply the updated generation to our in-memory state
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, _scheduler) = locked.parts_mut();
|
||||
|
||||
let mut response = ReAttachResponse {
|
||||
tenants: Vec::new(),
|
||||
@@ -1090,7 +1192,7 @@ impl Service {
|
||||
gen: new_gen.into().unwrap(),
|
||||
});
|
||||
// Apply the new generation number to our in-memory state
|
||||
let shard_state = locked.tenants.get_mut(&tenant_shard_id);
|
||||
let shard_state = tenants.get_mut(&tenant_shard_id);
|
||||
let Some(shard_state) = shard_state else {
|
||||
// Not fatal. This edge case requires a re-attach to happen
|
||||
// between inserting a new tenant shard in to the database, and updating our in-memory
|
||||
@@ -1141,6 +1243,25 @@ impl Service {
|
||||
// request in flight over the network: TODO handle that by making location_conf API refuse
|
||||
// to go backward in generations.
|
||||
}
|
||||
|
||||
// We consider a node Active once we have composed a re-attach response, but we
|
||||
// do not call [`Self::node_activate_reconcile`]: the handling of the re-attach response
|
||||
// implicitly synchronizes the LocationConfigs on the node.
|
||||
//
|
||||
// Setting a node active unblocks any Reconcilers that might write to the location config API,
|
||||
// but those requests will not be accepted by the node until it has finished processing
|
||||
// the re-attach response.
|
||||
if let Some(node) = nodes.get(&reattach_req.node_id) {
|
||||
if !node.is_available() {
|
||||
let mut new_nodes = (**nodes).clone();
|
||||
new_nodes
|
||||
.get_mut(&reattach_req.node_id)
|
||||
.map(|n| n.set_availability(NodeAvailability::Active));
|
||||
let new_nodes = Arc::new(new_nodes);
|
||||
*nodes = new_nodes;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
@@ -3221,6 +3342,27 @@ impl Service {
|
||||
) -> Result<(), ApiError> {
|
||||
let _node_lock = self.node_locks.exclusive(config_req.node_id).await;
|
||||
|
||||
let activate = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let Some(node) = locked.nodes.get(&config_req.node_id) else {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Node {} not registered", config_req.node_id).into(),
|
||||
));
|
||||
};
|
||||
|
||||
if !node.is_available()
|
||||
&& matches!(config_req.availability, Some(NodeAvailability::Active))
|
||||
{
|
||||
Some(node.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(node) = activate {
|
||||
self.node_activate_reconcile(node, &_node_lock).await?;
|
||||
}
|
||||
|
||||
if let Some(scheduling) = config_req.scheduling {
|
||||
// Scheduling is a persistent part of Node: we must write updates to the database before
|
||||
// applying them in memory
|
||||
|
||||
Reference in New Issue
Block a user