From 52dee408dce3676dceb4d01c475d5e693bec2c6f Mon Sep 17 00:00:00 2001
From: John Spray
Date: Thu, 10 Apr 2025 17:55:37 +0100
Subject: [PATCH] storage controller: improve safety of shard splits coinciding with controller restarts (#11412)

## Problem

The graceful leadership transfer process involves calling step_down on the old controller, but this was not waiting for shard splits to complete, and the new controller could therefore end up trying to abort a shard split while it was still going on.

We mitigated this already in #11256 by avoiding the case where shard split completion would update the database incorrectly, but this was a fragile fix because it assumes that is the only problematic part of the split running concurrently.

Precursors:
- #11290
- #11256

Closes: #11254

## Summary of changes

- Hold the reconciler gate from shard splits, so that step_down will wait for them. Splits should always be fairly prompt, so it is okay to wait here.
- Defense in depth: if step_down times out (hardcoded 10 second limit), then fully terminate the controller process rather than letting it continue running, potentially doing split-brainy things. This makes sense because the new controller will always declare itself leader unilaterally if step_down fails, so leaving an old controller running is not beneficial.
- Tests: extend `test_storage_controller_leadership_transfer_during_split` to separately exercise the case of a split holding up step_down, and the case where the overall timeout on step_down is hit and the controller terminates.
---
 storage_controller/src/http.rs          |  14 ++-
 storage_controller/src/service.rs       |  51 +++++++--
 .../regress/test_storage_controller.py  | 107 ++++++++++++------
 3 files changed, 125 insertions(+), 47 deletions(-)

diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs
index 0d1dc8f8ee..4f3613b687 100644
--- a/storage_controller/src/http.rs
+++ b/storage_controller/src/http.rs
@@ -1235,8 +1235,18 @@ async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError
         ForwardOutcome::NotForwarded(req) => req,
     };
 
-    let state = get_state(&req);
-    json_response(StatusCode::OK, state.service.step_down().await)
+    // Spawn a background task: once we start stepping down, we must finish: if the client drops
+    // their request we should avoid stopping in some part-stepped-down state.
+    let handle = tokio::spawn(async move {
+        let state = get_state(&req);
+        state.service.step_down().await
+    });
+
+    let result = handle
+        .await
+        .map_err(|e| ApiError::InternalServerError(e.into()))?;
+
+    json_response(StatusCode::OK, result)
 }
 
 async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 2ef09cd2e3..4790f80162 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -61,7 +61,7 @@ use utils::completion::Barrier;
 use utils::generation::Generation;
 use utils::id::{NodeId, TenantId, TimelineId};
 use utils::lsn::Lsn;
-use utils::sync::gate::Gate;
+use utils::sync::gate::{Gate, GateGuard};
 use utils::{failpoint_support, pausable_failpoint};
 
 use crate::background_node_operations::{
@@ -594,6 +594,8 @@ struct TenantShardSplitAbort {
     new_stripe_size: Option<ShardStripeSize>,
     /// Until this abort op is complete, no other operations may be done on the tenant
     _tenant_lock: TracingExclusiveGuard<TenantOperations>,
+    /// The reconciler gate for the duration of the split operation, and any included abort.
+    _gate: GateGuard,
 }
 
 #[derive(thiserror::Error, Debug)]
@@ -1460,7 +1462,7 @@ impl Service {
         // Retry until shutdown: we must keep this request object alive until it is properly
         // processed, as it holds a lock guard that prevents other operations trying to do things
         // to the tenant while it is in a weird part-split state.
-        while !self.cancel.is_cancelled() {
+        while !self.reconcilers_cancel.is_cancelled() {
             match self.abort_tenant_shard_split(&op).await {
                 Ok(_) => break,
                 Err(e) => {
@@ -1473,9 +1475,12 @@ impl Service {
                     // when we retry, so that the abort op will succeed. If the abort op is failing
                     // for some other reason, we will keep retrying forever, or until a human notices
                     // and does something about it (either fixing a pageserver or restarting the controller).
-                    tokio::time::timeout(Duration::from_secs(5), self.cancel.cancelled())
-                        .await
-                        .ok();
+                    tokio::time::timeout(
+                        Duration::from_secs(5),
+                        self.reconcilers_cancel.cancelled(),
+                    )
+                    .await
+                    .ok();
                 }
             }
         }
@@ -4910,7 +4915,7 @@ impl Service {
             1,
             10,
             Duration::from_secs(5),
-            &self.cancel,
+            &self.reconcilers_cancel,
         )
         .await
         {
@@ -5161,6 +5166,11 @@ impl Service {
         )
         .await;
 
+        let _gate = self
+            .reconcilers_gate
+            .enter()
+            .map_err(|_| ApiError::ShuttingDown)?;
+
         let new_shard_count = ShardCount::new(split_req.new_shard_count);
         let new_stripe_size = split_req.new_stripe_size;
 
@@ -5188,6 +5198,7 @@ impl Service {
                 new_shard_count,
                 new_stripe_size,
                 _tenant_lock,
+                _gate,
             })
             // Ignore error sending: that just means we're shutting down: aborts are ephemeral so it's fine to drop it.
             .ok();
@@ -5527,7 +5538,10 @@ impl Service {
                 "failpoint".to_string()
             )));
 
-        failpoint_support::sleep_millis_async!("shard-split-post-remote-sleep", &self.cancel);
+        failpoint_support::sleep_millis_async!(
+            "shard-split-post-remote-sleep",
+            &self.reconcilers_cancel
+        );
 
         tracing::info!(
             "Split {} into {}",
@@ -5585,7 +5599,7 @@ impl Service {
                     stripe_size,
                     preferred_az: preferred_az_id.as_ref().map(Cow::Borrowed),
                 },
-                &self.cancel,
+                &self.reconcilers_cancel,
             )
             .await
         {
@@ -8670,9 +8684,24 @@ impl Service {
         failpoint_support::sleep_millis_async!("sleep-on-step-down-handling");
 
         self.inner.write().unwrap().step_down();
-        // TODO: would it make sense to have a time-out for this?
-        self.stop_reconciliations(StopReconciliationsReason::SteppingDown)
-            .await;
+
+        // Wait for reconciliations to stop, or terminate this process if they
+        // fail to stop in time (this indicates a bug in shutdown)
+        tokio::select! {
+            _ = self.stop_reconciliations(StopReconciliationsReason::SteppingDown) => {
+                tracing::info!("Reconciliations stopped, proceeding with step down");
+            }
+            _ = async {
+                failpoint_support::sleep_millis_async!("step-down-delay-timeout");
+                tokio::time::sleep(Duration::from_secs(10)).await
+            } => {
+                tracing::warn!("Step down timed out while waiting for reconciliation gate, terminating process");
+
+                // The caller may proceed to act as leader when it sees this request fail: reduce the chance
+                // of a split-brain situation by terminating this controller instead of leaving it up in a partially-shut-down state.
+                std::process::exit(1);
+            }
+        }
 
         let mut global_observed = GlobalObservedState::default();
         let locked = self.inner.read().unwrap();
diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py
index ce73c9a738..b2c8415e9a 100644
--- a/test_runner/regress/test_storage_controller.py
+++ b/test_runner/regress/test_storage_controller.py
@@ -2892,10 +2892,12 @@ def test_storage_controller_leadership_transfer(
     )
 
 
+@pytest.mark.parametrize("step_down_times_out", [False, True])
 def test_storage_controller_leadership_transfer_during_split(
     neon_env_builder: NeonEnvBuilder,
     storage_controller_proxy: StorageControllerProxy,
     port_distributor: PortDistributor,
+    step_down_times_out: bool,
 ):
     """
     Exercise a race between shard splitting and graceful leadership transfer. This is
@@ -2936,6 +2938,18 @@ def test_storage_controller_leadership_transfer_during_split(
     )
     env.storage_controller.reconcile_until_idle()
 
+    # We are testing scenarios where the step down API does not complete: either because it is stuck
+    # doing a shard split, or because it totally times out on some other failpoint.
+    env.storage_controller.allowed_errors.extend(
+        [
+            ".*step_down.*request was dropped before completing.*",
+            ".*step_down.*operation timed out.*",
+            ".*Send step down request failed, will retry.*",
+            ".*Send step down request still failed after.*retries.*",
+            ".*Leader .+ did not respond to step-down request.*",
+        ]
+    )
+
     with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
         # Start a shard split
         env.storage_controller.allowed_errors.extend(
@@ -2943,6 +2957,14 @@ def test_storage_controller_leadership_transfer_during_split(
         )
         pause_failpoint = "shard-split-pre-complete"
        env.storage_controller.configure_failpoints((pause_failpoint, "pause"))
+
+        if not step_down_times_out:
+            # Prevent the timeout self-terminate code from executing: we will block step down on the
+            # shard split itself
+            env.storage_controller.configure_failpoints(
+                ("step-down-delay-timeout", "return(3600000)")
+            )
+
         split_fut = executor.submit(
             env.storage_controller.tenant_shard_split, list(tenants)[0], shard_count * 2
         )
@@ -2961,12 +2983,20 @@ def test_storage_controller_leadership_transfer_during_split(
             timeout_in_seconds=30, instance_id=2, base_port=storage_controller_2_port
         )
 
+        if step_down_times_out:
+            # Step down will time out, original controller will terminate itself
+            env.storage_controller.allowed_errors.extend([".*terminating process.*"])
+        else:
+            # Step down does not time out: original controller hits its shard split completion
+            # code path and realises that it must not purge the parent shards from the database.
+            env.storage_controller.allowed_errors.extend([".*Enqueuing background abort.*"])
+
         def passed_split_abort():
             try:
                 log.info("Checking log for pattern...")
-                assert env.storage_controller.log_contains(
-                    ".*Using observed state received from leader.*"
-                )
+                # This log is indicative of entering startup_reconcile, which happens
+                # after the point we would abort shard splits
+                assert env.storage_controller.log_contains(".*Populating tenant shards.*")
             except Exception:
                 log.exception("Failed to find pattern in log")
                 raise
@@ -2975,34 +3005,42 @@ def test_storage_controller_leadership_transfer_during_split(
         wait_until(passed_split_abort, interval=0.1, status_interval=1.0)
         assert env.storage_controller.log_contains(".*Aborting shard split.*")
 
-        # Proxy is still talking to original controller here: disable its pause failpoint so
-        # that its shard split can run to completion.
-        log.info("Disabling failpoint")
-        # Bypass the proxy: the python test HTTPServer is single threaded and still blocked
-        # on handling the shard split request.
-        env.storage_controller.request(
-            "PUT",
-            f"http://127.0.0.1:{storage_controller_1_port}/debug/v1/failpoints",
-            json=[{"name": "shard-split-pre-complete", "actions": "off"}],
-            headers=env.storage_controller.headers(TokenScope.ADMIN),
-        )
+        if step_down_times_out:
+            # We will let the old controller hit a timeout path where it terminates itself, rather than
+            # completing step_down and trying to complete a shard split
+            def old_controller_terminated():
+                assert env.storage_controller.log_contains(".*terminating process.*")
 
-        def previous_stepped_down():
-            assert (
-                env.storage_controller.get_leadership_status()
-                == StorageControllerLeadershipStatus.STEPPED_DOWN
+            wait_until(old_controller_terminated)
+        else:
+            # Proxy is still talking to original controller here: disable its pause failpoint so
+            # that its shard split can run to completion.
+            log.info("Disabling failpoint")
+            # Bypass the proxy: the python test HTTPServer is single threaded and still blocked
+            # on handling the shard split request.
+            env.storage_controller.request(
+                "PUT",
+                f"http://127.0.0.1:{storage_controller_1_port}/debug/v1/failpoints",
+                json=[{"name": "shard-split-pre-complete", "actions": "off"}],
+                headers=env.storage_controller.headers(TokenScope.ADMIN),
             )
 
-        log.info("Awaiting step down")
-        wait_until(previous_stepped_down)
+            def previous_stepped_down():
+                assert (
+                    env.storage_controller.get_leadership_status()
+                    == StorageControllerLeadershipStatus.STEPPED_DOWN
+                )
 
-        # Let the shard split complete: this may happen _after_ the replacement has come up
-        # and tried to clean up the databases
-        log.info("Unblocking & awaiting shard split")
-        with pytest.raises(Exception, match="Unexpected child shard count"):
-            # This split fails when it tries to persist results, because it encounters
-            # changes already made by the new controller's abort-on-startup
-            split_fut.result()
+            log.info("Awaiting step down")
+            wait_until(previous_stepped_down)
+
+            # Let the shard split complete: this may happen _after_ the replacement has come up
+            # and tried to clean up the databases
+            log.info("Unblocking & awaiting shard split")
+            with pytest.raises(Exception, match="Unexpected child shard count"):
+                # This split fails when it tries to persist results, because it encounters
+                # changes already made by the new controller's abort-on-startup
+                split_fut.result()
 
         log.info("Routing to new leader")
         storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_2_port}")
@@ -3020,13 +3058,14 @@ def test_storage_controller_leadership_transfer_during_split(
     env.storage_controller.wait_until_ready()
     env.storage_controller.consistency_check()
 
-    # Check that the stepped down instance forwards requests
-    # to the new leader while it's still running.
-    storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
-    env.storage_controller.tenant_shard_dump()
-    env.storage_controller.node_configure(env.pageservers[0].id, {"scheduling": "Pause"})
-    status = env.storage_controller.node_status(env.pageservers[0].id)
-    assert status["scheduling"] == "Pause"
+    if not step_down_times_out:
+        # Check that the stepped down instance forwards requests
+        # to the new leader while it's still running.
+        storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
+        env.storage_controller.tenant_shard_dump()
+        env.storage_controller.node_configure(env.pageservers[0].id, {"scheduling": "Pause"})
+        status = env.storage_controller.node_status(env.pageservers[0].id)
+        assert status["scheduling"] == "Pause"
 
 
 def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvBuilder):
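Illustrative note (not part of the patch): the mechanism above is simply "hold a gate guard for the lifetime of the split, and have step_down wait for the gate to drain, with a hard ten second ceiling after which the process exits". The sketch below models that shape with stand-in `Gate`/`GateGuard` types built on `tokio::sync::RwLock` and hypothetical `step_down`/`main` functions; it is a minimal approximation of the pattern, not the controller's `utils::sync::gate` implementation.

```rust
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::{OwnedRwLockReadGuard, RwLock};

/// Stand-in for the reconcilers gate: `enter()` hands out a guard, and
/// `close()` resolves only once every outstanding guard has been dropped.
#[derive(Clone, Default)]
struct Gate(Arc<RwLock<()>>);

struct GateGuard(#[allow(dead_code)] OwnedRwLockReadGuard<()>);

impl Gate {
    async fn enter(&self) -> GateGuard {
        GateGuard(self.0.clone().read_owned().await)
    }

    async fn close(&self) {
        // Taking the write lock waits for all read guards (i.e. in-flight operations).
        let _ = self.0.write().await;
    }
}

/// Shape of the step_down behaviour in this change: wait for in-flight work,
/// but never for more than ten seconds; past that, exit rather than risk
/// running alongside a new leader.
async fn step_down(gate: Gate) {
    match tokio::time::timeout(Duration::from_secs(10), gate.close()).await {
        Ok(()) => println!("in-flight operations drained, step down complete"),
        Err(_) => {
            eprintln!("step down timed out, terminating process");
            std::process::exit(1);
        }
    }
}

#[tokio::main]
async fn main() {
    let gate = Gate::default();

    // Take the guard up front, as the split handler does before any remote work,
    // and hold it for the lifetime of the "split".
    let guard = gate.enter().await;
    let split = tokio::spawn(async move {
        let _guard = guard;
        tokio::time::sleep(Duration::from_millis(500)).await;
        println!("split finished");
    });

    // Cannot complete until the split above drops its guard.
    step_down(gate).await;
    split.await.unwrap();
}
```

The trade-off mirrors the summary of changes: waiting is acceptable because splits are expected to be prompt, and the hard exit bounds how long a stale leader can coexist with its replacement once the new controller has declared itself leader.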