From 6f7e3c18e4536dba40d2deb2f2100a3fe54baffd Mon Sep 17 00:00:00 2001
From: Vlad Lazar
Date: Thu, 24 Apr 2025 17:59:56 +0100
Subject: [PATCH] storage_controller: make leadership protocol more robust (#11703)

## Problem

We saw the following scenario in staging:
1. Pod A starts up. Becomes leader and steps down the previous pod cleanly.
2. Pod B starts up (deployment).
3. Step down request from pod B to pod A times out. Pod A did not manage to stop its reconciliations within 10 seconds and exited with return code 1 ([code](https://github.com/neondatabase/neon/blob/7ba8519b43c1e414e95e4cdf450712f227e49a1e/storage_controller/src/service.rs#L8686-L8702)).
4. Pod B marks itself as the leader and finishes start-up.
5. k8s restarts pod A.
6. k8s marks pod B as ready.
7. Pod A sends a step down request to pod B - this succeeds => pod A is now the leader.
8. k8s kills pod A because it thinks pod B is healthy and pod A is part of the old replica set.

We end up in a situation where the only pod we have (B) is stepped down and attempts to forward requests to a leader that doesn't exist. k8s can't detect that pod B is in a bad state since the /status endpoint simply returns 200 if the pod is running.

## Summary of changes

This PR includes a number of robustness improvements to the leadership protocol:
* use a single step down task per controller (a sketch of this pattern is shown below)
* add a new endpoint to be used as k8s liveness probe and check leadership status there
* handle restarts explicitly (i.e. don't step yourself down)
* increase the step down retry count
* don't kill the process on long step down since k8s will just restart it
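To make the first bullet concrete, here is a minimal, self-contained sketch of the shared step down task pattern built from `OnceLock` plus a `tokio::sync::watch` channel. This is an illustration rather than the controller code itself: `DemoService`, `DemoObservedState` and the sleep standing in for stopping reconciliations are made-up names, and it assumes the `tokio` crate with its full feature set.

```rust
use std::ops::Deref;
use std::sync::{Arc, OnceLock};
use std::time::Duration;

use tokio::sync::watch;

/// Stand-in for the controller's observed state snapshot.
#[derive(Clone, Debug)]
struct DemoObservedState {
    generation: u32,
}

struct DemoService {
    /// Set exactly once, by whichever step down request arrives first.
    step_down_barrier: OnceLock<watch::Receiver<Option<DemoObservedState>>>,
}

impl DemoService {
    fn new() -> Arc<Self> {
        Arc::new(Self {
            step_down_barrier: OnceLock::new(),
        })
    }

    /// All callers share one background task: the first call spawns it and
    /// every call (including later ones) waits on the same watch channel.
    async fn step_down(self: &Arc<Self>) -> DemoObservedState {
        let rx = self.step_down_barrier.get_or_init(|| {
            let this = self.clone();
            let (tx, rx) = watch::channel::<Option<DemoObservedState>>(None);
            tokio::spawn(async move {
                let state = this.step_down_task().await;
                // The receiver is kept alive by the OnceLock, so the send succeeds.
                let _ = tx.send(Some(state));
            });
            rx
        });

        let mut rx = rx.clone();
        let guard = rx
            .wait_for(|state| state.is_some())
            .await
            .expect("sender kept alive by the spawned task");
        guard.deref().clone().expect("checked by wait_for")
    }

    /// Placeholder for "stop reconciliations and persist leadership state".
    async fn step_down_task(&self) -> DemoObservedState {
        tokio::time::sleep(Duration::from_millis(100)).await;
        DemoObservedState { generation: 1 }
    }
}

#[tokio::main]
async fn main() {
    let svc = DemoService::new();

    // Two concurrent requests and one late request: step_down_task runs once.
    let (a, b) = tokio::join!(svc.step_down(), svc.step_down());
    let c = svc.step_down().await;
    println!("{a:?} {b:?} {c:?}");
}
```

Because the `watch` channel retains the final value, a request that arrives after the task has already completed still resolves immediately, and a client that disconnects mid-request no longer leaves the controller in a partially stepped down state. This is also why the HTTP handler in `http.rs` below no longer needs to spawn its own detached task.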
---
 storage_controller/src/http.rs                 | 43 +++++++--
 storage_controller/src/leadership.rs           | 13 +++
 storage_controller/src/peer_client.rs          | 12 ++-
 storage_controller/src/service.rs              | 68 +++++++++----
 .../regress/test_storage_controller.py         | 95 ++++++++-----------
 5 files changed, 142 insertions(+), 89 deletions(-)

diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs
index 0a36ce8b6f..649113b8ce 100644
--- a/storage_controller/src/http.rs
+++ b/storage_controller/src/http.rs
@@ -72,6 +72,7 @@ impl HttpState {
             neon_metrics: NeonMetrics::new(build_info),
             allowlist_routes: &[
                 "/status",
+                "/live",
                 "/ready",
                 "/metrics",
                 "/profile/cpu",
@@ -1260,16 +1261,8 @@ async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError
         ForwardOutcome::NotForwarded(req) => req,
     };
 
-    // Spawn a background task: once we start stepping down, we must finish: if the client drops
-    // their request we should avoid stopping in some part-stepped-down state.
-    let handle = tokio::spawn(async move {
-        let state = get_state(&req);
-        state.service.step_down().await
-    });
-
-    let result = handle
-        .await
-        .map_err(|e| ApiError::InternalServerError(e.into()))?;
+    let state = get_state(&req);
+    let result = state.service.step_down().await;
 
     json_response(StatusCode::OK, result)
 }
@@ -1401,6 +1394,8 @@ async fn handle_reconcile_all(req: Request<Body>) -> Result<Response<Body>, ApiError
 }
 
 /// Status endpoint is just used for checking that our HTTP listener is up
+///
+/// This serves as our k8s startup probe.
 async fn handle_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     match maybe_forward(req).await {
         ForwardOutcome::Forwarded(res) => {
@@ -1412,6 +1407,30 @@ async fn handle_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     json_response(StatusCode::OK, ())
 }
 
+/// Liveness endpoint indicates that this storage controller is in a state
+/// where it can fulfill its responsibilities. Namely, startup has finished
+/// and it is the current leader.
+///
+/// This serves as our k8s liveness probe.
+async fn handle_live(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    let req = match maybe_forward(req).await {
+        ForwardOutcome::Forwarded(res) => {
+            return res;
+        }
+        ForwardOutcome::NotForwarded(req) => req,
+    };
+
+    let state = get_state(&req);
+    let live = state.service.startup_complete.is_ready()
+        && state.service.get_leadership_status() == LeadershipStatus::Leader;
+
+    if live {
+        json_response(StatusCode::OK, ())
+    } else {
+        json_response(StatusCode::SERVICE_UNAVAILABLE, ())
+    }
+}
+
 /// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
 /// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
 async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -1745,6 +1764,7 @@ async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
     const NOT_FOR_FORWARD: &[&str] = &[
         "/control/v1/step_down",
         "/status",
+        "/live",
         "/ready",
         "/metrics",
         "/profile/cpu",
@@ -1969,6 +1989,9 @@ pub fn make_router(
         .get("/status", |r| {
             named_request_span(r, handle_status, RequestName("status"))
         })
+        .get("/live", |r| {
+            named_request_span(r, handle_live, RequestName("live"))
+        })
         .get("/ready", |r| {
             named_request_span(r, handle_ready, RequestName("ready"))
         })
diff --git a/storage_controller/src/leadership.rs b/storage_controller/src/leadership.rs
index 39c28d60a9..048f752db5 100644
--- a/storage_controller/src/leadership.rs
+++ b/storage_controller/src/leadership.rs
@@ -43,6 +43,19 @@ impl Leadership {
         &self,
     ) -> Result<(Option<Leader>, Option<GlobalObservedState>)> {
         let leader = self.current_leader().await?;
+
+        if leader.as_ref().map(|l| &l.address)
+            == self
+                .config
+                .address_for_peers
+                .as_ref()
+                .map(Uri::to_string)
+                .as_ref()
+        {
+            // We are already the current leader; this is a restart.
+            return Ok((leader, None));
+        }
+
         let leader_step_down_state = if let Some(ref leader) = leader {
             if self.config.start_as_candidate {
                 self.request_step_down(leader).await
diff --git a/storage_controller/src/peer_client.rs b/storage_controller/src/peer_client.rs
index 604d1024ba..bae2fed096 100644
--- a/storage_controller/src/peer_client.rs
+++ b/storage_controller/src/peer_client.rs
@@ -55,9 +55,12 @@ impl ResponseErrorMessageExt for reqwest::Response {
     }
 }
 
-#[derive(Serialize, Deserialize, Debug, Default)]
+#[derive(Serialize, Deserialize, Debug, Default, Clone)]
 pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);
 
+const STEP_DOWN_RETRIES: u32 = 8;
+const STEP_DOWN_TIMEOUT: Duration = Duration::from_secs(1);
+
 impl PeerClient {
     pub(crate) fn new(http_client: reqwest::Client, uri: Uri, jwt: Option<String>) -> Self {
         Self {
@@ -76,7 +79,7 @@ impl PeerClient {
             req
         };
 
-        let req = req.timeout(Duration::from_secs(2));
+        let req = req.timeout(STEP_DOWN_TIMEOUT);
 
         let res = req
             .send()
@@ -94,8 +97,7 @@ impl PeerClient {
     }
 
     /// Request the peer to step down and return its current observed state
-    /// All errors are retried with exponential backoff for a maximum of 4 attempts.
-    /// Assuming all retries are performed, the function times out after roughly 4 seconds.
+    /// All errors are retried with exponential backoff; see `STEP_DOWN_RETRIES` and `STEP_DOWN_TIMEOUT`.
     pub(crate) async fn step_down(
         &self,
         cancel: &CancellationToken,
@@ -104,7 +106,7 @@ impl PeerClient {
             || self.request_step_down(),
             |_e| false,
             2,
-            4,
+            STEP_DOWN_RETRIES,
             "Send step down request",
             cancel,
         )
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index c5cf4bedcf..acd399dca2 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -11,7 +11,7 @@ use std::num::NonZeroU32;
 use std::ops::{Deref, DerefMut};
 use std::path::PathBuf;
 use std::str::FromStr;
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock};
 use std::time::{Duration, Instant, SystemTime};
 
 use anyhow::Context;
@@ -524,6 +524,9 @@ pub struct Service {
 
     /// HTTP client with proper CA certs.
     http_client: reqwest::Client,
+
+    /// Handle for the step down background task if one was ever requested
+    step_down_barrier: OnceLock<tokio::sync::watch::Receiver<Option<GlobalObservedState>>>,
 }
 
 impl From for ApiError {
@@ -1745,6 +1748,7 @@ impl Service {
             tenant_op_locks: Default::default(),
             node_op_locks: Default::default(),
             http_client,
+            step_down_barrier: Default::default(),
         });
 
         let result_task_this = this.clone();
@@ -8886,27 +8890,59 @@ impl Service {
         self.inner.read().unwrap().get_leadership_status()
     }
 
-    pub(crate) async fn step_down(&self) -> GlobalObservedState {
+    /// Handler for step down requests
+    ///
+    /// Step down runs in a separate task since, once it's called, it should
+    /// be driven to completion. Subsequent requests will wait on the same
+    /// step down task.
+    pub(crate) async fn step_down(self: &Arc<Self>) -> GlobalObservedState {
+        let handle = self.step_down_barrier.get_or_init(|| {
+            let step_down_self = self.clone();
+            let (tx, rx) = tokio::sync::watch::channel::<Option<GlobalObservedState>>(None);
+            tokio::spawn(async move {
+                let state = step_down_self.step_down_task().await;
+                tx.send(Some(state))
+                    .expect("Task Arc keeps receiver alive");
+            });
+
+            rx
+        });
+
+        handle
+            .clone()
+            .wait_for(|observed_state| observed_state.is_some())
+            .await
+            .expect("Task Arc keeps sender alive")
+            .deref()
+            .clone()
+            .expect("Checked above")
+    }
+
+    async fn step_down_task(&self) -> GlobalObservedState {
         tracing::info!("Received step down request from peer");
         failpoint_support::sleep_millis_async!("sleep-on-step-down-handling");
 
         self.inner.write().unwrap().step_down();
 
-        // Wait for reconciliations to stop, or terminate this process if they
-        // fail to stop in time (this indicates a bug in shutdown)
-        tokio::select! {
-            _ = self.stop_reconciliations(StopReconciliationsReason::SteppingDown) => {
-                tracing::info!("Reconciliations stopped, proceeding with step down");
-            }
-            _ = async {
-                failpoint_support::sleep_millis_async!("step-down-delay-timeout");
-                tokio::time::sleep(Duration::from_secs(10)).await
-            } => {
-                tracing::warn!("Step down timed out while waiting for reconciliation gate, terminating process");
+        let stop_reconciliations =
+            self.stop_reconciliations(StopReconciliationsReason::SteppingDown);
+        let mut stop_reconciliations = std::pin::pin!(stop_reconciliations);
 
-                // The caller may proceed to act as leader when it sees this request fail: reduce the chance
-                // of a split-brain situation by terminating this controller instead of leaving it up in a partially-shut-down state.
-                std::process::exit(1);
+        let started_at = Instant::now();
+
+        // Wait for reconciliations to stop and warn if that's taking a long time
+        loop {
+            tokio::select! {
+                _ = &mut stop_reconciliations => {
+                    tracing::info!("Reconciliations stopped, proceeding with step down");
+                    break;
+                }
+                _ = tokio::time::sleep(Duration::from_secs(10)) => {
+                    tracing::warn!(
+                        elapsed_sec=%started_at.elapsed().as_secs(),
+                        "Stopping reconciliations during step down is taking too long"
+                    );
+                }
             }
         }
diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py
index a0fa8b55f3..af018f7b5d 100644
--- a/test_runner/regress/test_storage_controller.py
+++ b/test_runner/regress/test_storage_controller.py
@@ -2894,12 +2894,10 @@ def test_storage_controller_leadership_transfer(
     )
 
 
-@pytest.mark.parametrize("step_down_times_out", [False, True])
 def test_storage_controller_leadership_transfer_during_split(
     neon_env_builder: NeonEnvBuilder,
     storage_controller_proxy: StorageControllerProxy,
     port_distributor: PortDistributor,
-    step_down_times_out: bool,
 ):
     """
     Exercise a race between shard splitting and graceful leadership transfer. This is
@@ -2940,8 +2938,8 @@ def test_storage_controller_leadership_transfer_during_split(
     )
     env.storage_controller.reconcile_until_idle()
 
-    # We are testing scenarios where the step down API does not complete: either because it is stuck
-    # doing a shard split, or because it totally times out on some other failpoint.
+    # We are testing a scenario where the step down API does not complete: it is stuck
+    # doing a shard split.
     env.storage_controller.allowed_errors.extend(
         [
             ".*step_down.*request was dropped before completing.*",
@@ -2949,6 +2947,7 @@ def test_storage_controller_leadership_transfer_during_split(
             ".*Send step down request failed, will retry.*",
            ".*Send step down request still failed after.*retries.*",
            ".*Leader .+ did not respond to step-down request.*",
+            ".*Stopping reconciliations during step down is taking too long.*",
         ]
     )
 
@@ -2960,13 +2959,6 @@ def test_storage_controller_leadership_transfer_during_split(
         pause_failpoint = "shard-split-pre-complete"
         env.storage_controller.configure_failpoints((pause_failpoint, "pause"))
 
-        if not step_down_times_out:
-            # Prevent the timeout self-terminate code from executing: we will block step down on the
-            # shard split itself
-            env.storage_controller.configure_failpoints(
-                ("step-down-delay-timeout", "return(3600000)")
-            )
-
         split_fut = executor.submit(
             env.storage_controller.tenant_shard_split, list(tenants)[0], shard_count * 2
         )
@@ -2985,13 +2977,9 @@ def test_storage_controller_leadership_transfer_during_split(
             timeout_in_seconds=30, instance_id=2, base_port=storage_controller_2_port
         )
 
-        if step_down_times_out:
-            # Step down will time out, original controller will terminate itself
-            env.storage_controller.allowed_errors.extend([".*terminating process.*"])
-        else:
-            # Step down does not time out: original controller hits its shard split completion
-            # code path and realises that it must not purge the parent shards from the database.
-            env.storage_controller.allowed_errors.extend([".*Enqueuing background abort.*"])
+        # Step down does not time out: original controller hits its shard split completion
+        # code path and realises that it must not purge the parent shards from the database.
+        env.storage_controller.allowed_errors.extend([".*Enqueuing background abort.*"])
 
         def passed_split_abort():
             try:
@@ -3007,42 +2995,34 @@ def test_storage_controller_leadership_transfer_during_split(
         wait_until(passed_split_abort, interval=0.1, status_interval=1.0)
         assert env.storage_controller.log_contains(".*Aborting shard split.*")
 
-        if step_down_times_out:
-            # We will let the old controller hit a timeout path where it terminates itself, rather than
-            # completing step_down and trying to complete a shard split
-            def old_controller_terminated():
-                assert env.storage_controller.log_contains(".*terminating process.*")
+        # Proxy is still talking to original controller here: disable its pause failpoint so
+        # that its shard split can run to completion.
+        log.info("Disabling failpoint")
+        # Bypass the proxy: the python test HTTPServer is single threaded and still blocked
+        # on handling the shard split request.
+        env.storage_controller.request(
+            "PUT",
+            f"http://127.0.0.1:{storage_controller_1_port}/debug/v1/failpoints",
+            json=[{"name": "shard-split-pre-complete", "actions": "off"}],
+            headers=env.storage_controller.headers(TokenScope.ADMIN),
+        )
 
-            wait_until(old_controller_terminated)
-        else:
-            # Proxy is still talking to original controller here: disable its pause failpoint so
-            # that its shard split can run to completion.
-            log.info("Disabling failpoint")
-            # Bypass the proxy: the python test HTTPServer is single threaded and still blocked
-            # on handling the shard split request.
-            env.storage_controller.request(
-                "PUT",
-                f"http://127.0.0.1:{storage_controller_1_port}/debug/v1/failpoints",
-                json=[{"name": "shard-split-pre-complete", "actions": "off"}],
-                headers=env.storage_controller.headers(TokenScope.ADMIN),
+        def previous_stepped_down():
+            assert (
+                env.storage_controller.get_leadership_status()
+                == StorageControllerLeadershipStatus.STEPPED_DOWN
             )
 
-            def previous_stepped_down():
-                assert (
-                    env.storage_controller.get_leadership_status()
-                    == StorageControllerLeadershipStatus.STEPPED_DOWN
-                )
+        log.info("Awaiting step down")
+        wait_until(previous_stepped_down)
 
-            log.info("Awaiting step down")
-            wait_until(previous_stepped_down)
-
-            # Let the shard split complete: this may happen _after_ the replacement has come up
-            # and tried to clean up the databases
-            log.info("Unblocking & awaiting shard split")
-            with pytest.raises(Exception, match="Unexpected child shard count"):
-                # This split fails when it tries to persist results, because it encounters
-                # changes already made by the new controller's abort-on-startup
-                split_fut.result()
+        # Let the shard split complete: this may happen _after_ the replacement has come up
+        # and tried to clean up the databases
+        log.info("Unblocking & awaiting shard split")
+        with pytest.raises(Exception, match="Unexpected child shard count"):
+            # This split fails when it tries to persist results, because it encounters
+            # changes already made by the new controller's abort-on-startup
+            split_fut.result()
 
     log.info("Routing to new leader")
     storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_2_port}")
@@ -3060,14 +3040,13 @@ def test_storage_controller_leadership_transfer_during_split(
     env.storage_controller.wait_until_ready()
     env.storage_controller.consistency_check()
 
-    if not step_down_times_out:
-        # Check that the stepped down instance forwards requests
-        # to the new leader while it's still running.
-        storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
-        env.storage_controller.tenant_shard_dump()
-        env.storage_controller.node_configure(env.pageservers[0].id, {"scheduling": "Pause"})
-        status = env.storage_controller.node_status(env.pageservers[0].id)
-        assert status["scheduling"] == "Pause"
+    # Check that the stepped down instance forwards requests
+    # to the new leader while it's still running.
+    storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
+    env.storage_controller.tenant_shard_dump()
+    env.storage_controller.node_configure(env.pageservers[0].id, {"scheduling": "Pause"})
+    status = env.storage_controller.node_status(env.pageservers[0].id)
+    assert status["scheduling"] == "Pause"
 
 
 def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvBuilder):