mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-03 13:30:38 +00:00
storcon: add node drain algorithm
This commit is contained in:
@@ -8,7 +8,7 @@ use std::{
|
||||
};
|
||||
|
||||
use crate::{
|
||||
background_node_operations::{self, Controller},
|
||||
background_node_operations::{self, Controller, OperationError, MAX_RECONCILES_PER_OPERATION},
|
||||
compute_hook::NotifyError,
|
||||
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, WrappedWriteGuard},
|
||||
persistence::{AbortShardSplitStatus, TenantFilter},
|
||||
@@ -4996,4 +4996,100 @@ impl Service {
|
||||
// to complete.
|
||||
self.gate.close().await;
|
||||
}
|
||||
|
||||
pub(crate) async fn drain_node(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<(), OperationError> {
|
||||
tracing::info!(%node_id, "Starting drain background operation");
|
||||
|
||||
let mut last_inspected_shard: Option<TenantShardId> = None;
|
||||
let mut inspected_all_shards = false;
|
||||
let mut waiters = Vec::new();
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
|
||||
while !inspected_all_shards {
|
||||
if cancel.is_cancelled() {
|
||||
return Err(OperationError::Cancelled);
|
||||
}
|
||||
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
let node = nodes.get(&node_id).ok_or(OperationError::NodeStateChanged(
|
||||
format!("node {node_id} was removed").into(),
|
||||
))?;
|
||||
|
||||
let current_policy = node.get_scheduling();
|
||||
if !matches!(current_policy, NodeSchedulingPolicy::Draining) {
|
||||
// TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
|
||||
// about it
|
||||
return Err(OperationError::NodeStateChanged(
|
||||
format!("node {node_id} changed state to {current_policy:?}").into(),
|
||||
));
|
||||
}
|
||||
|
||||
let mut cursor = tenants.iter_mut().skip_while({
|
||||
let skip_past = last_inspected_shard;
|
||||
move |(tid, _)| match skip_past {
|
||||
Some(last) => **tid != last,
|
||||
None => false,
|
||||
}
|
||||
});
|
||||
|
||||
while waiters.len() < MAX_RECONCILES_PER_OPERATION {
|
||||
let (tid, tenant_shard) = match cursor.next() {
|
||||
Some(some) => some,
|
||||
None => {
|
||||
inspected_all_shards = true;
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if tenant_shard.intent.demote_attached(scheduler, node_id) {
|
||||
tenant_shard.sequence = tenant_shard.sequence.next();
|
||||
match tenant_shard.schedule(scheduler, &mut schedule_context) {
|
||||
Err(e) => {
|
||||
tracing::warn!(%tid, "Scheduling error when draining pageserver {} : {e}", node_id);
|
||||
}
|
||||
Ok(()) => {
|
||||
let waiter = self.maybe_reconcile_shard(tenant_shard, nodes);
|
||||
if let Some(some) = waiter {
|
||||
waiters.push(some);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
last_inspected_shard = Some(*tid);
|
||||
}
|
||||
}
|
||||
|
||||
waiters = self.kick_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await;
|
||||
}
|
||||
|
||||
while !waiters.is_empty() {
|
||||
waiters = self.kick_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await;
|
||||
}
|
||||
|
||||
// At this point we have done the best we could to drain shards from this node.
|
||||
// Set the node scheduling policy to `[NodeSchedulingPolicy::PauseForRestart]`
|
||||
// to complete the drain.
|
||||
if let Err(err) = self
|
||||
.node_configure(node_id, None, Some(NodeSchedulingPolicy::PauseForRestart))
|
||||
.await
|
||||
{
|
||||
// This is not fatal. Anything that is polling the node scheduling policy to detect
|
||||
// the end of the drain operations will hang, but all such places should enforce an
|
||||
// overall timeout. The scheduling policy will be updated upon node re-attach and/or
|
||||
// by the counterpart fill operation.
|
||||
tracing::warn!(%node_id, "Failed to finalise drain by setting scheduling policy: {err}");
|
||||
}
|
||||
|
||||
tracing::info!(%node_id, "Completed drain background operation");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user