From bcc68a7866c633d74a482266bfe34053a093b9d8 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Fri, 23 Aug 2024 14:48:06 +0100 Subject: [PATCH] storcon_cli: add support for drain and fill operations (#8791) ## Problem We have been naughty and curl-ed storcon to fix-up drains and fills. ## Summary of changes Add support for starting/cancelling drain/fill operations via `storcon_cli`. --- control_plane/storcon_cli/src/main.rs | 135 ++++++++++++++++++++++++-- storage_controller/src/http.rs | 1 - 2 files changed, 126 insertions(+), 10 deletions(-) diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index e27491c1c8..35510ccbca 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -147,9 +147,9 @@ enum Command { #[arg(long)] threshold: humantime::Duration, }, - // Drain a set of specified pageservers by moving the primary attachments to pageservers + // Migrate away from a set of specified pageservers by moving the primary attachments to pageservers // outside of the specified set. - Drain { + BulkMigrate { // Set of pageserver node ids to drain. #[arg(long)] nodes: Vec, @@ -163,6 +163,34 @@ enum Command { #[arg(long)] dry_run: Option, }, + /// Start draining the specified pageserver. + /// The drain is complete when the schedulling policy returns to active. + StartDrain { + #[arg(long)] + node_id: NodeId, + }, + /// Cancel draining the specified pageserver and wait for `timeout` + /// for the operation to be canceled. May be retried. + CancelDrain { + #[arg(long)] + node_id: NodeId, + #[arg(long)] + timeout: humantime::Duration, + }, + /// Start filling the specified pageserver. + /// The drain is complete when the schedulling policy returns to active. + StartFill { + #[arg(long)] + node_id: NodeId, + }, + /// Cancel filling the specified pageserver and wait for `timeout` + /// for the operation to be canceled. May be retried. + CancelFill { + #[arg(long)] + node_id: NodeId, + #[arg(long)] + timeout: humantime::Duration, + }, } #[derive(Parser)] @@ -249,6 +277,34 @@ impl FromStr for NodeAvailabilityArg { } } +async fn wait_for_scheduling_policy( + client: Client, + node_id: NodeId, + timeout: Duration, + f: F, +) -> anyhow::Result +where + F: Fn(NodeSchedulingPolicy) -> bool, +{ + let waiter = tokio::time::timeout(timeout, async move { + loop { + let node = client + .dispatch::<(), NodeDescribeResponse>( + Method::GET, + format!("control/v1/node/{node_id}"), + None, + ) + .await?; + + if f(node.scheduling) { + return Ok::(node.scheduling); + } + } + }); + + Ok(waiter.await??) +} + #[tokio::main] async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); @@ -628,7 +684,7 @@ async fn main() -> anyhow::Result<()> { }) .await?; } - Command::Drain { + Command::BulkMigrate { nodes, concurrency, max_shards, @@ -657,7 +713,7 @@ async fn main() -> anyhow::Result<()> { } if nodes.len() != node_to_drain_descs.len() { - anyhow::bail!("Drain requested for node which doesn't exist.") + anyhow::bail!("Bulk migration requested away from node which doesn't exist.") } node_to_fill_descs.retain(|desc| { @@ -669,7 +725,7 @@ async fn main() -> anyhow::Result<()> { }); if node_to_fill_descs.is_empty() { - anyhow::bail!("There are no nodes to drain to") + anyhow::bail!("There are no nodes to migrate to") } // Set the node scheduling policy to draining for the nodes which @@ -690,7 +746,7 @@ async fn main() -> anyhow::Result<()> { .await?; } - // Perform the drain: move each tenant shard scheduled on a node to + // Perform the migration: move each tenant shard scheduled on a node to // be drained to a node which is being filled. A simple round robin // strategy is used to pick the new node. let tenants = storcon_client @@ -703,13 +759,13 @@ async fn main() -> anyhow::Result<()> { let mut selected_node_idx = 0; - struct DrainMove { + struct MigrationMove { tenant_shard_id: TenantShardId, from: NodeId, to: NodeId, } - let mut moves: Vec = Vec::new(); + let mut moves: Vec = Vec::new(); let shards = tenants .into_iter() @@ -739,7 +795,7 @@ async fn main() -> anyhow::Result<()> { continue; } - moves.push(DrainMove { + moves.push(MigrationMove { tenant_shard_id: shard.tenant_shard_id, from: shard .node_attached @@ -816,6 +872,67 @@ async fn main() -> anyhow::Result<()> { failure ); } + Command::StartDrain { node_id } => { + storcon_client + .dispatch::<(), ()>( + Method::PUT, + format!("control/v1/node/{node_id}/drain"), + None, + ) + .await?; + println!("Drain started for {node_id}"); + } + Command::CancelDrain { node_id, timeout } => { + storcon_client + .dispatch::<(), ()>( + Method::DELETE, + format!("control/v1/node/{node_id}/drain"), + None, + ) + .await?; + + println!("Waiting for node {node_id} to quiesce on scheduling policy ..."); + + let final_policy = + wait_for_scheduling_policy(storcon_client, node_id, *timeout, |sched| { + use NodeSchedulingPolicy::*; + matches!(sched, Active | PauseForRestart) + }) + .await?; + + println!( + "Drain was cancelled for node {node_id}. Schedulling policy is now {final_policy:?}" + ); + } + Command::StartFill { node_id } => { + storcon_client + .dispatch::<(), ()>(Method::PUT, format!("control/v1/node/{node_id}/fill"), None) + .await?; + + println!("Fill started for {node_id}"); + } + Command::CancelFill { node_id, timeout } => { + storcon_client + .dispatch::<(), ()>( + Method::DELETE, + format!("control/v1/node/{node_id}/fill"), + None, + ) + .await?; + + println!("Waiting for node {node_id} to quiesce on scheduling policy ..."); + + let final_policy = + wait_for_scheduling_policy(storcon_client, node_id, *timeout, |sched| { + use NodeSchedulingPolicy::*; + matches!(sched, Active) + }) + .await?; + + println!( + "Fill was cancelled for node {node_id}. Schedulling policy is now {final_policy:?}" + ); + } } Ok(()) diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index 7bbd1541cf..207bd5a1e6 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -1074,7 +1074,6 @@ pub fn make_router( RequestName("control_v1_metadata_health_list_outdated"), ) }) - // TODO(vlad): endpoint for cancelling drain and fill // Tenant Shard operations .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| { tenant_service_handler(