diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs
index 8fb4be93e0..c77918827f 100644
--- a/storage_controller/src/http.rs
+++ b/storage_controller/src/http.rs
@@ -3,7 +3,7 @@ use crate::metrics::{
METRICS_REGISTRY,
};
use crate::reconciler::ReconcileError;
-use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT};
+use crate::service::{LeadershipStatus, Service, STARTUP_RECONCILE_TIMEOUT};
use anyhow::Context;
use futures::Future;
use hyper::header::CONTENT_TYPE;
@@ -607,6 +607,13 @@ async fn handle_tenant_update_policy(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
+async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+ check_permissions(&req, Scope::Admin)?;
+
+ let state = get_state(&req);
+ json_response(StatusCode::OK, state.service.step_down().await)
+}
+
async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
check_permissions(&req, Scope::PageServerApi)?;
@@ -734,6 +741,47 @@ struct RequestMeta {
at: Instant,
}
+pub fn prologue_leadership_status_check_middleware<
+ B: hyper::body::HttpBody + Send + Sync + 'static,
+>() -> Middleware<B, ApiError> {
+ Middleware::pre(move |req| async move {
+ let state = get_state(&req);
+ let leadership_status = state.service.get_leadership_status();
+
+ enum AllowedRoutes<'a> {
+ All,
+ Some(Vec<&'a str>),
+ }
+
+ let allowed_routes = match leadership_status {
+ LeadershipStatus::Leader => AllowedRoutes::All,
+ LeadershipStatus::SteppedDown => {
+ // TODO: does it make sense to allow /status here?
+ AllowedRoutes::Some(["/control/v1/step_down", "/status", "/metrics"].to_vec())
+ }
+ LeadershipStatus::Candidate => {
+ AllowedRoutes::Some(["/ready", "/status", "/metrics"].to_vec())
+ }
+ };
+
+ let uri = req.uri().to_string();
+ match allowed_routes {
+ AllowedRoutes::All => Ok(req),
+ AllowedRoutes::Some(allowed) if allowed.contains(&uri.as_str()) => Ok(req),
+ _ => {
+ tracing::info!(
+ "Request {} not allowed due to current leadership state",
+ req.uri()
+ );
+
+ Err(ApiError::ResourceUnavailable(
+ format!("Current leadership status is {leadership_status}").into(),
+ ))
+ }
+ }
+ })
+}
+
fn prologue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
) -> Middleware<B, ApiError> {
Middleware::pre(move |req| async move {
@@ -820,6 +868,7 @@ pub fn make_router(
build_info: BuildInfo,
) -> RouterBuilder<hyper::Body, ApiError> {
let mut router = endpoint::make_router()
+ .middleware(prologue_leadership_status_check_middleware())
.middleware(prologue_metrics_middleware())
.middleware(epilogue_metrics_middleware());
if auth.is_some() {
@@ -971,6 +1020,9 @@ pub fn make_router(
RequestName("control_v1_tenant_policy"),
)
})
+ .put("/control/v1/step_down", |r| {
+ named_request_span(r, handle_step_down, RequestName("control_v1_step_down"))
+ })
// Tenant operations
// The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
// this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
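
The new `prologue_leadership_status_check_middleware` boils down to a pure decision over the current leadership status and the request path. Below is a minimal standalone sketch of that decision; only the `LeadershipStatus` variants and the allow lists are taken from the diff, everything else is illustrative:

```rust
// Standalone sketch of the middleware's allow/deny decision; not the
// controller's actual types beyond the LeadershipStatus variants.
#[derive(Clone, Copy, Debug)]
enum LeadershipStatus {
    Leader,
    SteppedDown,
    Candidate,
}

/// Returns true if a request to `path` should pass the leadership check.
fn is_route_allowed(status: LeadershipStatus, path: &str) -> bool {
    match status {
        // A leader serves everything.
        LeadershipStatus::Leader => true,
        // A stepped-down controller only answers step_down (idempotently),
        // status probes and metrics scrapes.
        LeadershipStatus::SteppedDown => {
            matches!(path, "/control/v1/step_down" | "/status" | "/metrics")
        }
        // A candidate is not yet allowed to mutate cluster state.
        LeadershipStatus::Candidate => matches!(path, "/ready" | "/status" | "/metrics"),
    }
}

fn main() {
    assert!(is_route_allowed(LeadershipStatus::Leader, "/v1/tenant"));
    assert!(!is_route_allowed(LeadershipStatus::SteppedDown, "/v1/tenant"));
    assert!(is_route_allowed(LeadershipStatus::SteppedDown, "/metrics"));
    assert!(is_route_allowed(LeadershipStatus::Candidate, "/ready"));
    println!("leadership route checks hold");
}
```

Note that the middleware matches the allow list against `req.uri().to_string()`, so outside the Leader state only exact paths without query strings are admitted.
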
diff --git a/storage_controller/src/metrics.rs b/storage_controller/src/metrics.rs
index ac9f22c739..a1a4b8543d 100644
--- a/storage_controller/src/metrics.rs
+++ b/storage_controller/src/metrics.rs
@@ -13,7 +13,10 @@ use metrics::NeonMetrics;
use once_cell::sync::Lazy;
use std::sync::Mutex;
-use crate::persistence::{DatabaseError, DatabaseOperation};
+use crate::{
+ persistence::{DatabaseError, DatabaseOperation},
+ service::LeadershipStatus,
+};
pub(crate) static METRICS_REGISTRY: Lazy<StorageControllerMetrics> =
Lazy::new(StorageControllerMetrics::default);
@@ -81,6 +84,8 @@ pub(crate) struct StorageControllerMetricGroup {
#[metric(metadata = histogram::Thresholds::exponential_buckets(0.1, 2.0))]
pub(crate) storage_controller_database_query_latency:
measured::HistogramVec,
+
+    pub(crate) storage_controller_leadership_status: measured::GaugeVec<LeadershipStatusGroupSet>,
}
impl StorageControllerMetrics {
@@ -156,6 +161,12 @@ pub(crate) struct DatabaseQueryLatencyLabelGroup {
pub(crate) operation: DatabaseOperation,
}
+#[derive(measured::LabelGroup)]
+#[label(set = LeadershipStatusGroupSet)]
+pub(crate) struct LeadershipStatusGroup {
+ pub(crate) status: LeadershipStatus,
+}
+
#[derive(FixedCardinalityLabel, Clone, Copy)]
pub(crate) enum ReconcileOutcome {
#[label(rename = "ok")]
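
`storage_controller_leadership_status` is a labelled gauge with one series per `LeadershipStatus` variant; `ServiceState::step_down()` below keeps it one-hot by setting the current variant to 1 and the others to 0. A rough sketch of that one-hot pattern, using a plain map instead of the `measured` crate:

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the GaugeVec used in the controller:
// one gauge per status label, exactly one of which is 1 at any time.
#[derive(Default)]
struct LeadershipGauge {
    series: HashMap<&'static str, i64>,
}

impl LeadershipGauge {
    fn set_current(&mut self, current: &'static str) {
        for status in ["leader", "stepped_down", "candidate"] {
            self.series.insert(status, (status == current) as i64);
        }
    }
}

fn main() {
    let mut gauge = LeadershipGauge::default();
    gauge.set_current("leader");
    assert_eq!(gauge.series["leader"], 1);

    gauge.set_current("stepped_down");
    assert_eq!(gauge.series["leader"], 0);
    assert_eq!(gauge.series["stepped_down"], 1);
    println!("one-hot leadership gauge behaves as expected");
}
```
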
diff --git a/storage_controller/src/reconciler.rs b/storage_controller/src/reconciler.rs
index 886ceae90f..12dea2c7ef 100644
--- a/storage_controller/src/reconciler.rs
+++ b/storage_controller/src/reconciler.rs
@@ -12,6 +12,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
+use utils::failpoint_support;
use utils::generation::Generation;
use utils::id::{NodeId, TimelineId};
use utils::lsn::Lsn;
@@ -749,6 +750,8 @@ impl Reconciler {
self.location_config(&node, conf, None, false).await?;
}
+ failpoint_support::sleep_millis_async!("sleep-on-reconcile-epilogue");
+
Ok(())
}
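
The `sleep-on-reconcile-epilogue` failpoint gives the regression test a way to hold a reconcile open while step_down is issued; the test arms it with `return(10000)`, i.e. a 10 s sleep at the end of the reconcile. A rough sketch of that behaviour using the `fail` crate directly (assuming it is built with its `failpoints` feature; the real code goes through `utils::failpoint_support` and an async sleep):

```rust
// Sketch of a sleep-style failpoint; the duration comes from the
// "return(<millis>)" action string configured at runtime.
fn reconcile_epilogue() {
    fail::fail_point!("sleep-on-reconcile-epilogue", |millis| {
        let ms: u64 = millis.and_then(|m| m.parse().ok()).unwrap_or(0);
        std::thread::sleep(std::time::Duration::from_millis(ms));
    });
}

fn main() {
    // Arm the failpoint the same way the regression test does, then trigger it.
    fail::cfg("sleep-on-reconcile-epilogue", "return(100)").unwrap();
    let started = std::time::Instant::now();
    reconcile_epilogue();
    assert!(started.elapsed() >= std::time::Duration::from_millis(100));
    println!("failpoint slept for {:?}", started.elapsed());
}
```
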
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index e890c5e45e..821f45d0c0 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -15,6 +15,7 @@ use crate::{
},
compute_hook::NotifyError,
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
+ metrics::LeadershipStatusGroup,
persistence::{AbortShardSplitStatus, TenantFilter},
reconciler::{ReconcileError, ReconcileUnits},
scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
@@ -81,6 +82,7 @@ use crate::{
ReconcilerWaiter, TenantShard,
},
};
+use serde::{Deserialize, Serialize};
// For operations that should be quick, like attaching a new tenant
const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
@@ -131,6 +133,24 @@ enum NodeOperations {
Delete,
}
+/// The leadership status for the storage controller process.
+/// Allowed transitions are:
+/// 1. Leader -> SteppedDown
+/// 2. Candidate -> Leader
+#[derive(Copy, Clone, strum_macros::Display, measured::FixedCardinalityLabel)]
+#[strum(serialize_all = "snake_case")]
+pub(crate) enum LeadershipStatus {
+ /// This is the steady state where the storage controller can produce
+ /// side effects in the cluster.
+ Leader,
+ /// We've been notified to step down by another candidate. No reconciliations
+ /// take place in this state.
+ SteppedDown,
+ /// Initial state for a new storage controller instance. Will attempt to assume leadership.
+ #[allow(unused)]
+ Candidate,
+}
+
pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128;
// Depth of the channel used to enqueue shards for reconciliation when they can't do it immediately.
@@ -140,6 +160,8 @@ const MAX_DELAYED_RECONCILES: usize = 10000;
// Top level state available to all HTTP handlers
struct ServiceState {
+ leadership_status: LeadershipStatus,
+
    tenants: BTreeMap<TenantShardId, TenantShard>,

    nodes: Arc<HashMap<NodeId, Node>>,
@@ -202,7 +224,21 @@ impl ServiceState {
scheduler: Scheduler,
        delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
) -> Self {
+ let status = &crate::metrics::METRICS_REGISTRY
+ .metrics_group
+ .storage_controller_leadership_status;
+
+ status.set(
+ LeadershipStatusGroup {
+ status: LeadershipStatus::Leader,
+ },
+ 1,
+ );
+
Self {
+ // TODO: Starting up as Leader is a transient state. Once we enable rolling
+ // upgrades on the k8s side, we should start up as Candidate.
+ leadership_status: LeadershipStatus::Leader,
tenants,
nodes: Arc::new(nodes),
scheduler,
@@ -220,6 +256,37 @@ impl ServiceState {
) {
(&mut self.nodes, &mut self.tenants, &mut self.scheduler)
}
+
+ fn get_leadership_status(&self) -> LeadershipStatus {
+ self.leadership_status
+ }
+
+ fn step_down(&mut self) {
+ self.leadership_status = LeadershipStatus::SteppedDown;
+
+ let status = &crate::metrics::METRICS_REGISTRY
+ .metrics_group
+ .storage_controller_leadership_status;
+
+ status.set(
+ LeadershipStatusGroup {
+ status: LeadershipStatus::SteppedDown,
+ },
+ 1,
+ );
+ status.set(
+ LeadershipStatusGroup {
+ status: LeadershipStatus::Leader,
+ },
+ 0,
+ );
+ status.set(
+ LeadershipStatusGroup {
+ status: LeadershipStatus::Candidate,
+ },
+ 0,
+ );
+ }
}
#[derive(Clone)]
@@ -403,11 +470,30 @@ struct ShardUpdate {
    generation: Option<Generation>,
}
+enum StopReconciliationsReason {
+ ShuttingDown,
+ SteppingDown,
+}
+
+impl std::fmt::Display for StopReconciliationsReason {
+ fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result {
+ let s = match self {
+ Self::ShuttingDown => "Shutting down",
+ Self::SteppingDown => "Stepping down",
+ };
+ write!(writer, "{}", s)
+ }
+}
+
pub(crate) enum ReconcileResultRequest {
ReconcileResult(ReconcileResult),
Stop,
}
+// TODO: move this into the storcon peer client when that gets added
+#[derive(Serialize, Deserialize, Debug, Default)]
+pub(crate) struct GlobalObservedState(HashMap<TenantShardId, ObservedState>);
+
impl Service {
pub fn get_config(&self) -> &Config {
&self.config
@@ -5603,17 +5689,22 @@ impl Service {
Ok(std::cmp::max(waiter_count, reconciles_spawned))
}
- pub async fn shutdown(&self) {
+ async fn stop_reconciliations(&self, reason: StopReconciliationsReason) {
// Cancel all on-going reconciles and wait for them to exit the gate.
- tracing::info!("Shutting down: cancelling and waiting for in-flight reconciles");
+ tracing::info!("{reason}: cancelling and waiting for in-flight reconciles");
self.reconcilers_cancel.cancel();
self.reconcilers_gate.close().await;
// Signal the background loop in [`Service::process_results`] to exit once
        // it has processed the results from all the reconciles we cancelled earlier.
- tracing::info!("Shutting down: processing results from previously in-flight reconciles");
+ tracing::info!("{reason}: processing results from previously in-flight reconciles");
self.result_tx.send(ReconcileResultRequest::Stop).ok();
self.result_tx.closed().await;
+ }
+
+ pub async fn shutdown(&self) {
+ self.stop_reconciliations(StopReconciliationsReason::ShuttingDown)
+ .await;
// Background tasks hold gate guards: this notifies them of the cancellation and
// waits for them all to complete.
@@ -6003,4 +6094,27 @@ impl Service {
Ok(())
}
+
+ pub(crate) fn get_leadership_status(&self) -> LeadershipStatus {
+ self.inner.read().unwrap().get_leadership_status()
+ }
+
+ pub(crate) async fn step_down(&self) -> GlobalObservedState {
+ tracing::info!("Received step down request from peer");
+
+ self.inner.write().unwrap().step_down();
+ // TODO: would it make sense to have a time-out for this?
+ self.stop_reconciliations(StopReconciliationsReason::SteppingDown)
+ .await;
+
+ let mut global_observed = GlobalObservedState::default();
+ let locked = self.inner.read().unwrap();
+ for (tid, tenant_shard) in locked.tenants.iter() {
+ global_observed
+ .0
+ .insert(*tid, tenant_shard.observed.clone());
+ }
+
+ global_observed
+ }
}
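
`step_down()` replies with the serialized `GlobalObservedState`, which the regression test below indexes first by tenant shard id and then by node id. A small sketch of that wire shape with simplified stand-in types (the ids and the contents of `conf` are placeholders; the real value under `conf` is a full `LocationConfig`):

```rust
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

// Simplified stand-ins for ObservedStateLocation / ObservedState, only meant
// to show the JSON shape the step_down handler returns.
#[derive(Serialize, Deserialize, Debug)]
struct Location {
    conf: Option<serde_json::Value>,
}

#[derive(Serialize, Deserialize, Debug, Default)]
struct Observed {
    locations: HashMap<String, Location>,
}

fn main() {
    // Keyed by tenant shard id, then by node id, as the regression test expects.
    let mut state: HashMap<String, Observed> = HashMap::new();
    state.insert(
        "<tenant_shard_id>".to_string(),
        Observed {
            locations: HashMap::from([(
                "<node_id>".to_string(),
                Location {
                    conf: Some(serde_json::json!({
                        "tenant_conf": { "compaction_threshold": 5 }
                    })),
                },
            )]),
        },
    );

    println!("{}", serde_json::to_string_pretty(&state).unwrap());
}
```
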
diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs
index 670efae154..e250f29f98 100644
--- a/storage_controller/src/tenant_shard.rs
+++ b/storage_controller/src/tenant_shard.rs
@@ -18,7 +18,7 @@ use pageserver_api::{
models::{LocationConfig, LocationConfigMode, TenantConfig},
shard::{ShardIdentity, TenantShardId},
};
-use serde::Serialize;
+use serde::{Deserialize, Serialize};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{instrument, Instrument};
@@ -284,7 +284,7 @@ impl Drop for IntentState {
}
}
-#[derive(Default, Clone, Serialize)]
+#[derive(Default, Clone, Serialize, Deserialize, Debug)]
pub(crate) struct ObservedState {
    pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
}
@@ -298,7 +298,7 @@ pub(crate) struct ObservedState {
/// what it is (e.g. we failed partway through configuring it)
/// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
/// and that configuration will still be present unless something external interfered.
-#[derive(Clone, Serialize)]
+#[derive(Clone, Serialize, Deserialize, Debug)]
pub(crate) struct ObservedStateLocation {
/// If None, it means we do not know the status of this shard's location on this node, but
/// we know that we might have some state on this node.
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 09c28148b4..0a06398391 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -2582,6 +2582,17 @@ class NeonStorageController(MetricsGetter, LogUtils):
time.sleep(backoff)
+ def step_down(self):
+ log.info("Asking storage controller to step down")
+ response = self.request(
+ "PUT",
+ f"{self.env.storage_controller_api}/control/v1/step_down",
+ headers=self.headers(TokenScope.ADMIN),
+ )
+
+ response.raise_for_status()
+ return response.json()
+
def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
if isinstance(config_strings, tuple):
pairs = [config_strings]
diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py
index 9a47d7d651..da638ac233 100644
--- a/test_runner/regress/test_storage_controller.py
+++ b/test_runner/regress/test_storage_controller.py
@@ -1783,3 +1783,78 @@ def test_storage_controller_node_deletion(
assert victim.id not in [n["id"] for n in env.storage_controller.node_list()]
env.storage_controller.reconcile_all() # FIXME: workaround for optimizations happening on startup, see FIXME above.
env.storage_controller.consistency_check()
+
+
+def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
+ """
+ Test the `/control/v1/step_down` storage controller API. Upon receiving such
+ a request, the storage controller cancels any on-going reconciles and replies
+ with 503 to all requests apart from `/control/v1/step_down`, `/status` and `/metrics`.
+ """
+ env = neon_env_builder.init_configs()
+ env.start()
+
+ tid = TenantId.generate()
+ tsid = str(TenantShardId(tid, shard_number=0, shard_count=0))
+ env.storage_controller.tenant_create(tid)
+
+ env.storage_controller.reconcile_until_idle()
+ env.storage_controller.configure_failpoints(("sleep-on-reconcile-epilogue", "return(10000)"))
+
+ # Make a change to the tenant config to trigger a slow reconcile
+ virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
+ virtual_ps_http.patch_tenant_config_client_side(tid, {"compaction_threshold": 5}, None)
+ env.storage_controller.allowed_errors.append(
+ ".*Accepted configuration update but reconciliation failed.*"
+ )
+
+ observed_state = env.storage_controller.step_down()
+ log.info(f"Storage controller stepped down with {observed_state=}")
+
+ # Validate that we waited for the slow reconcile to complete
+ # and updated the observed state in the storcon before stepping down.
+ node_id = str(env.pageserver.id)
+ assert tsid in observed_state
+ assert node_id in observed_state[tsid]["locations"]
+ assert "conf" in observed_state[tsid]["locations"][node_id]
+ assert "tenant_conf" in observed_state[tsid]["locations"][node_id]["conf"]
+
+ tenant_conf = observed_state[tsid]["locations"][node_id]["conf"]["tenant_conf"]
+ assert "compaction_threshold" in tenant_conf
+ assert tenant_conf["compaction_threshold"] == 5
+
+ # Validate that we propagated the change to the pageserver
+ ps_tenant_conf = env.pageserver.http_client().tenant_config(tid)
+ assert "compaction_threshold" in ps_tenant_conf.effective_config
+ assert ps_tenant_conf.effective_config["compaction_threshold"] == 5
+
+ # Validate that the storcon is not replying to the usual requests
+ # once it has stepped down.
+ with pytest.raises(StorageControllerApiException, match="stepped_down"):
+ env.storage_controller.tenant_list()
+
+ # Validate that we can step down multiple times and the observed state
+ # doesn't change.
+ observed_state_again = env.storage_controller.step_down()
+ assert observed_state == observed_state_again
+
+ assert (
+ env.storage_controller.get_metric_value(
+ "storage_controller_leadership_status", filter={"status": "leader"}
+ )
+ == 0
+ )
+
+ assert (
+ env.storage_controller.get_metric_value(
+ "storage_controller_leadership_status", filter={"status": "stepped_down"}
+ )
+ == 1
+ )
+
+ assert (
+ env.storage_controller.get_metric_value(
+ "storage_controller_leadership_status", filter={"status": "candidate"}
+ )
+ == 0
+ )