From ae527ef088ef1654854c0cbd9b4cc9ab3878619e Mon Sep 17 00:00:00 2001
From: Vlad Lazar
Date: Mon, 12 Aug 2024 13:58:46 +0100
Subject: [PATCH] storcon: implement graceful leadership transfer (#8588)

## Problem

Storage controller restarts cause temporary unavailability from the control
plane's point of view. See the RFC for more details.

## Summary of changes

* A couple of small refactors of the storage controller start-up sequence to
  make extending it easier.
* A leader table is added to track the storage controller instance that's
  currently the leader (if any).
* A peer client is added such that storage controllers can send `step_down`
  requests to each other (implemented in
  https://github.com/neondatabase/neon/pull/8512).
* Implement the leader cut-over as described in the RFC.
* Add a `start-as-candidate` flag to the storage controller to gate the
  rolling restart behaviour. When the flag is `false` (the default), the only
  change from the current start-up sequence is persisting the leader entry to
  the database.
---
 .../2024-07-26-140924_create_leader/down.sql  |   1 +
 .../2024-07-26-140924_create_leader/up.sql    |   5 +
 storage_controller/src/lib.rs                 |   1 +
 storage_controller/src/main.rs                |  11 +
 storage_controller/src/metrics.rs             |  16 ++
 storage_controller/src/peer_client.rs         | 106 +++++++
 storage_controller/src/persistence.rs         |  74 +++++
 storage_controller/src/schema.rs              |   9 +-
 storage_controller/src/service.rs             | 261 +++++++++++++-----
 9 files changed, 407 insertions(+), 77 deletions(-)
 create mode 100644 storage_controller/migrations/2024-07-26-140924_create_leader/down.sql
 create mode 100644 storage_controller/migrations/2024-07-26-140924_create_leader/up.sql
 create mode 100644 storage_controller/src/peer_client.rs

diff --git a/storage_controller/migrations/2024-07-26-140924_create_leader/down.sql b/storage_controller/migrations/2024-07-26-140924_create_leader/down.sql
new file mode 100644
index 0000000000..53222c614e
--- /dev/null
+++ b/storage_controller/migrations/2024-07-26-140924_create_leader/down.sql
@@ -0,0 +1 @@
+DROP TABLE controllers;
diff --git a/storage_controller/migrations/2024-07-26-140924_create_leader/up.sql b/storage_controller/migrations/2024-07-26-140924_create_leader/up.sql
new file mode 100644
index 0000000000..90546948cb
--- /dev/null
+++ b/storage_controller/migrations/2024-07-26-140924_create_leader/up.sql
@@ -0,0 +1,5 @@
+CREATE TABLE controllers (
+    address VARCHAR NOT NULL,
+    started_at TIMESTAMPTZ NOT NULL,
+    PRIMARY KEY(address, started_at)
+);
diff --git a/storage_controller/src/lib.rs b/storage_controller/src/lib.rs
index 26c258c466..2034addbe1 100644
--- a/storage_controller/src/lib.rs
+++ b/storage_controller/src/lib.rs
@@ -11,6 +11,7 @@ mod id_lock_map;
 pub mod metrics;
 mod node;
 mod pageserver_client;
+mod peer_client;
 pub mod persistence;
 mod reconciler;
 mod scheduler;
diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs
index a66e9128bc..5a68799141 100644
--- a/storage_controller/src/main.rs
+++ b/storage_controller/src/main.rs
@@ -1,6 +1,7 @@
 use anyhow::{anyhow, Context};
 use clap::Parser;
 use diesel::Connection;
+use hyper::Uri;
 use metrics::launch_timestamp::LaunchTimestamp;
 use metrics::BuildInfo;
 use std::path::PathBuf;
@@ -83,6 +84,13 @@ struct Cli {
     #[arg(long, default_value = "5s")]
     db_connect_timeout: humantime::Duration,
 
+    #[arg(long, default_value = "false")]
+    start_as_candidate: bool,
+
+    // TODO: make this mandatory once the helm chart gets updated
+    #[arg(long)]
+    address_for_peers: Option<Uri>,
+
     /// `neon_local` sets this to the path of the neon_local repo dir.
     /// Only relevant for testing.
     // TODO: make `cfg(feature = "testing")`
@@ -285,6 +293,9 @@ async fn async_main() -> anyhow::Result<()> {
         split_threshold: args.split_threshold,
         neon_local_repo_dir: args.neon_local_repo_dir,
         max_secondary_lag_bytes: args.max_secondary_lag_bytes,
+        address_for_peers: args.address_for_peers,
+        start_as_candidate: args.start_as_candidate,
+        http_service_port: args.listen.port() as i32,
     };
 
     // After loading secrets & config, but before starting anything else, apply database migrations
diff --git a/storage_controller/src/metrics.rs b/storage_controller/src/metrics.rs
index a1a4b8543d..c2303e7a7f 100644
--- a/storage_controller/src/metrics.rs
+++ b/storage_controller/src/metrics.rs
@@ -12,6 +12,7 @@ use measured::{label::LabelValue, metric::histogram, FixedCardinalityLabel, Metr
 use metrics::NeonMetrics;
 use once_cell::sync::Lazy;
 use std::sync::Mutex;
+use strum::IntoEnumIterator;
 
 use crate::{
     persistence::{DatabaseError, DatabaseOperation},
@@ -241,3 +242,18 @@ impl DatabaseError {
         }
     }
 }
+
+/// Update the leadership status metric gauges to reflect the requested status
+pub(crate) fn update_leadership_status(status: LeadershipStatus) {
+    let status_metric = &METRICS_REGISTRY
+        .metrics_group
+        .storage_controller_leadership_status;
+
+    for s in LeadershipStatus::iter() {
+        if s == status {
+            status_metric.set(LeadershipStatusGroup { status: s }, 1);
+        } else {
+            status_metric.set(LeadershipStatusGroup { status: s }, 0);
+        }
+    }
+}
diff --git a/storage_controller/src/peer_client.rs b/storage_controller/src/peer_client.rs
new file mode 100644
index 0000000000..ebb59a1720
--- /dev/null
+++ b/storage_controller/src/peer_client.rs
@@ -0,0 +1,106 @@
+use crate::tenant_shard::ObservedState;
+use pageserver_api::shard::TenantShardId;
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use tokio_util::sync::CancellationToken;
+
+use hyper::Uri;
+use reqwest::{StatusCode, Url};
+use utils::{backoff, http::error::HttpErrorBody};
+
+#[derive(Debug, Clone)]
+pub(crate) struct PeerClient {
+    uri: Uri,
+    jwt: Option<String>,
+    client: reqwest::Client,
+}
+
+#[derive(thiserror::Error, Debug)]
+pub(crate) enum StorageControllerPeerError {
+    #[error("failed to deserialize error response with status code {0} at {1}: {2}")]
+    DeserializationError(StatusCode, Url, reqwest::Error),
+    #[error("storage controller peer API error ({0}): {1}")]
+    ApiError(StatusCode, String),
+    #[error("failed to send HTTP request: {0}")]
+    SendError(reqwest::Error),
+    #[error("Cancelled")]
+    Cancelled,
+}
+
+pub(crate) type Result<T> = std::result::Result<T, StorageControllerPeerError>;
+
+pub(crate) trait ResponseErrorMessageExt: Sized {
+    fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
+}
+
+impl ResponseErrorMessageExt for reqwest::Response {
+    async fn error_from_body(self) -> Result<Self> {
+        let status = self.status();
+        if !(status.is_client_error() || status.is_server_error()) {
+            return Ok(self);
+        }
+
+        let url = self.url().to_owned();
+        Err(match self.json::<HttpErrorBody>().await {
+            Ok(HttpErrorBody { msg }) => StorageControllerPeerError::ApiError(status, msg),
+            Err(err) => StorageControllerPeerError::DeserializationError(status, url, err),
+        })
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Default)]
+pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);
+
+impl PeerClient {
+    pub(crate) fn new(uri: Uri, jwt: Option<String>) -> Self {
+        Self {
+            uri,
+            jwt,
+            client: reqwest::Client::new(),
+        }
+    }
+
+    async fn request_step_down(&self) -> Result<GlobalObservedState> {
+        let step_down_path = format!("{}control/v1/step_down", self.uri);
format!("{}control/v1/step_down", self.uri); + let req = self.client.put(step_down_path); + let req = if let Some(jwt) = &self.jwt { + req.header(reqwest::header::AUTHORIZATION, format!("Bearer {jwt}")) + } else { + req + }; + + let res = req + .send() + .await + .map_err(StorageControllerPeerError::SendError)?; + let response = res.error_from_body().await?; + + let status = response.status(); + let url = response.url().to_owned(); + + response + .json() + .await + .map_err(|err| StorageControllerPeerError::DeserializationError(status, url, err)) + } + + /// Request the peer to step down and return its current observed state + /// All errors are retried with exponential backoff for a maximum of 4 attempts. + /// Assuming all retries are performed, the function times out after roughly 4 seconds. + pub(crate) async fn step_down( + &self, + cancel: &CancellationToken, + ) -> Result { + backoff::retry( + || self.request_step_down(), + |_e| false, + 2, + 4, + "Send step down request", + cancel, + ) + .await + .ok_or_else(|| StorageControllerPeerError::Cancelled) + .and_then(|x| x) + } +} diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 64a3e597ce..aebbdec0d1 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -95,6 +95,8 @@ pub(crate) enum DatabaseOperation { ListMetadataHealth, ListMetadataHealthUnhealthy, ListMetadataHealthOutdated, + GetLeader, + UpdateLeader, } #[must_use] @@ -785,6 +787,69 @@ impl Persistence { ) .await } + + /// Get the current entry from the `leader` table if one exists. + /// It is an error for the table to contain more than one entry. + pub(crate) async fn get_leader(&self) -> DatabaseResult> { + let mut leader: Vec = self + .with_measured_conn( + DatabaseOperation::GetLeader, + move |conn| -> DatabaseResult<_> { + Ok(crate::schema::controllers::table.load::(conn)?) + }, + ) + .await?; + + if leader.len() > 1 { + return Err(DatabaseError::Logical(format!( + "More than one entry present in the leader table: {leader:?}" + ))); + } + + Ok(leader.pop()) + } + + /// Update the new leader with compare-exchange semantics. If `prev` does not + /// match the current leader entry, then the update is treated as a failure. + /// When `prev` is not specified, the update is forced. 
+    pub(crate) async fn update_leader(
+        &self,
+        prev: Option<ControllerPersistence>,
+        new: ControllerPersistence,
+    ) -> DatabaseResult<()> {
+        use crate::schema::controllers::dsl::*;
+
+        let updated = self
+            .with_measured_conn(
+                DatabaseOperation::UpdateLeader,
+                move |conn| -> DatabaseResult<usize> {
+                    let updated = match &prev {
+                        Some(prev) => diesel::update(controllers)
+                            .filter(address.eq(prev.address.clone()))
+                            .filter(started_at.eq(prev.started_at))
+                            .set((
+                                address.eq(new.address.clone()),
+                                started_at.eq(new.started_at),
+                            ))
+                            .execute(conn)?,
+                        None => diesel::insert_into(controllers)
+                            .values(new.clone())
+                            .execute(conn)?,
+                    };
+
+                    Ok(updated)
+                },
+            )
+            .await?;
+
+        if updated == 0 {
+            return Err(DatabaseError::Logical(
+                "Leader table update failed".to_string(),
+            ));
+        }
+
+        Ok(())
+    }
 }
 
 /// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
@@ -910,3 +975,12 @@ impl From<MetadataHealthPersistence> for MetadataHealthRecord {
         }
     }
 }
+
+#[derive(
+    Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq, Debug, Clone,
+)]
+#[diesel(table_name = crate::schema::controllers)]
+pub(crate) struct ControllerPersistence {
+    pub(crate) address: String,
+    pub(crate) started_at: chrono::DateTime<chrono::Utc>,
+}
diff --git a/storage_controller/src/schema.rs b/storage_controller/src/schema.rs
index cb5ba3f38b..77ba47e114 100644
--- a/storage_controller/src/schema.rs
+++ b/storage_controller/src/schema.rs
@@ -1,5 +1,12 @@
 // @generated automatically by Diesel CLI.
 
+diesel::table! {
+    controllers (address, started_at) {
+        address -> Varchar,
+        started_at -> Timestamptz,
+    }
+}
+
 diesel::table! {
     metadata_health (tenant_id, shard_number, shard_count) {
         tenant_id -> Varchar,
@@ -36,4 +43,4 @@ diesel::table! {
     }
 }
 
-diesel::allow_tables_to_appear_in_same_query!(metadata_health, nodes, tenant_shards,);
+diesel::allow_tables_to_appear_in_same_query!(controllers, metadata_health, nodes, tenant_shards,);
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 31b2d0c3f5..fe582cf0e2 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -1,3 +1,4 @@
+use hyper::Uri;
 use std::{
     borrow::Cow,
     cmp::Ordering,
@@ -16,8 +17,11 @@ use crate::{
     compute_hook::NotifyError,
     drain_utils::{self, TenantShardDrain, TenantShardIterator},
     id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
-    metrics::LeadershipStatusGroup,
-    persistence::{AbortShardSplitStatus, MetadataHealthPersistence, TenantFilter},
+    metrics,
+    peer_client::{GlobalObservedState, PeerClient},
+    persistence::{
+        AbortShardSplitStatus, ControllerPersistence, MetadataHealthPersistence, TenantFilter,
+    },
     reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
     scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
     tenant_shard::{
@@ -83,7 +87,6 @@ use crate::{
         ReconcilerWaiter, TenantShard,
     },
 };
-use serde::{Deserialize, Serialize};
 
 pub mod chaos_injector;
 
@@ -140,7 +143,15 @@ enum NodeOperations {
 /// Allowed transitions are:
 /// 1. Leader -> SteppedDown
 /// 2. Candidate -> Leader
-#[derive(Copy, Clone, strum_macros::Display, measured::FixedCardinalityLabel)]
+#[derive(
+    Eq,
+    PartialEq,
+    Copy,
+    Clone,
+    strum_macros::Display,
+    strum_macros::EnumIter,
+    measured::FixedCardinalityLabel,
+)]
 #[strum(serialize_all = "snake_case")]
 pub(crate) enum LeadershipStatus {
     /// This is the steady state where the storage controller can produce
@@ -226,22 +237,12 @@ impl ServiceState {
         tenants: BTreeMap<TenantShardId, TenantShard>,
         scheduler: Scheduler,
         delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
+        initial_leadership_status: LeadershipStatus,
     ) -> Self {
-        let status = &crate::metrics::METRICS_REGISTRY
-            .metrics_group
-            .storage_controller_leadership_status;
-
-        status.set(
-            LeadershipStatusGroup {
-                status: LeadershipStatus::Leader,
-            },
-            1,
-        );
+        metrics::update_leadership_status(initial_leadership_status);
 
         Self {
-            // TODO: Starting up as Leader is a transient state. Once we enable rolling
-            // upgrades on the k8s side, we should start up as Candidate.
-            leadership_status: LeadershipStatus::Leader,
+            leadership_status: initial_leadership_status,
             tenants,
             nodes: Arc::new(nodes),
             scheduler,
@@ -266,29 +267,12 @@ impl ServiceState {
 
     fn step_down(&mut self) {
         self.leadership_status = LeadershipStatus::SteppedDown;
+        metrics::update_leadership_status(self.leadership_status);
+    }
 
-        let status = &crate::metrics::METRICS_REGISTRY
-            .metrics_group
-            .storage_controller_leadership_status;
-
-        status.set(
-            LeadershipStatusGroup {
-                status: LeadershipStatus::SteppedDown,
-            },
-            1,
-        );
-        status.set(
-            LeadershipStatusGroup {
-                status: LeadershipStatus::Leader,
-            },
-            0,
-        );
-        status.set(
-            LeadershipStatusGroup {
-                status: LeadershipStatus::Candidate,
-            },
-            0,
-        );
+    fn become_leader(&mut self) {
+        self.leadership_status = LeadershipStatus::Leader;
+        metrics::update_leadership_status(self.leadership_status);
     }
 }
@@ -332,6 +316,12 @@ pub struct Config {
     // by more than the configured amount, then the secondary is not
     // upgraded to primary.
     pub max_secondary_lag_bytes: Option<u64>,
+
+    pub address_for_peers: Option<Uri>,
+
+    pub start_as_candidate: bool,
+
+    pub http_service_port: i32,
 }
 
 impl From<DatabaseError> for ApiError {
@@ -499,9 +489,10 @@ pub(crate) enum ReconcileResultRequest {
     Stop,
 }
 
-// TODO: move this into the storcon peer client when that gets added
-#[derive(Serialize, Deserialize, Debug, Default)]
-pub(crate) struct GlobalObservedState(HashMap<TenantShardId, ObservedState>);
+struct LeaderStepDownState {
+    observed: GlobalObservedState,
+    leader: ControllerPersistence,
+}
 
 impl Service {
     pub fn get_config(&self) -> &Config {
@@ -513,15 +504,11 @@ impl Service {
     #[instrument(skip_all)]
     async fn startup_reconcile(
         self: &Arc<Self>,
+        leader_step_down_state: Option<LeaderStepDownState>,
         bg_compute_notify_result_tx: tokio::sync::mpsc::Sender<
             Result<(), (TenantShardId, NotifyError)>,
         >,
     ) {
-        // For all tenant shards, a vector of observed states on nodes (where None means
-        // indeterminate, same as in [`ObservedStateLocation`])
-        let mut observed: HashMap<TenantShardId, Vec<(NodeId, Option<LocationConfig>)>> =
-            HashMap::new();
-
         // Startup reconciliation does I/O to other services: whether they
         // are responsive or not, we should aim to finish within our deadline, because:
         // - If we don't, a k8s readiness hook watching /ready will kill us.
@@ -535,26 +522,28 @@ impl Service {
             .checked_add(STARTUP_RECONCILE_TIMEOUT / 2)
             .expect("Reconcile timeout is a modest constant");
 
+        let (observed, current_leader) = if let Some(state) = leader_step_down_state {
+            tracing::info!(
+                "Using observed state received from leader at {}",
+                state.leader.address,
+            );
+
+            (state.observed, Some(state.leader))
+        } else {
+            (
+                self.build_global_observed_state(node_scan_deadline).await,
+                None,
+            )
+        };
+
         // Accumulate a list of any tenant locations that ought to be detached
         let mut cleanup = Vec::new();
-        let node_listings = self.scan_node_locations(node_scan_deadline).await;
-        // Send initial heartbeat requests to nodes that replied to the location listing above.
-        let nodes_online = self.initial_heartbeat_round(node_listings.keys()).await;
-
-        for (node_id, list_response) in node_listings {
-            let tenant_shards = list_response.tenant_shards;
-            tracing::info!(
-                "Received {} shard statuses from pageserver {}, setting it to Active",
-                tenant_shards.len(),
-                node_id
-            );
-
-            for (tenant_shard_id, conf_opt) in tenant_shards {
-                let shard_observations = observed.entry(tenant_shard_id).or_default();
-                shard_observations.push((node_id, conf_opt));
-            }
-        }
+
+        // Send initial heartbeat requests to all nodes loaded from the database
+        let all_nodes = {
+            let locked = self.inner.read().unwrap();
+            locked.nodes.clone()
+        };
+        let nodes_online = self.initial_heartbeat_round(all_nodes.keys()).await;
 
         // List of tenants for which we will attempt to notify compute of their location at startup
         let mut compute_notifications = Vec::new();
@@ -577,17 +566,16 @@ impl Service {
             }
             *nodes = Arc::new(new_nodes);
 
-            for (tenant_shard_id, shard_observations) in observed {
-                for (node_id, observed_loc) in shard_observations {
-                    let Some(tenant_shard) = tenants.get_mut(&tenant_shard_id) else {
-                        cleanup.push((tenant_shard_id, node_id));
-                        continue;
-                    };
-                    tenant_shard
-                        .observed
-                        .locations
-                        .insert(node_id, ObservedStateLocation { conf: observed_loc });
-                }
+            for (tenant_shard_id, observed_state) in observed.0 {
+                let Some(tenant_shard) = tenants.get_mut(&tenant_shard_id) else {
+                    for node_id in observed_state.locations.keys() {
+                        cleanup.push((tenant_shard_id, *node_id));
+                    }
+
+                    continue;
+                };
+
+                tenant_shard.observed = observed_state;
             }
 
             // Populate each tenant's intent state
@@ -621,6 +609,28 @@ impl Service {
             tenants.len()
         };
 
+        // Before making any observable changes to the cluster, persist self
+        // as leader in database and memory.
+        if let Some(address_for_peers) = &self.config.address_for_peers {
+            // TODO: `address-for-peers` can become a mandatory cli arg
+            // after we update the k8s setup
+            let proposed_leader = ControllerPersistence {
+                address: address_for_peers.to_string(),
+                started_at: chrono::Utc::now(),
+            };
+
+            if let Err(err) = self
+                .persistence
+                .update_leader(current_leader, proposed_leader)
+                .await
+            {
+                tracing::error!("Failed to persist self as leader: {err}. Aborting start-up ...");
+                std::process::exit(1);
+            }
+        }
+
+        self.inner.write().unwrap().become_leader();
+
         // TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that
         // generation_pageserver in the database.
@@ -786,6 +796,31 @@ impl Service {
         node_results
     }
 
+    async fn build_global_observed_state(&self, deadline: Instant) -> GlobalObservedState {
+        let node_listings = self.scan_node_locations(deadline).await;
+        let mut observed = GlobalObservedState::default();
+
+        for (node_id, location_confs) in node_listings {
+            tracing::info!(
+                "Received {} shard statuses from pageserver {}",
+                location_confs.tenant_shards.len(),
+                node_id
+            );
+
+            for (tid, location_conf) in location_confs.tenant_shards {
+                let entry = observed.0.entry(tid).or_default();
+                entry.locations.insert(
+                    node_id,
+                    ObservedStateLocation {
+                        conf: location_conf,
+                    },
+                );
+            }
+        }
+
+        observed
+    }
+
     /// Used during [`Self::startup_reconcile`]: detach a list of unknown-to-us tenants from pageservers.
     ///
     /// This is safe to run in the background, because if we don't have this TenantShardId in our map of
@@ -1264,12 +1299,20 @@ impl Service {
             config.max_warming_up_interval,
             cancel.clone(),
         );
+
+        let initial_leadership_status = if config.start_as_candidate {
+            LeadershipStatus::Candidate
+        } else {
+            LeadershipStatus::Leader
+        };
+
         let this = Arc::new(Self {
             inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
                 nodes,
                 tenants,
                 scheduler,
                 delayed_reconcile_rx,
+                initial_leadership_status,
             ))),
             config: config.clone(),
             persistence,
@@ -1338,7 +1381,16 @@ impl Service {
                 return;
             };
 
-            this.startup_reconcile(bg_compute_notify_result_tx).await;
+            let leadership_status = this.inner.read().unwrap().get_leadership_status();
+            let peer_observed_state = match leadership_status {
+                LeadershipStatus::Candidate => this.request_step_down().await,
+                LeadershipStatus::Leader => None,
+                LeadershipStatus::SteppedDown => unreachable!(),
+            };
+
+            this.startup_reconcile(peer_observed_state, bg_compute_notify_result_tx)
+                .await;
+
             drop(startup_completion);
         }
     });
@@ -6285,4 +6337,61 @@ impl Service {
 
         global_observed
     }
+
+    /// Request step down from the currently registered leader in the database.
+    ///
+    /// If such an entry is persisted, the success path returns the observed
+    /// state and details of the leader. Otherwise, None is returned, indicating
+    /// that there is currently no leader.
+    ///
+    /// On failures to query the database or on step-down error responses, the
+    /// process is killed and we rely on k8s to retry.
+    async fn request_step_down(&self) -> Option<LeaderStepDownState> {
+        let leader = match self.persistence.get_leader().await {
+            Ok(leader) => leader,
+            Err(err) => {
+                tracing::error!(
+                    "Failed to query database for current leader: {err}. Aborting start-up ..."
+                );
+                std::process::exit(1);
+            }
+        };
+
+        match leader {
+            Some(leader) => {
+                tracing::info!("Sending step down request to {leader:?}");
+
+                // TODO: jwt token
+                let client = PeerClient::new(
+                    Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"),
+                    self.config.jwt_token.clone(),
+                );
+
+                let state = client.step_down(&self.cancel).await;
+                match state {
+                    Ok(state) => Some(LeaderStepDownState {
+                        observed: state,
+                        leader: leader.clone(),
+                    }),
+                    Err(err) => {
+                        // TODO: Make leaders periodically update a timestamp field in the
+                        // database and, if the leader is not reachable from the current instance,
+                        // but inferred as alive from the timestamp, abort start-up. This avoids
+                        // a potential scenario in which we have two controllers acting as leaders.
+                        tracing::error!(
+                            "Leader ({}) did not respond to step-down request: {}",
+                            leader.address,
+                            err
+                        );
+
+                        None
+                    }
+                }
+            }
+            None => {
+                tracing::info!(
+                    "No leader found to request step down from. Will build observed state."
+                );
+                None
+            }
+        }
+    }
 }
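
The cut-over this patch implements boils down to four steps at candidate start-up: read the current `controllers` row, ask that peer to step down over HTTP, adopt the observed state it hands back, and then compare-exchange your own entry into the leader table before acting as leader. The sketch below is a minimal, self-contained model of that sequence using only `std` types; `LeaderTable`, `LeaderEntry`, `Controller`, and the simplified `ObservedState` map are illustrative stand-ins for this write-up, not the storage controller's real types or API.

```rust
// Illustrative sketch only: models the handover protocol from the patch with
// plain std types (no database, no HTTP, no retries).
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Simplified stand-in for the per-tenant-shard observed state: shard -> location.
type ObservedState = HashMap<String, String>;

#[derive(Clone, Debug, PartialEq)]
struct LeaderEntry {
    address: String,
    started_at: u64,
}

// Stand-in for the `controllers` table: at most one entry, updated with
// compare-exchange semantics like `update_leader` in the patch.
#[derive(Default)]
struct LeaderTable(Mutex<Option<LeaderEntry>>);

impl LeaderTable {
    fn get_leader(&self) -> Option<LeaderEntry> {
        self.0.lock().unwrap().clone()
    }

    /// Succeeds only if the stored entry still equals `prev`; a `None` prev forces the write.
    fn update_leader(&self, prev: Option<LeaderEntry>, new: LeaderEntry) -> Result<(), &'static str> {
        let mut slot = self.0.lock().unwrap();
        match prev {
            Some(prev) if slot.as_ref() != Some(&prev) => Err("leader table update failed"),
            _ => {
                *slot = Some(new);
                Ok(())
            }
        }
    }
}

struct Controller {
    address: String,
    observed: Mutex<ObservedState>,
    stepped_down: Mutex<bool>,
}

impl Controller {
    // Peer-to-peer step-down: the old leader stops acting as leader and hands
    // back everything it has observed about the cluster.
    fn step_down(&self) -> ObservedState {
        *self.stepped_down.lock().unwrap() = true;
        self.observed.lock().unwrap().clone()
    }
}

fn main() {
    let table = LeaderTable::default();
    let old = Arc::new(Controller {
        address: "http://old:1234".into(),
        observed: Mutex::new(HashMap::from([("shard-0".into(), "ps-1".into())])),
        stepped_down: Mutex::new(false),
    });
    table
        .update_leader(None, LeaderEntry { address: old.address.clone(), started_at: 1 })
        .unwrap();

    // New candidate starts up: read the current leader, ask it to step down,
    // reuse its observed state, then CAS itself into the leader table.
    let prev = table.get_leader();
    let observed = old.step_down();
    table
        .update_leader(prev, LeaderEntry { address: "http://new:1234".into(), started_at: 2 })
        .expect("leader table CAS failed: another controller won the race");

    println!("new leader took over with observed state: {observed:?}");
}
```

The compare-exchange on the leader row is what keeps two racing controllers from both persisting themselves as leader; as the TODO in `request_step_down` notes, a periodically refreshed timestamp would additionally be needed to rule out dual leadership when the old leader is alive but unreachable.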