diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs
index 0a36ce8b6f..649113b8ce 100644
--- a/storage_controller/src/http.rs
+++ b/storage_controller/src/http.rs
@@ -72,6 +72,7 @@ impl HttpState {
neon_metrics: NeonMetrics::new(build_info),
allowlist_routes: &[
"/status",
+ "/live",
"/ready",
"/metrics",
"/profile/cpu",
@@ -1260,16 +1261,8 @@ async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError> {
ForwardOutcome::NotForwarded(req) => req,
};
- // Spawn a background task: once we start stepping down, we must finish: if the client drops
- // their request we should avoid stopping in some part-stepped-down state.
- let handle = tokio::spawn(async move {
- let state = get_state(&req);
- state.service.step_down().await
- });
-
- let result = handle
- .await
- .map_err(|e| ApiError::InternalServerError(e.into()))?;
+ let state = get_state(&req);
+ let result = state.service.step_down().await;
json_response(StatusCode::OK, result)
}
@@ -1401,6 +1394,8 @@ async fn handle_reconcile_all(req: Request<Body>) -> Result<Response<Body>, ApiError> {
}
/// Status endpoint is just used for checking that our HTTP listener is up
+///
+/// This serves as our k8s startup probe.
async fn handle_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
@@ -1412,6 +1407,30 @@ async fn handle_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(StatusCode::OK, ())
}
+/// Liveness endpoint indicates that this storage controller is in a state
+/// where it can fulfill its responsibilities. Namely, startup has finished
+/// and it is the current leader.
+///
+/// This serves as our k8s liveness probe.
+async fn handle_live(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
+ let state = get_state(&req);
+ let live = state.service.startup_complete.is_ready()
+ && state.service.get_leadership_status() == LeadershipStatus::Leader;
+
+ if live {
+ json_response(StatusCode::OK, ())
+ } else {
+ json_response(StatusCode::SERVICE_UNAVAILABLE, ())
+ }
+}
+
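With this change the controller exposes three distinct probe endpoints: `/status` (startup probe: the HTTP listener is up), `/ready` (readiness probe: startup I/O has finished), and `/live` (liveness probe: startup is complete and this node is the current leader). As a rough sketch of how an external prober consumes the new endpoint (the helper below is illustrative, not part of this patch; it only assumes the `reqwest` and `anyhow` crates already used by the controller):

```rust
/// Illustrative poller: 200 from /live means "started up and currently the leader",
/// anything else (e.g. 503) means the instance should be treated as not live.
async fn probe_live(client: &reqwest::Client, base_url: &str) -> anyhow::Result<bool> {
    let resp = client.get(format!("{base_url}/live")).send().await?;
    Ok(resp.status() == reqwest::StatusCode::OK)
}
```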
/// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
/// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -1745,6 +1764,7 @@ async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
const NOT_FOR_FORWARD: &[&str] = &[
"/control/v1/step_down",
"/status",
+ "/live",
"/ready",
"/metrics",
"/profile/cpu",
@@ -1969,6 +1989,9 @@ pub fn make_router(
.get("/status", |r| {
named_request_span(r, handle_status, RequestName("status"))
})
+ .get("/live", |r| {
+ named_request_span(r, handle_live, RequestName("live"))
+ })
.get("/ready", |r| {
named_request_span(r, handle_ready, RequestName("ready"))
})
diff --git a/storage_controller/src/leadership.rs b/storage_controller/src/leadership.rs
index 39c28d60a9..048f752db5 100644
--- a/storage_controller/src/leadership.rs
+++ b/storage_controller/src/leadership.rs
@@ -43,6 +43,19 @@ impl Leadership {
&self,
) -> Result<(Option<Leader>, Option<GlobalObservedState>)> {
let leader = self.current_leader().await?;
+
+ if leader.as_ref().map(|l| &l.address)
+ == self
+ .config
+ .address_for_peers
+ .as_ref()
+ .map(Uri::to_string)
+ .as_ref()
+ {
+ // We already are the current leader. This is a restart.
+ return Ok((leader, None));
+ }
+
let leader_step_down_state = if let Some(ref leader) = leader {
if self.config.start_as_candidate {
self.request_step_down(leader).await
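The restart detection added above compares the leader address persisted in the database with this instance's own `address_for_peers`, with both sides normalized to `Option<&String>`. A standalone sketch of that comparison (type names are assumptions based on the surrounding code, which appears to use the `http` crate's `Uri`):

```rust
/// Sketch of the equality check in leadership.rs above, in isolation.
/// Note that if there is no persisted leader *and* `address_for_peers` is unset,
/// the two `None`s also compare equal, so the step-down request is skipped then too.
fn already_leader(leader_address: Option<&String>, address_for_peers: Option<&http::Uri>) -> bool {
    leader_address == address_for_peers.map(http::Uri::to_string).as_ref()
}
```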
diff --git a/storage_controller/src/peer_client.rs b/storage_controller/src/peer_client.rs
index 604d1024ba..bae2fed096 100644
--- a/storage_controller/src/peer_client.rs
+++ b/storage_controller/src/peer_client.rs
@@ -55,9 +55,12 @@ impl ResponseErrorMessageExt for reqwest::Response {
}
}
-#[derive(Serialize, Deserialize, Debug, Default)]
+#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);
+const STEP_DOWN_RETRIES: u32 = 8;
+const STEP_DOWN_TIMEOUT: Duration = Duration::from_secs(1);
+
impl PeerClient {
pub(crate) fn new(http_client: reqwest::Client, uri: Uri, jwt: Option<String>) -> Self {
Self {
@@ -76,7 +79,7 @@ impl PeerClient {
req
};
- let req = req.timeout(Duration::from_secs(2));
+ let req = req.timeout(STEP_DOWN_TIMEOUT);
let res = req
.send()
@@ -94,8 +97,7 @@ impl PeerClient {
}
/// Request the peer to step down and return its current observed state
- /// All errors are retried with exponential backoff for a maximum of 4 attempts.
- /// Assuming all retries are performed, the function times out after roughly 4 seconds.
+ /// All errors are retried with exponential backoff
pub(crate) async fn step_down(
&self,
cancel: &CancellationToken,
@@ -104,7 +106,7 @@ impl PeerClient {
|| self.request_step_down(),
|_e| false,
2,
- 4,
+ STEP_DOWN_RETRIES,
"Send step down request",
cancel,
)
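The step-down request budget changes here from 4 attempts with a 2-second per-request timeout to `STEP_DOWN_RETRIES` (8) attempts with a 1-second timeout, so the worst case is roughly eight seconds of request time (8 × 1 s) plus whatever backoff delay the shared retry helper inserts between attempts. A self-contained sketch of that retry shape (illustrative only; this is not the project's actual backoff helper, and the delays shown are assumptions):

```rust
/// Illustrative retry loop: each attempt is expected to carry its own timeout
/// (STEP_DOWN_TIMEOUT in the patch), and failures are retried with exponential backoff.
async fn with_retries<T, F, Fut>(mut attempt: F, max_retries: u32) -> Option<T>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<T>>,
{
    for n in 0..max_retries {
        if let Ok(value) = attempt().await {
            return Some(value);
        }
        // 100ms, 200ms, 400ms, ... between attempts (numbers are illustrative).
        tokio::time::sleep(std::time::Duration::from_millis(100u64 << n)).await;
    }
    None
}
```

Usage would look like `with_retries(|| self.request_step_down(), STEP_DOWN_RETRIES).await`, mirroring the call above (cancellation handling omitted).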
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index c5cf4bedcf..acd399dca2 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -11,7 +11,7 @@ use std::num::NonZeroU32;
use std::ops::{Deref, DerefMut};
use std::path::PathBuf;
use std::str::FromStr;
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant, SystemTime};
use anyhow::Context;
@@ -524,6 +524,9 @@ pub struct Service {
/// HTTP client with proper CA certs.
http_client: reqwest::Client,
+
+ /// Handle for the step down background task if one was ever requested
+ step_down_barrier: OnceLock<tokio::sync::watch::Receiver<Option<GlobalObservedState>>>,
}
impl From for ApiError {
@@ -1745,6 +1748,7 @@ impl Service {
tenant_op_locks: Default::default(),
node_op_locks: Default::default(),
http_client,
+ step_down_barrier: Default::default(),
});
let result_task_this = this.clone();
@@ -8886,27 +8890,59 @@ impl Service {
self.inner.read().unwrap().get_leadership_status()
}
- pub(crate) async fn step_down(&self) -> GlobalObservedState {
+ /// Handler for step down requests
+ ///
+ /// Step down runs in a separate task since, once it's called, it should
+ /// be driven to completion. Subsequent requests will wait on the same
+ /// step down task.
+ pub(crate) async fn step_down(self: &Arc<Self>) -> GlobalObservedState {
+ let handle = self.step_down_barrier.get_or_init(|| {
+ let step_down_self = self.clone();
+ let (tx, rx) = tokio::sync::watch::channel::
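The excerpt cuts off inside the new `step_down`, but the doc comment describes the pattern: the step-down work is spawned once, its result is published on a watch channel, and every caller, first or subsequent, waits on the same receiver. A standalone sketch of that pattern, with placeholder names rather than the actual implementation:

```rust
// "Run once, everyone awaits the same result": placeholder types, illustrative only.
use std::sync::{Arc, OnceLock};
use tokio::sync::watch;

#[derive(Clone, Debug, Default)]
struct ObservedState; // stand-in for GlobalObservedState

struct Svc {
    step_down_barrier: OnceLock<watch::Receiver<Option<ObservedState>>>,
}

impl Svc {
    async fn step_down(self: &Arc<Self>) -> ObservedState {
        let rx = self.step_down_barrier.get_or_init(|| {
            let this = self.clone();
            let (tx, rx) = watch::channel(None);
            // Detach the step-down work from any single HTTP request: once started,
            // it runs to completion even if the client drops the connection.
            tokio::spawn(async move {
                let state = this.do_step_down().await;
                tx.send(Some(state)).ok();
            });
            rx
        });

        // First and later callers all wait on the same receiver.
        let mut rx = rx.clone();
        loop {
            if let Some(state) = rx.borrow().as_ref() {
                return state.clone();
            }
            if rx.changed().await.is_err() {
                // Sender dropped without publishing; the real code would surface an error.
                return ObservedState::default();
            }
        }
    }

    async fn do_step_down(&self) -> ObservedState {
        // Placeholder for the actual step-down work.
        ObservedState
    }
}
```

Using a watch channel rather than a bare `JoinHandle` means late callers never race for a single join: anyone arriving after completion simply reads the already-published value.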