storage_controller: make leadership protocol more robust (#11703)

## Problem

We saw the following scenario in staging:
1. Pod A starts up, becomes leader, and steps down the previous pod
cleanly.
2. Pod B starts up (deployment).
3. The step down request from pod B to pod A times out. Pod A did not
manage to stop its reconciliations within 10 seconds and exited with
return code 1
([code](7ba8519b43/storage_controller/src/service.rs (L8686-L8702))).
4. Pod B marks itself as the leader and finishes start-up.
5. k8s restarts pod A.
6. k8s marks pod B as ready.
7. The restarted pod A sends a step down request to pod A (itself) -
this succeeds => pod A is now the leader.
8. k8s kills pod A because it thinks pod B is healthy and pod A is part
of the old replica set.

We end up in a situation where the only pod left (B) is stepped down
and attempts to forward requests to a leader that doesn't exist. k8s
can't detect that pod B is in a bad state since the /status endpoint
simply returns 200 whenever the pod is running.

## Summary of changes

This PR includes a number of robustness improvements to the leadership
protocol:
* use a single step down task per controller (a condensed sketch of this
pattern follows this list)
* add a new endpoint to be used as the k8s liveness probe and check
leadership status there
* handle restarts explicitly (i.e. don't step yourself down)
* increase the step down retry count
* don't kill the process on a long step down, since k8s would just
restart it
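
For illustration, here is a condensed, self-contained sketch of the
single-step-down-task pattern (names simplified; this is not the PR code
itself, which lives in `Service::step_down` in the diff below):

```rust
use std::ops::Deref;
use std::sync::{Arc, OnceLock};
use std::time::Duration;

use tokio::sync::watch;

/// Stand-in for the controller's GlobalObservedState.
#[derive(Clone, Debug)]
struct ObservedState(String);

struct Service {
    /// The first step_down call spawns the task and stores the receiver;
    /// subsequent calls wait on the same receiver.
    step_down_barrier: OnceLock<watch::Receiver<Option<ObservedState>>>,
}

impl Service {
    async fn step_down(self: &Arc<Self>) -> ObservedState {
        let rx = self.step_down_barrier.get_or_init(|| {
            let this = Arc::clone(self);
            let (tx, rx) = watch::channel(None);
            tokio::spawn(async move {
                let state = this.step_down_task().await;
                // The spawned task owns an Arc<Service>, which keeps the
                // stored receiver (and hence the channel) alive.
                tx.send(Some(state)).ok();
            });
            rx
        });

        rx.clone()
            .wait_for(|state| state.is_some())
            .await
            .expect("sender lives as long as the task")
            .deref()
            .clone()
            .expect("checked is_some above")
    }

    async fn step_down_task(&self) -> ObservedState {
        // The real task stops reconciliations; here we just simulate work.
        tokio::time::sleep(Duration::from_millis(100)).await;
        ObservedState("observed".into())
    }
}

#[tokio::main]
async fn main() {
    let svc = Arc::new(Service {
        step_down_barrier: OnceLock::new(),
    });
    // Two concurrent step down requests coalesce onto one background task
    // and observe the same result.
    let (a, b) = tokio::join!(svc.step_down(), svc.step_down());
    println!("{a:?} {b:?}");
}
```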
Vlad Lazar
2025-04-24 17:59:56 +01:00
committed by GitHub
parent 8afb783708
commit 6f7e3c18e4
5 changed files with 142 additions and 89 deletions


@@ -72,6 +72,7 @@ impl HttpState {
             neon_metrics: NeonMetrics::new(build_info),
             allowlist_routes: &[
                 "/status",
+                "/live",
                 "/ready",
                 "/metrics",
                 "/profile/cpu",
@@ -1260,16 +1261,8 @@ async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError
         ForwardOutcome::NotForwarded(req) => req,
     };
 
-    // Spawn a background task: once we start stepping down, we must finish: if the client drops
-    // their request we should avoid stopping in some part-stepped-down state.
-    let handle = tokio::spawn(async move {
-        let state = get_state(&req);
-        state.service.step_down().await
-    });
-
-    let result = handle
-        .await
-        .map_err(|e| ApiError::InternalServerError(e.into()))?;
+    let state = get_state(&req);
+    let result = state.service.step_down().await;
 
     json_response(StatusCode::OK, result)
 }
@@ -1401,6 +1394,8 @@ async fn handle_reconcile_all(req: Request<Body>) -> Result<Response<Body>, ApiE
 }
 
 /// Status endpoint is just used for checking that our HTTP listener is up
+///
+/// This serves as our k8s startup probe.
 async fn handle_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     match maybe_forward(req).await {
         ForwardOutcome::Forwarded(res) => {
@@ -1412,6 +1407,30 @@ async fn handle_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     json_response(StatusCode::OK, ())
 }
 
+/// Liveness endpoint indicates that this storage controller is in a state
+/// where it can fulfill its responsibilities. Namely, startup has finished
+/// and it is the current leader.
+///
+/// This serves as our k8s liveness probe.
+async fn handle_live(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    let req = match maybe_forward(req).await {
+        ForwardOutcome::Forwarded(res) => {
+            return res;
+        }
+        ForwardOutcome::NotForwarded(req) => req,
+    };
+
+    let state = get_state(&req);
+    let live = state.service.startup_complete.is_ready()
+        && state.service.get_leadership_status() == LeadershipStatus::Leader;
+
+    if live {
+        json_response(StatusCode::OK, ())
+    } else {
+        json_response(StatusCode::SERVICE_UNAVAILABLE, ())
+    }
+}
+
 /// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
 /// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
 async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -1745,6 +1764,7 @@ async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
     const NOT_FOR_FORWARD: &[&str] = &[
         "/control/v1/step_down",
         "/status",
+        "/live",
         "/ready",
         "/metrics",
         "/profile/cpu",
@@ -1969,6 +1989,9 @@ pub fn make_router(
         .get("/status", |r| {
             named_request_span(r, handle_status, RequestName("status"))
         })
+        .get("/live", |r| {
+            named_request_span(r, handle_live, RequestName("live"))
+        })
         .get("/ready", |r| {
             named_request_span(r, handle_ready, RequestName("ready"))
         })
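
To make the probe contract concrete, here is a hypothetical client-side
check (the address and port are invented; only the status-code semantics
come from the `handle_live` handler above):

```rust
use reqwest::StatusCode;

/// Hypothetical probe: 200 means this instance finished startup and is the
/// leader; 503 means it is still starting up or has stepped down.
async fn controller_is_live(client: &reqwest::Client) -> reqwest::Result<bool> {
    let res = client
        .get("http://127.0.0.1:1234/live") // invented address
        .send()
        .await?;
    Ok(res.status() == StatusCode::OK)
}
```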


@@ -43,6 +43,19 @@ impl Leadership {
         &self,
     ) -> Result<(Option<ControllerPersistence>, Option<GlobalObservedState>)> {
         let leader = self.current_leader().await?;
 
+        if leader.as_ref().map(|l| &l.address)
+            == self
+                .config
+                .address_for_peers
+                .as_ref()
+                .map(Uri::to_string)
+                .as_ref()
+        {
+            // We already are the current leader. This is a restart.
+            return Ok((leader, None));
+        }
+
         let leader_step_down_state = if let Some(ref leader) = leader {
             if self.config.start_as_candidate {
                 self.request_step_down(leader).await
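
The restart check above hinges on comparing two `Option<&String>` values. A
minimal standalone illustration of the same idiom (addresses invented; the
`is_some` guard is added here for readability, while the real code relies on
`current_leader()` semantics and compares the options directly):

```rust
// Compare the persisted leader's address with our own peer address.
// Some(a) == Some(b) only when a == b, and Some(_) never equals None,
// so a controller with no configured peer address never matches a leader.
fn is_restart(leader_address: Option<&String>, own_address: Option<&String>) -> bool {
    leader_address.is_some() && leader_address == own_address
}

fn main() {
    let leader = Some("http://pod-a:1234/".to_string());
    let own = Some("http://pod-a:1234/".to_string());
    assert!(is_restart(leader.as_ref(), own.as_ref()));
    assert!(!is_restart(leader.as_ref(), None));
}
```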


@@ -55,9 +55,12 @@ impl ResponseErrorMessageExt for reqwest::Response {
     }
 }
 
-#[derive(Serialize, Deserialize, Debug, Default)]
+#[derive(Serialize, Deserialize, Debug, Default, Clone)]
 pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);
 
+const STEP_DOWN_RETRIES: u32 = 8;
+const STEP_DOWN_TIMEOUT: Duration = Duration::from_secs(1);
+
 impl PeerClient {
     pub(crate) fn new(http_client: reqwest::Client, uri: Uri, jwt: Option<String>) -> Self {
         Self {
@@ -76,7 +79,7 @@ impl PeerClient {
             req
         };
 
-        let req = req.timeout(Duration::from_secs(2));
+        let req = req.timeout(STEP_DOWN_TIMEOUT);
 
         let res = req
             .send()
@@ -94,8 +97,7 @@ impl PeerClient {
     }
 
     /// Request the peer to step down and return its current observed state
-    /// All errors are retried with exponential backoff for a maximum of 4 attempts.
-    /// Assuming all retries are performed, the function times out after roughly 4 seconds.
+    /// All errors are re-tried
     pub(crate) async fn step_down(
         &self,
         cancel: &CancellationToken,
@@ -104,7 +106,7 @@ impl PeerClient {
             || self.request_step_down(),
             |_e| false,
             2,
-            4,
+            STEP_DOWN_RETRIES,
             "Send step down request",
             cancel,
         )
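
The constants above feed a retry helper. As a rough, simplified sketch of
that shape (this stand-in is not the controller's actual backoff utility;
the 100 ms base and the cap are invented):

```rust
use std::future::Future;
use std::time::Duration;

const STEP_DOWN_RETRIES: u32 = 8;
const STEP_DOWN_TIMEOUT: Duration = Duration::from_secs(1);

/// Retry an async operation up to `retries` times with capped exponential
/// backoff between attempts. Each attempt is separately bounded by the
/// per-request timeout (STEP_DOWN_TIMEOUT in the diff above), so a dead
/// peer costs roughly retries * (timeout + backoff) in the worst case.
async fn retry<F, Fut, T, E>(mut op: F, retries: u32) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut attempt = 0u32;
    loop {
        match op().await {
            Ok(v) => return Ok(v),
            Err(e) if attempt >= retries => return Err(e),
            Err(_) => {
                // 100ms, 200ms, 400ms, ... capped at STEP_DOWN_TIMEOUT.
                let backoff = Duration::from_millis(100 * (1u64 << attempt.min(4)));
                tokio::time::sleep(backoff.min(STEP_DOWN_TIMEOUT)).await;
                attempt += 1;
            }
        }
    }
}
```

Compared with the old 4-retry / 2-second configuration, the new constants
trade a tighter per-request timeout for more attempts, so a transiently slow
peer gets retried more times without greatly extending the overall wait.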


@@ -11,7 +11,7 @@ use std::num::NonZeroU32;
 use std::ops::{Deref, DerefMut};
 use std::path::PathBuf;
 use std::str::FromStr;
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock};
 use std::time::{Duration, Instant, SystemTime};
 
 use anyhow::Context;
@@ -524,6 +524,9 @@ pub struct Service {
     /// HTTP client with proper CA certs.
     http_client: reqwest::Client,
 
+    /// Handle for the step down background task if one was ever requested
+    step_down_barrier: OnceLock<tokio::sync::watch::Receiver<Option<GlobalObservedState>>>,
 }
 
 impl From<ReconcileWaitError> for ApiError {
@@ -1745,6 +1748,7 @@ impl Service {
             tenant_op_locks: Default::default(),
             node_op_locks: Default::default(),
             http_client,
+            step_down_barrier: Default::default(),
         });
 
         let result_task_this = this.clone();
@@ -8886,27 +8890,59 @@ impl Service {
         self.inner.read().unwrap().get_leadership_status()
     }
 
-    pub(crate) async fn step_down(&self) -> GlobalObservedState {
+    /// Handler for step down requests
+    ///
+    /// Step down runs in a separate task since once it's called it should
+    /// be driven to completion. Subsequent requests will wait on the same
+    /// step down task.
+    pub(crate) async fn step_down(self: &Arc<Self>) -> GlobalObservedState {
+        let handle = self.step_down_barrier.get_or_init(|| {
+            let step_down_self = self.clone();
+            let (tx, rx) = tokio::sync::watch::channel::<Option<GlobalObservedState>>(None);
+            tokio::spawn(async move {
+                let state = step_down_self.step_down_task().await;
+                tx.send(Some(state))
+                    .expect("Task Arc<Service> keeps receiver alive");
+            });
+
+            rx
+        });
+
+        handle
+            .clone()
+            .wait_for(|observed_state| observed_state.is_some())
+            .await
+            .expect("Task Arc<Service> keeps sender alive")
+            .deref()
+            .clone()
+            .expect("Checked above")
+    }
+
+    async fn step_down_task(&self) -> GlobalObservedState {
         tracing::info!("Received step down request from peer");
         failpoint_support::sleep_millis_async!("sleep-on-step-down-handling");
 
         self.inner.write().unwrap().step_down();
 
-        // Wait for reconciliations to stop, or terminate this process if they
-        // fail to stop in time (this indicates a bug in shutdown)
-        tokio::select! {
-            _ = self.stop_reconciliations(StopReconciliationsReason::SteppingDown) => {
-                tracing::info!("Reconciliations stopped, proceeding with step down");
-            }
-            _ = async {
-                failpoint_support::sleep_millis_async!("step-down-delay-timeout");
-                tokio::time::sleep(Duration::from_secs(10)).await
-            } => {
-                tracing::warn!("Step down timed out while waiting for reconciliation gate, terminating process");
-                // The caller may proceed to act as leader when it sees this request fail: reduce the chance
-                // of a split-brain situation by terminating this controller instead of leaving it up in a partially-shut-down state.
-                std::process::exit(1);
+        let stop_reconciliations =
+            self.stop_reconciliations(StopReconciliationsReason::SteppingDown);
+        let mut stop_reconciliations = std::pin::pin!(stop_reconciliations);
+
+        let started_at = Instant::now();
+
+        // Wait for reconciliations to stop and warn if that's taking a long time
+        loop {
+            tokio::select! {
+                _ = &mut stop_reconciliations => {
+                    tracing::info!("Reconciliations stopped, proceeding with step down");
+                    break;
+                }
+                _ = tokio::time::sleep(Duration::from_secs(10)) => {
+                    tracing::warn!(
+                        elapsed_sec=%started_at.elapsed().as_secs(),
+                        "Stopping reconciliations during step down is taking too long"
+                    );
+                }
             }
         }


@@ -2894,12 +2894,10 @@ def test_storage_controller_leadership_transfer(
     )
 
 
-@pytest.mark.parametrize("step_down_times_out", [False, True])
 def test_storage_controller_leadership_transfer_during_split(
     neon_env_builder: NeonEnvBuilder,
     storage_controller_proxy: StorageControllerProxy,
     port_distributor: PortDistributor,
-    step_down_times_out: bool,
 ):
     """
     Exercise a race between shard splitting and graceful leadership transfer. This is
@@ -2940,8 +2938,8 @@ def test_storage_controller_leadership_transfer_during_split(
     )
     env.storage_controller.reconcile_until_idle()
 
-    # We are testing scenarios where the step down API does not complete: either because it is stuck
-    # doing a shard split, or because it totally times out on some other failpoint.
+    # We are testing scenarios where the step down API does not complete: it is stuck
+    # doing a shard split
    env.storage_controller.allowed_errors.extend(
        [
            ".*step_down.*request was dropped before completing.*",
@@ -2949,6 +2947,7 @@ def test_storage_controller_leadership_transfer_during_split(
             ".*Send step down request failed, will retry.*",
             ".*Send step down request still failed after.*retries.*",
             ".*Leader .+ did not respond to step-down request.*",
+            ".*Stopping reconciliations during step down is taking too long.*",
         ]
     )
@@ -2960,13 +2959,6 @@ def test_storage_controller_leadership_transfer_during_split(
         pause_failpoint = "shard-split-pre-complete"
         env.storage_controller.configure_failpoints((pause_failpoint, "pause"))
 
-        if not step_down_times_out:
-            # Prevent the timeout self-terminate code from executing: we will block step down on the
-            # shard split itself
-            env.storage_controller.configure_failpoints(
-                ("step-down-delay-timeout", "return(3600000)")
-            )
-
         split_fut = executor.submit(
             env.storage_controller.tenant_shard_split, list(tenants)[0], shard_count * 2
         )
@@ -2985,13 +2977,9 @@ def test_storage_controller_leadership_transfer_during_split(
             timeout_in_seconds=30, instance_id=2, base_port=storage_controller_2_port
         )
 
-        if step_down_times_out:
-            # Step down will time out, original controller will terminate itself
-            env.storage_controller.allowed_errors.extend([".*terminating process.*"])
-        else:
-            # Step down does not time out: original controller hits its shard split completion
-            # code path and realises that it must not purge the parent shards from the database.
-            env.storage_controller.allowed_errors.extend([".*Enqueuing background abort.*"])
+        # Step down does not time out: original controller hits its shard split completion
+        # code path and realises that it must not purge the parent shards from the database.
+        env.storage_controller.allowed_errors.extend([".*Enqueuing background abort.*"])
 
         def passed_split_abort():
             try:
@@ -3007,42 +2995,34 @@ def test_storage_controller_leadership_transfer_during_split(
         wait_until(passed_split_abort, interval=0.1, status_interval=1.0)
         assert env.storage_controller.log_contains(".*Aborting shard split.*")
 
-        if step_down_times_out:
-            # We will let the old controller hit a timeout path where it terminates itself, rather than
-            # completing step_down and trying to complete a shard split
-            def old_controller_terminated():
-                assert env.storage_controller.log_contains(".*terminating process.*")
-
-            wait_until(old_controller_terminated)
-        else:
-            # Proxy is still talking to original controller here: disable its pause failpoint so
-            # that its shard split can run to completion.
-            log.info("Disabling failpoint")
-            # Bypass the proxy: the python test HTTPServer is single threaded and still blocked
-            # on handling the shard split request.
-            env.storage_controller.request(
-                "PUT",
-                f"http://127.0.0.1:{storage_controller_1_port}/debug/v1/failpoints",
-                json=[{"name": "shard-split-pre-complete", "actions": "off"}],
-                headers=env.storage_controller.headers(TokenScope.ADMIN),
-            )
-
-            def previous_stepped_down():
-                assert (
-                    env.storage_controller.get_leadership_status()
-                    == StorageControllerLeadershipStatus.STEPPED_DOWN
-                )
-
-            log.info("Awaiting step down")
-            wait_until(previous_stepped_down)
-
-            # Let the shard split complete: this may happen _after_ the replacement has come up
-            # and tried to clean up the databases
-            log.info("Unblocking & awaiting shard split")
-            with pytest.raises(Exception, match="Unexpected child shard count"):
-                # This split fails when it tries to persist results, because it encounters
-                # changes already made by the new controller's abort-on-startup
-                split_fut.result()
+        # Proxy is still talking to original controller here: disable its pause failpoint so
+        # that its shard split can run to completion.
+        log.info("Disabling failpoint")
+        # Bypass the proxy: the python test HTTPServer is single threaded and still blocked
+        # on handling the shard split request.
+        env.storage_controller.request(
+            "PUT",
+            f"http://127.0.0.1:{storage_controller_1_port}/debug/v1/failpoints",
+            json=[{"name": "shard-split-pre-complete", "actions": "off"}],
+            headers=env.storage_controller.headers(TokenScope.ADMIN),
+        )
+
+        def previous_stepped_down():
+            assert (
+                env.storage_controller.get_leadership_status()
+                == StorageControllerLeadershipStatus.STEPPED_DOWN
+            )
+
+        log.info("Awaiting step down")
+        wait_until(previous_stepped_down)
+
+        # Let the shard split complete: this may happen _after_ the replacement has come up
+        # and tried to clean up the databases
+        log.info("Unblocking & awaiting shard split")
+        with pytest.raises(Exception, match="Unexpected child shard count"):
+            # This split fails when it tries to persist results, because it encounters
+            # changes already made by the new controller's abort-on-startup
+            split_fut.result()
 
         log.info("Routing to new leader")
         storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_2_port}")
@@ -3060,14 +3040,13 @@ def test_storage_controller_leadership_transfer_during_split(
     env.storage_controller.wait_until_ready()
     env.storage_controller.consistency_check()
 
-    if not step_down_times_out:
-        # Check that the stepped down instance forwards requests
-        # to the new leader while it's still running.
-        storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
-        env.storage_controller.tenant_shard_dump()
-        env.storage_controller.node_configure(env.pageservers[0].id, {"scheduling": "Pause"})
-        status = env.storage_controller.node_status(env.pageservers[0].id)
-        assert status["scheduling"] == "Pause"
+    # Check that the stepped down instance forwards requests
+    # to the new leader while it's still running.
+    storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
+    env.storage_controller.tenant_shard_dump()
+    env.storage_controller.node_configure(env.pageservers[0].id, {"scheduling": "Pause"})
+    status = env.storage_controller.node_status(env.pageservers[0].id)
+    assert status["scheduling"] == "Pause"
 
 
 def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvBuilder):