diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index dcc1dac165..f74017c172 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -8,7 +8,7 @@ use std::{ }; use crate::{ - background_node_operations::{self, Controller}, + background_node_operations::{self, Controller, OperationError, MAX_RECONCILES_PER_OPERATION}, compute_hook::NotifyError, id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, WrappedWriteGuard}, persistence::{AbortShardSplitStatus, TenantFilter}, @@ -4996,4 +4996,100 @@ impl Service { // to complete. self.gate.close().await; } + + pub(crate) async fn drain_node( + &self, + node_id: NodeId, + cancel: CancellationToken, + ) -> Result<(), OperationError> { + tracing::info!(%node_id, "Starting drain background operation"); + + let mut last_inspected_shard: Option = None; + let mut inspected_all_shards = false; + let mut waiters = Vec::new(); + let mut schedule_context = ScheduleContext::default(); + + while !inspected_all_shards { + if cancel.is_cancelled() { + return Err(OperationError::Cancelled); + } + + { + let mut locked = self.inner.write().unwrap(); + let (nodes, tenants, scheduler) = locked.parts_mut(); + + let node = nodes.get(&node_id).ok_or(OperationError::NodeStateChanged( + format!("node {node_id} was removed").into(), + ))?; + + let current_policy = node.get_scheduling(); + if !matches!(current_policy, NodeSchedulingPolicy::Draining) { + // TODO(vlad): maybe cancel pending reconciles before erroring out. need to think + // about it + return Err(OperationError::NodeStateChanged( + format!("node {node_id} changed state to {current_policy:?}").into(), + )); + } + + let mut cursor = tenants.iter_mut().skip_while({ + let skip_past = last_inspected_shard; + move |(tid, _)| match skip_past { + Some(last) => **tid != last, + None => false, + } + }); + + while waiters.len() < MAX_RECONCILES_PER_OPERATION { + let (tid, tenant_shard) = match cursor.next() { + Some(some) => some, + None => { + inspected_all_shards = true; + break; + } + }; + + if tenant_shard.intent.demote_attached(scheduler, node_id) { + tenant_shard.sequence = tenant_shard.sequence.next(); + match tenant_shard.schedule(scheduler, &mut schedule_context) { + Err(e) => { + tracing::warn!(%tid, "Scheduling error when draining pageserver {} : {e}", node_id); + } + Ok(()) => { + let waiter = self.maybe_reconcile_shard(tenant_shard, nodes); + if let Some(some) = waiter { + waiters.push(some); + } + } + } + } + + last_inspected_shard = Some(*tid); + } + } + + waiters = self.kick_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await; + } + + while !waiters.is_empty() { + waiters = self.kick_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await; + } + + // At this point we have done the best we could to drain shards from this node. + // Set the node scheduling policy to `[NodeSchedulingPolicy::PauseForRestart]` + // to complete the drain. + if let Err(err) = self + .node_configure(node_id, None, Some(NodeSchedulingPolicy::PauseForRestart)) + .await + { + // This is not fatal. Anything that is polling the node scheduling policy to detect + // the end of the drain operations will hang, but all such places should enforce an + // overall timeout. The scheduling policy will be updated upon node re-attach and/or + // by the counterpart fill operation. + tracing::warn!(%node_id, "Failed to finalise drain by setting scheduling policy: {err}"); + } + + tracing::info!(%node_id, "Completed drain background operation"); + + Ok(()) + } }