mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
Graceful draining
This commit is contained in:
@@ -64,6 +64,10 @@ enum Command {
|
||||
NodeDelete {
|
||||
#[arg(long)]
|
||||
node_id: NodeId,
|
||||
|
||||
/// Force flag to delete the node without draining
|
||||
#[arg(long)]
|
||||
force: bool,
|
||||
},
|
||||
/// Modify a tenant's policies in the storage controller
|
||||
TenantPolicy {
|
||||
@@ -895,7 +899,39 @@ async fn main() -> anyhow::Result<()> {
|
||||
.dispatch::<(), ()>(Method::POST, format!("debug/v1/node/{node_id}/drop"), None)
|
||||
.await?;
|
||||
}
|
||||
Command::NodeDelete { node_id } => {
|
||||
Command::NodeDelete { node_id, force } => {
|
||||
// If force is not set, we need to drain the node first
|
||||
// This prevents the node from being deleted while there are still tenants on it
|
||||
if !force {
|
||||
match &storcon_client
|
||||
.dispatch::<(), NodeDescribeResponse>(
|
||||
Method::GET,
|
||||
format!("control/v1/node/{node_id}"),
|
||||
None,
|
||||
)
|
||||
.await?
|
||||
.scheduling
|
||||
{
|
||||
NodeSchedulingPolicy::Draining | NodeSchedulingPolicy::PauseForRestart => {
|
||||
println!("Node {} is already draining", node_id);
|
||||
}
|
||||
_ => {
|
||||
println!("Node {} is not draining, starting drain", node_id);
|
||||
storcon_client
|
||||
.dispatch::<(), ()>(
|
||||
Method::PUT,
|
||||
format!("control/v1/node/{node_id}/drain"),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for the node to be drained and printing the current state
|
||||
watch_node_drain(&storcon_client, node_id).await?;
|
||||
}
|
||||
|
||||
// Finally delete the node
|
||||
storcon_client
|
||||
.dispatch::<(), ()>(Method::DELETE, format!("control/v1/node/{node_id}"), None)
|
||||
.await?;
|
||||
@@ -1309,3 +1345,46 @@ async fn watch_tenant_shard(
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn watch_node_drain(storcon_client: &Client, node_id: NodeId) -> anyhow::Result<()> {
|
||||
loop {
|
||||
let node_desc = storcon_client
|
||||
.dispatch::<(), NodeDescribeResponse>(
|
||||
Method::GET,
|
||||
format!("control/v1/node/{node_id}"),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
let shards_count = storcon_client
|
||||
.dispatch::<(), NodeShardResponse>(
|
||||
Method::GET,
|
||||
format!("control/v1/node/{node_id}/shards"),
|
||||
None,
|
||||
)
|
||||
.await?
|
||||
.shards
|
||||
.len();
|
||||
|
||||
// Print the state
|
||||
if node_desc.scheduling != NodeSchedulingPolicy::Draining {
|
||||
if shards_count != 0 {
|
||||
anyhow::bail!(
|
||||
"Node {} is not draining, but has {} shards",
|
||||
node_id,
|
||||
shards_count
|
||||
);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
println!(
|
||||
"Node {} is draining, {} shards remaining",
|
||||
node_id, shards_count
|
||||
);
|
||||
|
||||
tokio::time::sleep(WATCH_INTERVAL).await;
|
||||
}
|
||||
|
||||
println!("Node {} is not draining", node_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user