diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs index a8f97eeaa4..8a8bd98837 100644 --- a/control_plane/attachment_service/src/service.rs +++ b/control_plane/attachment_service/src/service.rs @@ -36,6 +36,7 @@ use pageserver_api::{ }, }; use pageserver_client::mgmt_api; +use tokio::sync::OwnedRwLockWriteGuard; use tokio_util::sync::CancellationToken; use tracing::instrument; use utils::{ @@ -690,9 +691,6 @@ impl Service { // Retry until shutdown: we must keep this request object alive until it is properly // processed, as it holds a lock guard that prevents other operations trying to do things // to the tenant while it is in a weird part-split state. - // - // TODO: if a node goes Offline, we should make abort_tenant_shard_split return Ok, and then - // have some logic that insists on a full reconciliation with a node when it goes back to Active. while !self.cancel.is_cancelled() { match self.abort_tenant_shard_split(&op).await { Ok(_) => break, @@ -1055,19 +1053,122 @@ impl Service { } } + // When the availability state of a node transitions to active, we must do a full reconciliation + // of LocationConfigs on that node. This is because while a node was offline: + // - we might have proceeded through startup_reconcile without checking for extraneous LocationConfigs on this node + // - aborting a tenant shard split might have left rogue child shards behind on this node. + // + // This function must complete _before_ setting a `Node` to Active: once it is set to Active, other + // Reconcilers might communicate with the node, and these must not overlap with the work we do in + // this function. + // + // The reconciliation logic in here is very similar to what [`Self::startup_reconcile`] does, but + // for written for a single node rather than as a batch job for all nodes. + #[tracing::instrument(skip_all, fields(node_id=%node.get_id()))] + async fn node_activate_reconcile( + &self, + mut node: Node, + _lock: &OwnedRwLockWriteGuard<()>, + ) -> Result<(), ApiError> { + // This Node is a mutable local copy: we will set it active so that we can use its + // API client to reconcile with the node. The Node in [`Self::nodes`] will get updated + // later. + node.set_availability(NodeAvailability::Active); + + let configs = match node + .with_client_retries( + |client| async move { client.list_location_config().await }, + &self.config.jwt_token, + 1, + 5, + SHORT_RECONCILE_TIMEOUT, + &self.cancel, + ) + .await + { + None => { + // We're shutting down (the Node's cancellation token can't have fired, because + // we're the only scope that has a reference to it, and we didn't fire it). + return Err(ApiError::ShuttingDown); + } + Some(Err(e)) => { + // This node didn't succeed listing its locations: it may not proceed to active state + // as it is apparently unavailable. + return Err(ApiError::PreconditionFailed( + format!("Failed to query node location configs, cannot activate ({e})").into(), + )); + } + Some(Ok(configs)) => configs, + }; + tracing::info!("Loaded {} LocationConfigs", configs.tenant_shards.len()); + + let mut cleanup = Vec::new(); + { + let mut locked = self.inner.write().unwrap(); + + for (tenant_shard_id, observed_loc) in configs.tenant_shards { + let Some(tenant_state) = locked.tenants.get_mut(&tenant_shard_id) else { + cleanup.push(tenant_shard_id); + continue; + }; + tenant_state + .observed + .locations + .insert(node.get_id(), ObservedStateLocation { conf: observed_loc }); + } + } + + for tenant_shard_id in cleanup { + tracing::info!("Detaching {tenant_shard_id}"); + match node + .with_client_retries( + |client| async move { + let config = LocationConfig { + mode: LocationConfigMode::Detached, + generation: None, + secondary_conf: None, + shard_number: tenant_shard_id.shard_number.0, + shard_count: tenant_shard_id.shard_count.literal(), + shard_stripe_size: 0, + tenant_conf: models::TenantConfig::default(), + }; + client + .location_config(tenant_shard_id, config, None, false) + .await + }, + &self.config.jwt_token, + 1, + 5, + SHORT_RECONCILE_TIMEOUT, + &self.cancel, + ) + .await + { + None => { + // We're shutting down (the Node's cancellation token can't have fired, because + // we're the only scope that has a reference to it, and we didn't fire it). + return Err(ApiError::ShuttingDown); + } + Some(Err(e)) => { + // Do not let the node proceed to Active state if it is not responsive to requests + // to detach. This could happen if e.g. a shutdown bug in the pageserver is preventing + // detach completing: we should not let this node back into the set of nodes considered + // okay for scheduling. + return Err(ApiError::Conflict(format!( + "Node {node} failed to detach {tenant_shard_id}: {e}" + ))); + } + Some(Ok(_)) => {} + }; + } + + Ok(()) + } + pub(crate) async fn re_attach( &self, reattach_req: ReAttachRequest, ) -> Result { - // Take a re-attach as indication that the node is available: this is a precursor to proper - // heartbeating in https://github.com/neondatabase/neon/issues/6844 - self.node_configure(NodeConfigureRequest { - node_id: reattach_req.node_id, - availability: Some(NodeAvailability::Active), - scheduling: None, - }) - .await?; - // Ordering: we must persist generation number updates before making them visible in the in-memory state let incremented_generations = self.persistence.re_attach(reattach_req.node_id).await?; @@ -1079,6 +1180,7 @@ impl Service { // Apply the updated generation to our in-memory state let mut locked = self.inner.write().unwrap(); + let (nodes, tenants, _scheduler) = locked.parts_mut(); let mut response = ReAttachResponse { tenants: Vec::new(), @@ -1090,7 +1192,7 @@ impl Service { gen: new_gen.into().unwrap(), }); // Apply the new generation number to our in-memory state - let shard_state = locked.tenants.get_mut(&tenant_shard_id); + let shard_state = tenants.get_mut(&tenant_shard_id); let Some(shard_state) = shard_state else { // Not fatal. This edge case requires a re-attach to happen // between inserting a new tenant shard in to the database, and updating our in-memory @@ -1141,6 +1243,25 @@ impl Service { // request in flight over the network: TODO handle that by making location_conf API refuse // to go backward in generations. } + + // We consider a node Active once we have composed a re-attach response, but we + // do not call [`Self::node_activate_reconcile`]: the handling of the re-attach response + // implicitly synchronizes the LocationConfigs on the node. + // + // Setting a node active unblocks any Reconcilers that might write to the location config API, + // but those requests will not be accepted by the node until it has finished processing + // the re-attach response. + if let Some(node) = nodes.get(&reattach_req.node_id) { + if !node.is_available() { + let mut new_nodes = (**nodes).clone(); + new_nodes + .get_mut(&reattach_req.node_id) + .map(|n| n.set_availability(NodeAvailability::Active)); + let new_nodes = Arc::new(new_nodes); + *nodes = new_nodes; + } + } + Ok(response) } @@ -3221,6 +3342,27 @@ impl Service { ) -> Result<(), ApiError> { let _node_lock = self.node_locks.exclusive(config_req.node_id).await; + let activate = { + let locked = self.inner.read().unwrap(); + let Some(node) = locked.nodes.get(&config_req.node_id) else { + return Err(ApiError::NotFound( + anyhow::anyhow!("Node {} not registered", config_req.node_id).into(), + )); + }; + + if !node.is_available() + && matches!(config_req.availability, Some(NodeAvailability::Active)) + { + Some(node.clone()) + } else { + None + } + }; + + if let Some(node) = activate { + self.node_activate_reconcile(node, &_node_lock).await?; + } + if let Some(scheduling) = config_req.scheduling { // Scheduling is a persistent part of Node: we must write updates to the database before // applying them in memory