From 1d3fd5bfc7223e79665a466fbfb4de4c093e0346 Mon Sep 17 00:00:00 2001
From: Aleksandr Sarantsev
Date: Fri, 6 Jun 2025 10:56:58 +0400
Subject: [PATCH] Better storcon API

---
 control_plane/storcon_cli/src/main.rs | 15 +++++---
 storage_controller/src/http.rs        | 17 ++--------
 storage_controller/src/service.rs     | 47 +++++++++++++++++++--------
 3 files changed, 45 insertions(+), 34 deletions(-)

diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs
index 8ab8688622..5418417c54 100644
--- a/control_plane/storcon_cli/src/main.rs
+++ b/control_plane/storcon_cli/src/main.rs
@@ -211,6 +211,8 @@ enum Command {
     StartDrain {
         #[arg(long)]
         node_id: NodeId,
+        #[arg(long)]
+        drain_all: Option<bool>,
     },
     /// Cancel draining the specified pageserver and wait for `timeout`
     /// for the operation to be canceled. May be retried.
@@ -1146,13 +1148,14 @@ async fn main() -> anyhow::Result<()> {
                 failure
             );
         }
-        Command::StartDrain { node_id } => {
+        Command::StartDrain { node_id, drain_all } => {
+            let path = if drain_all == Some(true) {
+                format!("control/v1/node/{node_id}/drain?drain_all=true")
+            } else {
+                format!("control/v1/node/{node_id}/drain")
+            };
             storcon_client
-                .dispatch::<(), ()>(
-                    Method::PUT,
-                    format!("control/v1/node/{node_id}/drain"),
-                    None,
-                )
+                .dispatch::<(), ()>(Method::PUT, path, None)
                 .await?;
             println!("Drain started for {node_id}");
         }
diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs
index 0834b632a5..5066b431df 100644
--- a/storage_controller/src/http.rs
+++ b/storage_controller/src/http.rs
@@ -47,9 +47,7 @@ use crate::metrics::{
 };
 use crate::persistence::SafekeeperUpsert;
 use crate::reconciler::ReconcileError;
-use crate::service::{
-    LeadershipStatus, NodeDrainMode, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT, Service,
-};
+use crate::service::{LeadershipStatus, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT, Service};
 
 /// State available to HTTP request handlers
 pub struct HttpState {
@@ -1002,18 +1000,9 @@ async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiErro
     let state = get_state(&req);
     let node_id: NodeId = parse_request_param(&req, "node_id")?;
-    let graceful: bool = parse_query_param(&req, "graceful")?.unwrap_or(false);
+    let drain_all: bool = parse_query_param(&req, "drain_all")?.unwrap_or(false);
 
-    let node_drain_mode = if graceful {
-        NodeDrainMode::FullWithReprovisioning
-    } else {
-        NodeDrainMode::AttachedOnly
-    };
-
-    state
-        .service
-        .start_node_drain(node_id, node_drain_mode)
-        .await?;
+    state.service.start_node_drain(node_id, drain_all).await?;
 
     json_response(StatusCode::ACCEPTED, ())
 }
 
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 18c81c9bca..a685019fd7 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -687,16 +687,6 @@ struct ShardMutationLocations {
 #[derive(Default, Clone)]
 struct TenantMutationLocations(BTreeMap<TenantShardId, ShardMutationLocations>);
 
-#[allow(dead_code)]
-pub(crate) enum NodeDrainMode {
-    /// Drain only attached locations (primary node itself)
-    AttachedOnly,
-    /// Drain attached and secondary locations (but don't create new secondaries)
-    AttachedAndSecondaries,
-    /// Drain everything and also provision new secondaries for the newly promoted primaries
-    FullWithReprovisioning,
-}
-
 impl Service {
     pub fn get_config(&self) -> &Config {
         &self.config
@@ -7558,7 +7548,7 @@ impl Service {
     pub(crate) async fn start_node_drain(
         self: &Arc<Self>,
         node_id: NodeId,
-        _mode: NodeDrainMode,
+        drain_all: bool,
     ) -> Result<(), ApiError> {
         let (ongoing_op, node_available, node_policy, schedulable_nodes_count) = {
             let locked = self.inner.read().unwrap();
@@ -7632,7 +7622,7 @@
                 }
 
                 tracing::info!("Drain background operation starting");
-                let res = service.drain_node(node_id, cancel).await;
+                let res = service.drain_node(node_id, drain_all, cancel).await;
                 match res {
                     Ok(()) => {
                         tracing::info!("Drain background operation completed successfully");
@@ -8798,9 +8788,30 @@
         }
     }
-    /// Drain a node by moving the shards attached to it as primaries.
-    /// This is a long running operation and it should run as a separate Tokio task.
+    /// Drain a node by moving shards that are attached to it, either as primaries or secondaries.
+    /// When `drain_all` is false, only primary attachments are moved - this is used during node
+    /// deployment when the node is expected to return to service soon. When `drain_all` is true,
+    /// both primary and secondary attachments are moved - this is used when permanently removing
+    /// a node.
+    ///
+    /// This is a long running operation that should be spawned as a separate Tokio task.
     pub(crate) async fn drain_node(
+        self: &Arc<Self>,
+        node_id: NodeId,
+        drain_all: bool,
+        cancel: CancellationToken,
+    ) -> Result<(), OperationError> {
+        self.drain_primary_attachments(node_id, cancel.clone())
+            .await?;
+        if drain_all {
+            self.drain_secondary_attachments(node_id, cancel).await?;
+        }
+        Ok(())
+    }
+
+    /// Drain a node by moving the shards attached to it as primaries.
+    /// This is a long running operation.
+    async fn drain_primary_attachments(
         self: &Arc<Self>,
         node_id: NodeId,
         cancel: CancellationToken,
     ) -> Result<(), OperationError> {
@@ -8996,6 +9007,14 @@
         Ok(())
     }
 
+    async fn drain_secondary_attachments(
+        self: &Arc<Self>,
+        node_id: NodeId,
+        cancel: CancellationToken,
+    ) -> Result<(), OperationError> {
+        unimplemented!();
+    }
+
     /// Create a node fill plan (pick secondaries to promote), based on:
     /// 1. Shards which have a secondary on this node, and this node is in their home AZ, and are currently attached to a node
     ///    outside their home AZ, should be migrated back here.
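
-- 
Reviewer note, not part of the patch: a usage sketch for the new flag. The
subcommand and flag spellings follow from the clap definitions above (clap
derives kebab-case names); the binary name `storcon_cli` and the node id
value are assumptions for illustration.

  # Deploy-time drain: move only primary attachments; the node is
  # expected to return to service shortly.
  storcon_cli start-drain --node-id 42

  # Decommission drain: additionally move secondary attachments off the node.
  storcon_cli start-drain --node-id 42 --drain-all true

Both invocations issue PUT control/v1/node/{node_id}/drain; the second appends
?drain_all=true, which handle_node_drain parses and forwards to
Service::start_node_drain.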