From 4559ba79b66bb19062de65fd3963543ed1b01fa2 Mon Sep 17 00:00:00 2001
From: Aleksandr Sarantsev <99037063+ephemeralsad@users.noreply.github.com>
Date: Thu, 17 Jul 2025 15:51:31 +0400
Subject: [PATCH] Introduce force flag for new deletion API (#12588)

## Problem

The force deletion API should behave like the graceful deletion API: it needs to support cancellation and persistence, and it should be non-blocking.

## Summary of Changes

- Added a `force` flag to the `NodeStartDelete` command.
- Passed the `force` flag through the `start_node_delete` handler in the storage controller.
- Handled the `force` flag in the `delete_node` function.
- Set the tombstone after removing the node from memory.
- Minor cleanup, like adding a `reset_node_policy_on_cancel` closure.

A short usage sketch of the new flag follows the diff below.

---------

Co-authored-by: Aleksandr Sarantsev
---
 control_plane/storcon_cli/src/main.rs  | 19 +++--
 storage_controller/src/http.rs         |  3 +-
 storage_controller/src/service.rs      | 74 ++++++++++---------
 test_runner/fixtures/neon_fixtures.py  |  7 +-
 .../regress/test_storage_controller.py | 58 +++++++++++++--
 5 files changed, 111 insertions(+), 50 deletions(-)

diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs
index fcc5549beb..a4d1030488 100644
--- a/control_plane/storcon_cli/src/main.rs
+++ b/control_plane/storcon_cli/src/main.rs
@@ -76,6 +76,12 @@ enum Command {
     NodeStartDelete {
         #[arg(long)]
         node_id: NodeId,
+        /// When `force` is true, skip waiting for shards to prewarm during migration.
+        /// This can significantly speed up node deletion since prewarming all shards
+        /// can take considerable time, but may result in slower initial access to
+        /// migrated shards until they warm up naturally.
+        #[arg(long)]
+        force: bool,
     },
     /// Cancel deletion of the specified pageserver and wait for `timeout`
     /// for the operation to be canceled. May be retried.
@@ -952,13 +958,14 @@ async fn main() -> anyhow::Result<()> {
                 .dispatch::<(), ()>(Method::DELETE, format!("control/v1/node/{node_id}"), None)
                 .await?;
         }
-        Command::NodeStartDelete { node_id } => {
+        Command::NodeStartDelete { node_id, force } => {
+            let query = if force {
+                format!("control/v1/node/{node_id}/delete?force=true")
+            } else {
+                format!("control/v1/node/{node_id}/delete")
+            };
             storcon_client
-                .dispatch::<(), ()>(
-                    Method::PUT,
-                    format!("control/v1/node/{node_id}/delete"),
-                    None,
-                )
+                .dispatch::<(), ()>(Method::PUT, query, None)
                 .await?;
             println!("Delete started for {node_id}");
         }
diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs
index c8227f0219..5f9a1124de 100644
--- a/storage_controller/src/http.rs
+++ b/storage_controller/src/http.rs
@@ -1085,9 +1085,10 @@ async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     let state = get_state(&req);
     let node_id: NodeId = parse_request_param(&req, "node_id")?;
+    let force: bool = parse_query_param(&req, "force")?.unwrap_or(false);
 
     json_response(
         StatusCode::OK,
-        state.service.start_node_delete(node_id).await?,
+        state.service.start_node_delete(node_id, force).await?,
     )
 }
 
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 0c5d7f44d4..b315b88fcc 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -7385,6 +7385,7 @@ impl Service {
         self: &Arc<Self>,
         node_id: NodeId,
         policy_on_start: NodeSchedulingPolicy,
+        force: bool,
         cancel: CancellationToken,
     ) -> Result<(), OperationError> {
         let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal).build();
@@ -7392,23 +7393,27 @@ impl Service {
         let mut waiters: Vec<ReconcilerWaiter> = Vec::new();
         let mut tid_iter = create_shared_shard_iterator(self.clone());
 
+        let reset_node_policy_on_cancel = || async {
+            match self
+                .node_configure(node_id, None, Some(policy_on_start))
+                .await
+            {
+                Ok(()) => OperationError::Cancelled,
+                Err(err) => {
+                    OperationError::FinalizeError(
+                        format!(
+                            "Failed to finalise delete cancel of {} by setting scheduling policy to {}: {}",
+                            node_id, String::from(policy_on_start), err
+                        )
+                        .into(),
+                    )
+                }
+            }
+        };
+
         while !tid_iter.finished() {
             if cancel.is_cancelled() {
-                match self
-                    .node_configure(node_id, None, Some(policy_on_start))
-                    .await
-                {
-                    Ok(()) => return Err(OperationError::Cancelled),
-                    Err(err) => {
-                        return Err(OperationError::FinalizeError(
-                            format!(
-                                "Failed to finalise delete cancel of {} by setting scheduling policy to {}: {}",
-                                node_id, String::from(policy_on_start), err
-                            )
-                            .into(),
-                        ));
-                    }
-                }
+                return Err(reset_node_policy_on_cancel().await);
             }
 
             operation_utils::validate_node_state(
@@ -7477,8 +7482,18 @@ impl Service {
                     nodes,
                     reconciler_config,
                 );
-                if let Some(some) = waiter {
-                    waiters.push(some);
+
+                if force {
+                    // Here we remove an existing observed location for the node we're removing, and it will
+                    // not be re-added by a reconciler's completion because we filter out removed nodes in
+                    // process_result.
+                    //
+                    // Note that we update the shard's observed state _after_ calling maybe_configured_reconcile_shard:
+                    // that means any reconciles we spawned will know about the node we're deleting,
+                    // enabling them to do live migrations if it's still online.
+                    tenant_shard.observed.locations.remove(&node_id);
+                } else if let Some(waiter) = waiter {
+                    waiters.push(waiter);
+                }
             }
         }
@@ -7492,21 +7507,7 @@ impl Service {
         while !waiters.is_empty() {
             if cancel.is_cancelled() {
-                match self
-                    .node_configure(node_id, None, Some(policy_on_start))
-                    .await
-                {
-                    Ok(()) => return Err(OperationError::Cancelled),
-                    Err(err) => {
-                        return Err(OperationError::FinalizeError(
-                            format!(
-                                "Failed to finalise drain cancel of {} by setting scheduling policy to {}: {}",
-                                node_id, String::from(policy_on_start), err
-                            )
-                            .into(),
-                        ));
-                    }
-                }
+                return Err(reset_node_policy_on_cancel().await);
             }
 
             tracing::info!("Awaiting {} pending delete reconciliations", waiters.len());
@@ -7516,6 +7517,12 @@ impl Service {
                 .await;
         }
 
+        let pf = pausable_failpoint!("delete-node-after-reconciles-spawned", &cancel);
+        if pf.is_err() {
+            // An error from pausable_failpoint indicates the cancel token was triggered.
+            return Err(reset_node_policy_on_cancel().await);
+        }
+
         self.persistence
             .set_tombstone(node_id)
             .await
@@ -8111,6 +8118,7 @@ impl Service {
     pub(crate) async fn start_node_delete(
         self: &Arc<Self>,
         node_id: NodeId,
+        force: bool,
     ) -> Result<(), ApiError> {
         let (ongoing_op, node_policy, schedulable_nodes_count) = {
             let locked = self.inner.read().unwrap();
@@ -8180,7 +8188,7 @@ impl Service {
                 tracing::info!("Delete background operation starting");
                 let res = service
-                    .delete_node(node_id, policy_on_start, cancel)
+                    .delete_node(node_id, policy_on_start, force, cancel)
                     .await;
                 match res {
                     Ok(()) => {
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index ae73ace9bb..86ffa9e4d4 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -2119,11 +2119,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
             headers=self.headers(TokenScope.ADMIN),
         )
 
-    def node_delete(self, node_id):
+    def node_delete(self, node_id, force: bool = False):
         log.info(f"node_delete({node_id})")
+        query = f"{self.api}/control/v1/node/{node_id}/delete"
+        if force:
+            query += "?force=true"
         self.request(
             "PUT",
-            f"{self.api}/control/v1/node/{node_id}/delete",
+            query,
             headers=self.headers(TokenScope.ADMIN),
         )
 
diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py
index 10845ef02e..d1e9bbd7dc 100644
--- a/test_runner/regress/test_storage_controller.py
+++ b/test_runner/regress/test_storage_controller.py
@@ -72,6 +72,12 @@ def get_node_shard_counts(env: NeonEnv, tenant_ids):
     return counts
 
 
+class DeletionAPIKind(Enum):
+    OLD = "old"
+    FORCE = "force"
+    GRACEFUL = "graceful"
+
+
 @pytest.mark.parametrize(**fixtures.utils.allpairs_versions())
 def test_storage_controller_smoke(
     neon_env_builder: NeonEnvBuilder, compute_reconfigure_listener: ComputeReconfigure, combination
@@ -2572,9 +2578,11 @@ def test_background_operation_cancellation(neon_env_builder: NeonEnvBuilder):
 
 
 @pytest.mark.parametrize("while_offline", [True, False])
+@pytest.mark.parametrize("deletion_api", [DeletionAPIKind.OLD, DeletionAPIKind.FORCE])
 def test_storage_controller_node_deletion(
     neon_env_builder: NeonEnvBuilder,
     while_offline: bool,
+    deletion_api: DeletionAPIKind,
 ):
     """
     Test that deleting a node works & properly reschedules everything that was on the node.
@@ -2598,6 +2606,8 @@ def test_storage_controller_node_deletion(
     assert env.storage_controller.reconcile_all() == 0
 
     victim = env.pageservers[-1]
+    if deletion_api == DeletionAPIKind.FORCE and not while_offline:
+        victim.allowed_errors.append(".*request was dropped before completing.*")
 
     # The procedure a human would follow is:
     # 1. Mark pageserver scheduling=pause
@@ -2621,7 +2631,12 @@ def test_storage_controller_node_deletion(
     wait_until(assert_shards_migrated)
 
     log.info(f"Deleting pageserver {victim.id}")
-    env.storage_controller.node_delete_old(victim.id)
+    if deletion_api == DeletionAPIKind.FORCE:
+        env.storage_controller.node_delete(victim.id, force=True)
+    elif deletion_api == DeletionAPIKind.OLD:
+        env.storage_controller.node_delete_old(victim.id)
+    else:
+        raise AssertionError(f"Invalid deletion API: {deletion_api}")
 
     if not while_offline:
@@ -2634,7 +2649,15 @@ def test_storage_controller_node_deletion(
         wait_until(assert_victim_evacuated)
 
     # The node should be gone from the list API
-    assert victim.id not in [n["id"] for n in env.storage_controller.node_list()]
+    def assert_node_is_gone():
+        assert victim.id not in [n["id"] for n in env.storage_controller.node_list()]
+
+    if deletion_api == DeletionAPIKind.FORCE:
+        wait_until(assert_node_is_gone)
+    elif deletion_api == DeletionAPIKind.OLD:
+        assert_node_is_gone()
+    else:
+        raise AssertionError(f"Invalid deletion API: {deletion_api}")
 
     # No tenants should refer to the node in their intent
     for tenant_id in tenant_ids:
@@ -2656,7 +2679,11 @@ def test_storage_controller_node_deletion(
     env.storage_controller.consistency_check()
 
 
-def test_storage_controller_node_delete_cancellation(neon_env_builder: NeonEnvBuilder):
+@pytest.mark.parametrize("deletion_api", [DeletionAPIKind.FORCE, DeletionAPIKind.GRACEFUL])
+def test_storage_controller_node_delete_cancellation(
+    neon_env_builder: NeonEnvBuilder,
+    deletion_api: DeletionAPIKind,
+):
     neon_env_builder.num_pageservers = 3
     neon_env_builder.num_azs = 3
     env = neon_env_builder.init_configs()
@@ -2680,12 +2707,16 @@ def test_storage_controller_node_delete_cancellation(neon_env_builder: NeonEnvBu
     assert len(nodes) == 3
 
     env.storage_controller.configure_failpoints(("sleepy-delete-loop", "return(10000)"))
+    env.storage_controller.configure_failpoints(("delete-node-after-reconciles-spawned", "pause"))
 
     ps_id_to_delete = env.pageservers[0].id
    env.storage_controller.warm_up_all_secondaries()
+
+    assert deletion_api in [DeletionAPIKind.FORCE, DeletionAPIKind.GRACEFUL]
+    force = deletion_api == DeletionAPIKind.FORCE
     env.storage_controller.retryable_node_operation(
-        lambda ps_id: env.storage_controller.node_delete(ps_id),
+        lambda ps_id: env.storage_controller.node_delete(ps_id, force),
         ps_id_to_delete,
         max_attempts=3,
         backoff=2,
@@ -2701,6 +2732,8 @@ def test_storage_controller_node_delete_cancellation(neon_env_builder: NeonEnvBu
 
     env.storage_controller.cancel_node_delete(ps_id_to_delete)
 
+    env.storage_controller.configure_failpoints(("delete-node-after-reconciles-spawned", "off"))
+
     env.storage_controller.poll_node_status(
         ps_id_to_delete,
         PageserverAvailability.ACTIVE,
@@ -3252,7 +3285,10 @@ def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvB
         wait_until(reconfigure_node_again)
 
 
-def test_ps_unavailable_after_delete(neon_env_builder: NeonEnvBuilder):
+@pytest.mark.parametrize("deletion_api", [DeletionAPIKind.OLD, DeletionAPIKind.FORCE])
+def test_ps_unavailable_after_delete(
+    neon_env_builder: NeonEnvBuilder, deletion_api: DeletionAPIKind
+):
     neon_env_builder.num_pageservers = 3
     env = neon_env_builder.init_start()
 
@@ -3265,10 +3301,16 @@ def test_ps_unavailable_after_delete(neon_env_builder: NeonEnvBuilder):
     assert_nodes_count(3)
 
     ps = env.pageservers[0]
-    env.storage_controller.node_delete_old(ps.id)
-    # After deletion, the node count must be reduced
-    assert_nodes_count(2)
+    if deletion_api == DeletionAPIKind.FORCE:
+        ps.allowed_errors.append(".*request was dropped before completing.*")
+        env.storage_controller.node_delete(ps.id, force=True)
+        wait_until(lambda: assert_nodes_count(2))
+    elif deletion_api == DeletionAPIKind.OLD:
+        env.storage_controller.node_delete_old(ps.id)
+        assert_nodes_count(2)
+    else:
+        raise AssertionError(f"Invalid deletion API: {deletion_api}")
 
     # Running pageserver CLI init in a separate thread
     with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
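
For reference, here is a minimal sketch of how the new force path can be exercised from a regression test, based on the fixtures and tests changed above. It assumes a started `NeonEnv` (`env`) with several pageservers, plus the `wait_until` and `log` helpers already used by these tests; it is an illustration of the API added by this patch, not part of the patch itself.

```python
# Sketch only: `env` is assumed to be a started NeonEnv with several pageservers,
# and `wait_until` / `log` are the usual test-fixture helpers.
victim = env.pageservers[-1]

# The force path can drop in-flight requests to the node being deleted, so the
# pageserver may log that a request was dropped before completing; allow that
# message, as the tests above do.
victim.allowed_errors.append(".*request was dropped before completing.*")

log.info(f"Force-deleting pageserver {victim.id}")
# Issues PUT /control/v1/node/{node_id}/delete?force=true via the fixture.
env.storage_controller.node_delete(victim.id, force=True)


# The new deletion API is non-blocking, so poll until the node disappears
# from the node list API instead of asserting immediately.
def assert_node_is_gone():
    assert victim.id not in [n["id"] for n in env.storage_controller.node_list()]


wait_until(assert_node_is_gone)
```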