From c8475ed008de112d76f5596387f31c9b3d7a81a0 Mon Sep 17 00:00:00 2001
From: Aleksandr Sarantsev
Date: Tue, 8 Jul 2025 17:20:15 +0400
Subject: [PATCH] Introduce force flag for node deletion API

---
 control_plane/storcon_cli/src/main.rs           | 19 +++--
 storage_controller/src/http.rs                  |  3 +-
 storage_controller/src/service.rs               | 70 ++++++++++---------
 test_runner/fixtures/neon_fixtures.py           |  7 +-
 .../regress/test_storage_controller.py          | 11 +--
 5 files changed, 64 insertions(+), 46 deletions(-)

diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs
index 701c4b3b2e..21fcb9c168 100644
--- a/control_plane/storcon_cli/src/main.rs
+++ b/control_plane/storcon_cli/src/main.rs
@@ -75,6 +75,12 @@ enum Command {
     NodeStartDelete {
         #[arg(long)]
         node_id: NodeId,
+        /// When `force` is true, skip waiting for shards to prewarm during migration.
+        /// This can significantly speed up node deletion since prewarming all shards
+        /// can take considerable time, but may result in slower initial access to
+        /// migrated shards until they warm up naturally.
+        #[arg(long)]
+        force: bool,
     },
     /// Cancel deletion of the specified pageserver and wait for `timeout`
     /// for the operation to be canceled. May be retried.
@@ -933,13 +939,14 @@ async fn main() -> anyhow::Result<()> {
                 .dispatch::<(), ()>(Method::DELETE, format!("control/v1/node/{node_id}"), None)
                 .await?;
         }
-        Command::NodeStartDelete { node_id } => {
+        Command::NodeStartDelete { node_id, force } => {
+            let query = if force {
+                format!("control/v1/node/{node_id}/delete?force=true")
+            } else {
+                format!("control/v1/node/{node_id}/delete")
+            };
             storcon_client
-                .dispatch::<(), ()>(
-                    Method::PUT,
-                    format!("control/v1/node/{node_id}/delete"),
-                    None,
-                )
+                .dispatch::<(), ()>(Method::PUT, query, None)
                 .await?;
             println!("Delete started for {node_id}");
         }
diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs
index e5a3a969d4..e762f6903c 100644
--- a/storage_controller/src/http.rs
+++ b/storage_controller/src/http.rs
@@ -1066,9 +1066,10 @@ async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiErr
 
     let state = get_state(&req);
     let node_id: NodeId = parse_request_param(&req, "node_id")?;
+    let force: bool = parse_query_param(&req, "force")?.unwrap_or(false);
     json_response(
         StatusCode::OK,
-        state.service.start_node_delete(node_id).await?,
+        state.service.start_node_delete(node_id, force).await?,
     )
 }
 
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 9360225396..aac78388e7 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -7165,6 +7165,7 @@ impl Service {
         self: &Arc<Self>,
         node_id: NodeId,
         policy_on_start: NodeSchedulingPolicy,
+        force: bool,
         cancel: CancellationToken,
     ) -> Result<(), OperationError> {
         let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal).build();
@@ -7172,23 +7173,28 @@ impl Service {
         let mut waiters: Vec<ReconcilerWaiter> = Vec::new();
         let mut tid_iter = create_shared_shard_iterator(self.clone());
 
+        let process_cancel = || async {
+            // Attempt to restore the node to its original scheduling policy
+            match self
+                .node_configure(node_id, None, Some(policy_on_start))
+                .await
+            {
+                Ok(()) => Err(OperationError::Cancelled),
+                Err(err) => {
+                    Err(OperationError::FinalizeError(
+                        format!(
+                            "Failed to finalise delete cancel of {} by setting scheduling policy to {}: {}",
+                            node_id, String::from(policy_on_start), err
+                        )
+                        .into(),
+                    ))
+                }
+            }
+        };
+
         while !tid_iter.finished() {
             if cancel.is_cancelled() {
-                match self
-                    .node_configure(node_id, None, Some(policy_on_start))
-                    .await
-                {
-                    Ok(()) => return Err(OperationError::Cancelled),
-                    Err(err) => {
-                        return Err(OperationError::FinalizeError(
-                            format!(
-                                "Failed to finalise delete cancel of {} by setting scheduling policy to {}: {}",
-                                node_id, String::from(policy_on_start), err
-                            )
-                            .into(),
-                        ));
-                    }
-                }
+                return process_cancel().await;
             }
 
             operation_utils::validate_node_state(
@@ -7249,13 +7255,24 @@ impl Service {
                     )
                 }
 
+                // Do not wait for any reconciliations to finish if the deletion has been forced.
                 let waiter = self.maybe_configured_reconcile_shard(
                     tenant_shard,
                     nodes,
                     reconciler_config,
                 );
-                if let Some(some) = waiter {
-                    waiters.push(some);
+
+                if force {
+                    // Here we remove an existing observed location for the node we're removing, and it will
+                    // not be re-added by a reconciler's completion because we filter out removed nodes in
+                    // process_result.
+                    //
+                    // Note that we update the shard's observed state _after_ calling maybe_configured_reconcile_shard:
+                    // that means any reconciles we spawned will know about the node we're deleting,
+                    // enabling them to do live migrations if it's still online.
+                    tenant_shard.observed.locations.remove(&node_id);
+                } else if let Some(waiter) = waiter {
+                    waiters.push(waiter);
                 }
             }
         }
@@ -7269,21 +7286,7 @@ impl Service {
 
         while !waiters.is_empty() {
             if cancel.is_cancelled() {
-                match self
-                    .node_configure(node_id, None, Some(policy_on_start))
-                    .await
-                {
-                    Ok(()) => return Err(OperationError::Cancelled),
-                    Err(err) => {
-                        return Err(OperationError::FinalizeError(
-                            format!(
-                                "Failed to finalise drain cancel of {} by setting scheduling policy to {}: {}",
-                                node_id, String::from(policy_on_start), err
-                            )
-                            .into(),
-                        ));
-                    }
-                }
+                return process_cancel().await;
             }
 
             tracing::info!("Awaiting {} pending delete reconciliations", waiters.len());
@@ -7888,6 +7891,7 @@ impl Service {
     pub(crate) async fn start_node_delete(
         self: &Arc<Self>,
         node_id: NodeId,
+        force: bool,
     ) -> Result<(), ApiError> {
         let (ongoing_op, node_policy, schedulable_nodes_count) = {
             let locked = self.inner.read().unwrap();
@@ -7957,7 +7961,7 @@ impl Service {
 
             tracing::info!("Delete background operation starting");
             let res = service
-                .delete_node(node_id, policy_on_start, cancel)
+                .delete_node(node_id, policy_on_start, force, cancel)
                 .await;
             match res {
                 Ok(()) => {
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index f2ec022666..e5ad8181e4 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -2084,11 +2084,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
             headers=self.headers(TokenScope.ADMIN),
         )
 
-    def node_delete(self, node_id):
+    def node_delete(self, node_id, force: bool = False):
         log.info(f"node_delete({node_id})")
+        query = f"{self.api}/control/v1/node/{node_id}/delete"
+        if force:
+            query += "?force=true"
         self.request(
             "PUT",
-            f"{self.api}/control/v1/node/{node_id}/delete",
+            query,
             headers=self.headers(TokenScope.ADMIN),
         )
 
diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py
index 10845ef02e..119bd6eef7 100644
--- a/test_runner/regress/test_storage_controller.py
+++ b/test_runner/regress/test_storage_controller.py
@@ -2621,7 +2621,7 @@ def test_storage_controller_node_deletion(
     wait_until(assert_shards_migrated)
 
     log.info(f"Deleting pageserver {victim.id}")
-    env.storage_controller.node_delete_old(victim.id)
+    env.storage_controller.node_delete(victim.id, force=True)
 
     if not while_offline:
@@ -2634,7 +2634,10 @@ def test_storage_controller_node_deletion(
         wait_until(assert_victim_evacuated)
 
     # The node should be gone from the list API
-    assert victim.id not in [n["id"] for n in env.storage_controller.node_list()]
+    def assert_victim_gone():
+        assert victim.id not in [n["id"] for n in env.storage_controller.node_list()]
+
+    wait_until(assert_victim_gone)
 
     # No tenants should refer to the node in their intent
     for tenant_id in tenant_ids:
@@ -3265,10 +3268,10 @@ def test_ps_unavailable_after_delete(neon_env_builder: NeonEnvBuilder):
     assert_nodes_count(3)
 
     ps = env.pageservers[0]
-    env.storage_controller.node_delete_old(ps.id)
+    env.storage_controller.node_delete(ps.id, force=True)
 
     # After deletion, the node count must be reduced
-    assert_nodes_count(2)
+    wait_until(lambda: assert_nodes_count(2))
 
     # Running pageserver CLI init in a separate thread
     with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
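
Usage sketch (illustrative, not part of the patch): how the forced-deletion path can be driven from a regress test using only the fixture API shown above. `env`, `victim`, and `wait_until` are the objects already used in test_storage_controller.py; the exact call sites are an assumption for illustration.

    # Start a forced deletion: shards are migrated away without waiting for
    # secondary locations to prewarm, so the node is removed faster.
    env.storage_controller.node_delete(victim.id, force=True)

    # Deletion runs as a background operation in the storage controller,
    # so poll the node list until the victim is gone.
    def assert_victim_gone():
        assert victim.id not in [n["id"] for n in env.storage_controller.node_list()]

    wait_until(assert_victim_gone)

Over HTTP the same request is PUT control/v1/node/{node_id}/delete?force=true; with the CLI it would correspond to the NodeStartDelete subcommand with the new --force flag (assuming clap's default kebab-case subcommand naming).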