mirror of https://github.com/neondatabase/neon.git (synced 2026-01-06 21:12:55 +00:00)
storcon: implement graceful leadership transfer (#8588)
## Problem

Storage controller restarts cause temporary unavailability from the control plane POV. See the RFC for more details.

## Summary of changes

* A couple of small refactors of the storage controller start-up sequence to make extending it easier.
* A `leader` table is added to track the storage controller instance that is currently the leader (if any).
* A peer client is added so that storage controllers can send `step_down` requests to each other (implemented in https://github.com/neondatabase/neon/pull/8512).
* Implement the leader cut-over as described in the RFC.
* Add a `start-as-candidate` flag to the storage controller to gate the rolling-restart behaviour. When the flag is `false` (the default), the only change from the current start-up sequence is persisting the leader entry to the database.
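To make the cut-over concrete, here is a minimal, self-contained model of the handover sequence described above. Everything in it (`LeaderRow`, `LeaderTable`, `candidate_startup`, the `step_down` closure) is an illustrative stand-in invented for this sketch, not the crate's real API: the actual controller stores the leader row in Postgres via diesel (`Persistence::get_leader` / `update_leader` in this diff) and asks the old leader to step down over HTTP with `PeerClient`.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the `controllers` table row (`ControllerPersistence`).
#[derive(Clone, Debug, PartialEq)]
struct LeaderRow {
    address: String,
    started_at: u64, // stand-in for a timestamp
}

/// Simplified stand-in for the database-backed leader table.
#[derive(Default)]
struct LeaderTable {
    row: Option<LeaderRow>,
}

impl LeaderTable {
    /// Compare-exchange update, mirroring the semantics of `Persistence::update_leader`:
    /// if `prev` is given and does not match the stored row, the update fails.
    fn update_leader(&mut self, prev: Option<LeaderRow>, new: LeaderRow) -> Result<(), String> {
        match prev {
            Some(prev) if self.row.as_ref() != Some(&prev) => {
                Err("leader table update failed: stale `prev` entry".to_string())
            }
            _ => {
                self.row = Some(new);
                Ok(())
            }
        }
    }
}

/// What a candidate does at start-up (heavily simplified): ask the current leader to
/// step down and hand over its observed state, then compare-exchange itself into the
/// leader table before making any observable changes to the cluster.
fn candidate_startup(
    table: &mut LeaderTable,
    my_address: &str,
    step_down: impl Fn(&LeaderRow) -> Option<HashMap<String, String>>, // stand-in for PeerClient::step_down
) -> Result<HashMap<String, String>, String> {
    let current_leader = table.row.clone();

    // 1. If a leader is registered, request step-down and adopt its observed state.
    //    If there is no leader (or it does not respond), the observed state is rebuilt
    //    by scanning pageservers instead.
    let observed = match &current_leader {
        Some(leader) => step_down(leader).unwrap_or_default(),
        None => HashMap::new(),
    };

    // 2. Persist self as the new leader, conditional on the entry read above.
    table.update_leader(
        current_leader,
        LeaderRow {
            address: my_address.to_string(),
            started_at: 42,
        },
    )?;

    // 3. Only now flip the in-memory status: Candidate -> Leader.
    Ok(observed)
}

fn main() {
    let mut table = LeaderTable::default();
    table.row = Some(LeaderRow {
        address: "http://controller-old:1234/".to_string(),
        started_at: 1,
    });

    let observed = candidate_startup(&mut table, "http://controller-new:1234/", |leader| {
        println!("requesting step-down from {}", leader.address);
        Some(HashMap::new()) // the old leader's observed state
    })
    .expect("leadership handover failed");

    println!("took over as leader with {} observed entries", observed.len());
}
```

The compare-exchange in step 2 is what keeps two instances from both persisting themselves as leader if the table changed between reading and writing it; the real code aborts start-up instead of returning an error when that happens.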
@@ -11,6 +11,7 @@ mod id_lock_map;
pub mod metrics;
mod node;
mod pageserver_client;
mod peer_client;
pub mod persistence;
mod reconciler;
mod scheduler;

@@ -1,6 +1,7 @@
use anyhow::{anyhow, Context};
use clap::Parser;
use diesel::Connection;
use hyper::Uri;
use metrics::launch_timestamp::LaunchTimestamp;
use metrics::BuildInfo;
use std::path::PathBuf;

@@ -83,6 +84,13 @@ struct Cli {
    #[arg(long, default_value = "5s")]
    db_connect_timeout: humantime::Duration,

    #[arg(long, default_value = "false")]
    start_as_candidate: bool,

    // TODO: make this mandatory once the helm chart gets updated
    #[arg(long)]
    address_for_peers: Option<Uri>,

    /// `neon_local` sets this to the path of the neon_local repo dir.
    /// Only relevant for testing.
    // TODO: make `cfg(feature = "testing")`

@@ -285,6 +293,9 @@ async fn async_main() -> anyhow::Result<()> {
        split_threshold: args.split_threshold,
        neon_local_repo_dir: args.neon_local_repo_dir,
        max_secondary_lag_bytes: args.max_secondary_lag_bytes,
        address_for_peers: args.address_for_peers,
        start_as_candidate: args.start_as_candidate,
        http_service_port: args.listen.port() as i32,
    };

    // After loading secrets & config, but before starting anything else, apply database migrations

@@ -12,6 +12,7 @@ use measured::{label::LabelValue, metric::histogram, FixedCardinalityLabel, Metr
use metrics::NeonMetrics;
use once_cell::sync::Lazy;
use std::sync::Mutex;
use strum::IntoEnumIterator;

use crate::{
    persistence::{DatabaseError, DatabaseOperation},

@@ -241,3 +242,18 @@ impl DatabaseError {
        }
    }
}

/// Update the leadership status metric gauges to reflect the requested status
pub(crate) fn update_leadership_status(status: LeadershipStatus) {
    let status_metric = &METRICS_REGISTRY
        .metrics_group
        .storage_controller_leadership_status;

    for s in LeadershipStatus::iter() {
        if s == status {
            status_metric.set(LeadershipStatusGroup { status: s }, 1);
        } else {
            status_metric.set(LeadershipStatusGroup { status: s }, 0);
        }
    }
}

storage_controller/src/peer_client.rs (new file, +106)
@@ -0,0 +1,106 @@
use crate::tenant_shard::ObservedState;
use pageserver_api::shard::TenantShardId;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio_util::sync::CancellationToken;

use hyper::Uri;
use reqwest::{StatusCode, Url};
use utils::{backoff, http::error::HttpErrorBody};

#[derive(Debug, Clone)]
pub(crate) struct PeerClient {
    uri: Uri,
    jwt: Option<String>,
    client: reqwest::Client,
}

#[derive(thiserror::Error, Debug)]
pub(crate) enum StorageControllerPeerError {
    #[error("failed to deserialize error response with status code {0} at {1}: {2}")]
    DeserializationError(StatusCode, Url, reqwest::Error),
    #[error("storage controller peer API error ({0}): {1}")]
    ApiError(StatusCode, String),
    #[error("failed to send HTTP request: {0}")]
    SendError(reqwest::Error),
    #[error("Cancelled")]
    Cancelled,
}

pub(crate) type Result<T> = std::result::Result<T, StorageControllerPeerError>;

pub(crate) trait ResponseErrorMessageExt: Sized {
    fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
}

impl ResponseErrorMessageExt for reqwest::Response {
    async fn error_from_body(self) -> Result<Self> {
        let status = self.status();
        if !(status.is_client_error() || status.is_server_error()) {
            return Ok(self);
        }

        let url = self.url().to_owned();
        Err(match self.json::<HttpErrorBody>().await {
            Ok(HttpErrorBody { msg }) => StorageControllerPeerError::ApiError(status, msg),
            Err(err) => StorageControllerPeerError::DeserializationError(status, url, err),
        })
    }
}

#[derive(Serialize, Deserialize, Debug, Default)]
pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);

impl PeerClient {
    pub(crate) fn new(uri: Uri, jwt: Option<String>) -> Self {
        Self {
            uri,
            jwt,
            client: reqwest::Client::new(),
        }
    }

    async fn request_step_down(&self) -> Result<GlobalObservedState> {
        let step_down_path = format!("{}control/v1/step_down", self.uri);
        let req = self.client.put(step_down_path);
        let req = if let Some(jwt) = &self.jwt {
            req.header(reqwest::header::AUTHORIZATION, format!("Bearer {jwt}"))
        } else {
            req
        };

        let res = req
            .send()
            .await
            .map_err(StorageControllerPeerError::SendError)?;
        let response = res.error_from_body().await?;

        let status = response.status();
        let url = response.url().to_owned();

        response
            .json()
            .await
            .map_err(|err| StorageControllerPeerError::DeserializationError(status, url, err))
    }

    /// Request the peer to step down and return its current observed state
    /// All errors are retried with exponential backoff for a maximum of 4 attempts.
    /// Assuming all retries are performed, the function times out after roughly 4 seconds.
    pub(crate) async fn step_down(
        &self,
        cancel: &CancellationToken,
    ) -> Result<GlobalObservedState> {
        backoff::retry(
            || self.request_step_down(),
            |_e| false,
            2,
            4,
            "Send step down request",
            cancel,
        )
        .await
        .ok_or_else(|| StorageControllerPeerError::Cancelled)
        .and_then(|x| x)
    }
}

@@ -95,6 +95,8 @@ pub(crate) enum DatabaseOperation {
    ListMetadataHealth,
    ListMetadataHealthUnhealthy,
    ListMetadataHealthOutdated,
    GetLeader,
    UpdateLeader,
}

#[must_use]

@@ -785,6 +787,69 @@ impl Persistence {
        )
        .await
    }

    /// Get the current entry from the `leader` table if one exists.
    /// It is an error for the table to contain more than one entry.
    pub(crate) async fn get_leader(&self) -> DatabaseResult<Option<ControllerPersistence>> {
        let mut leader: Vec<ControllerPersistence> = self
            .with_measured_conn(
                DatabaseOperation::GetLeader,
                move |conn| -> DatabaseResult<_> {
                    Ok(crate::schema::controllers::table.load::<ControllerPersistence>(conn)?)
                },
            )
            .await?;

        if leader.len() > 1 {
            return Err(DatabaseError::Logical(format!(
                "More than one entry present in the leader table: {leader:?}"
            )));
        }

        Ok(leader.pop())
    }

    /// Update the new leader with compare-exchange semantics. If `prev` does not
    /// match the current leader entry, then the update is treated as a failure.
    /// When `prev` is not specified, the update is forced.
    pub(crate) async fn update_leader(
        &self,
        prev: Option<ControllerPersistence>,
        new: ControllerPersistence,
    ) -> DatabaseResult<()> {
        use crate::schema::controllers::dsl::*;

        let updated = self
            .with_measured_conn(
                DatabaseOperation::UpdateLeader,
                move |conn| -> DatabaseResult<usize> {
                    let updated = match &prev {
                        Some(prev) => diesel::update(controllers)
                            .filter(address.eq(prev.address.clone()))
                            .filter(started_at.eq(prev.started_at))
                            .set((
                                address.eq(new.address.clone()),
                                started_at.eq(new.started_at),
                            ))
                            .execute(conn)?,
                        None => diesel::insert_into(controllers)
                            .values(new.clone())
                            .execute(conn)?,
                    };

                    Ok(updated)
                },
            )
            .await?;

        if updated == 0 {
            return Err(DatabaseError::Logical(
                "Leader table update failed".to_string(),
            ));
        }

        Ok(())
    }
}

/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably

@@ -910,3 +975,12 @@ impl From<MetadataHealthPersistence> for MetadataHealthRecord {
        }
    }
}

#[derive(
    Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq, Debug, Clone,
)]
#[diesel(table_name = crate::schema::controllers)]
pub(crate) struct ControllerPersistence {
    pub(crate) address: String,
    pub(crate) started_at: chrono::DateTime<chrono::Utc>,
}

@@ -1,5 +1,12 @@
// @generated automatically by Diesel CLI.

diesel::table! {
    controllers (address, started_at) {
        address -> Varchar,
        started_at -> Timestamptz,
    }
}

diesel::table! {
    metadata_health (tenant_id, shard_number, shard_count) {
        tenant_id -> Varchar,

@@ -36,4 +43,4 @@ diesel::table! {
    }
}

diesel::allow_tables_to_appear_in_same_query!(metadata_health, nodes, tenant_shards,);
diesel::allow_tables_to_appear_in_same_query!(controllers, metadata_health, nodes, tenant_shards,);

@@ -1,3 +1,4 @@
use hyper::Uri;
use std::{
    borrow::Cow,
    cmp::Ordering,

@@ -16,8 +17,11 @@ use crate::{
    compute_hook::NotifyError,
    drain_utils::{self, TenantShardDrain, TenantShardIterator},
    id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
    metrics::LeadershipStatusGroup,
    persistence::{AbortShardSplitStatus, MetadataHealthPersistence, TenantFilter},
    metrics,
    peer_client::{GlobalObservedState, PeerClient},
    persistence::{
        AbortShardSplitStatus, ControllerPersistence, MetadataHealthPersistence, TenantFilter,
    },
    reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
    scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
    tenant_shard::{

@@ -83,7 +87,6 @@ use crate::{
        ReconcilerWaiter, TenantShard,
    },
};
use serde::{Deserialize, Serialize};

pub mod chaos_injector;

@@ -140,7 +143,15 @@ enum NodeOperations {
/// Allowed transitions are:
/// 1. Leader -> SteppedDown
/// 2. Candidate -> Leader
#[derive(Copy, Clone, strum_macros::Display, measured::FixedCardinalityLabel)]
#[derive(
    Eq,
    PartialEq,
    Copy,
    Clone,
    strum_macros::Display,
    strum_macros::EnumIter,
    measured::FixedCardinalityLabel,
)]
#[strum(serialize_all = "snake_case")]
pub(crate) enum LeadershipStatus {
    /// This is the steady state where the storage controller can produce

@@ -226,22 +237,12 @@ impl ServiceState {
        tenants: BTreeMap<TenantShardId, TenantShard>,
        scheduler: Scheduler,
        delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
        initial_leadership_status: LeadershipStatus,
    ) -> Self {
        let status = &crate::metrics::METRICS_REGISTRY
            .metrics_group
            .storage_controller_leadership_status;

        status.set(
            LeadershipStatusGroup {
                status: LeadershipStatus::Leader,
            },
            1,
        );
        metrics::update_leadership_status(initial_leadership_status);

        Self {
            // TODO: Starting up as Leader is a transient state. Once we enable rolling
            // upgrades on the k8s side, we should start up as Candidate.
            leadership_status: LeadershipStatus::Leader,
            leadership_status: initial_leadership_status,
            tenants,
            nodes: Arc::new(nodes),
            scheduler,

@@ -266,29 +267,12 @@ impl ServiceState {

    fn step_down(&mut self) {
        self.leadership_status = LeadershipStatus::SteppedDown;
        metrics::update_leadership_status(self.leadership_status);
    }

        let status = &crate::metrics::METRICS_REGISTRY
            .metrics_group
            .storage_controller_leadership_status;

        status.set(
            LeadershipStatusGroup {
                status: LeadershipStatus::SteppedDown,
            },
            1,
        );
        status.set(
            LeadershipStatusGroup {
                status: LeadershipStatus::Leader,
            },
            0,
        );
        status.set(
            LeadershipStatusGroup {
                status: LeadershipStatus::Candidate,
            },
            0,
        );
    fn become_leader(&mut self) {
        self.leadership_status = LeadershipStatus::Leader;
        metrics::update_leadership_status(self.leadership_status);
    }
}

@@ -332,6 +316,12 @@ pub struct Config {
    // by more than the configured amount, then the secondary is not
    // upgraded to primary.
    pub max_secondary_lag_bytes: Option<u64>,

    pub address_for_peers: Option<Uri>,

    pub start_as_candidate: bool,

    pub http_service_port: i32,
}

impl From<DatabaseError> for ApiError {

@@ -499,9 +489,10 @@ pub(crate) enum ReconcileResultRequest {
    Stop,
}

// TODO: move this into the storcon peer client when that gets added
#[derive(Serialize, Deserialize, Debug, Default)]
pub(crate) struct GlobalObservedState(HashMap<TenantShardId, ObservedState>);
struct LeaderStepDownState {
    observed: GlobalObservedState,
    leader: ControllerPersistence,
}

impl Service {
    pub fn get_config(&self) -> &Config {

@@ -513,15 +504,11 @@ impl Service {
    #[instrument(skip_all)]
    async fn startup_reconcile(
        self: &Arc<Service>,
        leader_step_down_state: Option<LeaderStepDownState>,
        bg_compute_notify_result_tx: tokio::sync::mpsc::Sender<
            Result<(), (TenantShardId, NotifyError)>,
        >,
    ) {
        // For all tenant shards, a vector of observed states on nodes (where None means
        // indeterminate, same as in [`ObservedStateLocation`])
        let mut observed: HashMap<TenantShardId, Vec<(NodeId, Option<LocationConfig>)>> =
            HashMap::new();

        // Startup reconciliation does I/O to other services: whether they
        // are responsive or not, we should aim to finish within our deadline, because:
        // - If we don't, a k8s readiness hook watching /ready will kill us.

@@ -535,26 +522,28 @@ impl Service {
            .checked_add(STARTUP_RECONCILE_TIMEOUT / 2)
            .expect("Reconcile timeout is a modest constant");

        let (observed, current_leader) = if let Some(state) = leader_step_down_state {
            tracing::info!(
                "Using observed state received from leader at {}",
                state.leader.address,
            );
            (state.observed, Some(state.leader))
        } else {
            (
                self.build_global_observed_state(node_scan_deadline).await,
                None,
            )
        };

        // Accumulate a list of any tenant locations that ought to be detached
        let mut cleanup = Vec::new();

        let node_listings = self.scan_node_locations(node_scan_deadline).await;
        // Send initial heartbeat requests to nodes that replied to the location listing above.
        let nodes_online = self.initial_heartbeat_round(node_listings.keys()).await;

        for (node_id, list_response) in node_listings {
            let tenant_shards = list_response.tenant_shards;
            tracing::info!(
                "Received {} shard statuses from pageserver {}, setting it to Active",
                tenant_shards.len(),
                node_id
            );

            for (tenant_shard_id, conf_opt) in tenant_shards {
                let shard_observations = observed.entry(tenant_shard_id).or_default();
                shard_observations.push((node_id, conf_opt));
            }
        }
        // Send initial heartbeat requests to all nodes loaded from the database
        let all_nodes = {
            let locked = self.inner.read().unwrap();
            locked.nodes.clone()
        };
        let nodes_online = self.initial_heartbeat_round(all_nodes.keys()).await;

        // List of tenants for which we will attempt to notify compute of their location at startup
        let mut compute_notifications = Vec::new();

@@ -577,17 +566,16 @@ impl Service {
            }
            *nodes = Arc::new(new_nodes);

            for (tenant_shard_id, shard_observations) in observed {
                for (node_id, observed_loc) in shard_observations {
                    let Some(tenant_shard) = tenants.get_mut(&tenant_shard_id) else {
                        cleanup.push((tenant_shard_id, node_id));
                        continue;
                    };
                    tenant_shard
                        .observed
                        .locations
                        .insert(node_id, ObservedStateLocation { conf: observed_loc });
                }
            for (tenant_shard_id, observed_state) in observed.0 {
                let Some(tenant_shard) = tenants.get_mut(&tenant_shard_id) else {
                    for node_id in observed_state.locations.keys() {
                        cleanup.push((tenant_shard_id, *node_id));
                    }

                    continue;
                };

                tenant_shard.observed = observed_state;
            }

            // Populate each tenant's intent state

@@ -621,6 +609,28 @@ impl Service {
            tenants.len()
        };

        // Before making any obeservable changes to the cluster, persist self
        // as leader in database and memory.
        if let Some(address_for_peers) = &self.config.address_for_peers {
            // TODO: `address-for-peers` can become a mandatory cli arg
            // after we update the k8s setup
            let proposed_leader = ControllerPersistence {
                address: address_for_peers.to_string(),
                started_at: chrono::Utc::now(),
            };

            if let Err(err) = self
                .persistence
                .update_leader(current_leader, proposed_leader)
                .await
            {
                tracing::error!("Failed to persist self as leader: {err}. Aborting start-up ...");
                std::process::exit(1);
            }
        }

        self.inner.write().unwrap().become_leader();

        // TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that
        // generation_pageserver in the database.

@@ -786,6 +796,31 @@ impl Service {
        node_results
    }

    async fn build_global_observed_state(&self, deadline: Instant) -> GlobalObservedState {
        let node_listings = self.scan_node_locations(deadline).await;
        let mut observed = GlobalObservedState::default();

        for (node_id, location_confs) in node_listings {
            tracing::info!(
                "Received {} shard statuses from pageserver {}",
                location_confs.tenant_shards.len(),
                node_id
            );

            for (tid, location_conf) in location_confs.tenant_shards {
                let entry = observed.0.entry(tid).or_default();
                entry.locations.insert(
                    node_id,
                    ObservedStateLocation {
                        conf: location_conf,
                    },
                );
            }
        }

        observed
    }

    /// Used during [`Self::startup_reconcile`]: detach a list of unknown-to-us tenants from pageservers.
    ///
    /// This is safe to run in the background, because if we don't have this TenantShardId in our map of

@@ -1264,12 +1299,20 @@ impl Service {
            config.max_warming_up_interval,
            cancel.clone(),
        );

        let initial_leadership_status = if config.start_as_candidate {
            LeadershipStatus::Candidate
        } else {
            LeadershipStatus::Leader
        };

        let this = Arc::new(Self {
            inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
                nodes,
                tenants,
                scheduler,
                delayed_reconcile_rx,
                initial_leadership_status,
            ))),
            config: config.clone(),
            persistence,

@@ -1338,7 +1381,16 @@ impl Service {
                    return;
                };

                this.startup_reconcile(bg_compute_notify_result_tx).await;
                let leadership_status = this.inner.read().unwrap().get_leadership_status();
                let peer_observed_state = match leadership_status {
                    LeadershipStatus::Candidate => this.request_step_down().await,
                    LeadershipStatus::Leader => None,
                    LeadershipStatus::SteppedDown => unreachable!(),
                };

                this.startup_reconcile(peer_observed_state, bg_compute_notify_result_tx)
                    .await;

                drop(startup_completion);
            }
        });

@@ -6285,4 +6337,61 @@ impl Service {

        global_observed
    }

    /// Request step down from the currently registered leader in the database
    ///
    /// If such an entry is persisted, the success path returns the observed
    /// state and details of the leader. Otherwise, None is returned indicating
    /// there is no leader currently.
    ///
    /// On failures to query the database or step down error responses the process is killed
    /// and we rely on k8s to retry.
    async fn request_step_down(&self) -> Option<LeaderStepDownState> {
        let leader = match self.persistence.get_leader().await {
            Ok(leader) => leader,
            Err(err) => {
                tracing::error!(
                    "Failed to query database for current leader: {err}. Aborting start-up ..."
                );
                std::process::exit(1);
            }
        };

        match leader {
            Some(leader) => {
                tracing::info!("Sending step down request to {leader:?}");

                // TODO: jwt token
                let client = PeerClient::new(
                    Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"),
                    self.config.jwt_token.clone(),
                );
                let state = client.step_down(&self.cancel).await;
                match state {
                    Ok(state) => Some(LeaderStepDownState {
                        observed: state,
                        leader: leader.clone(),
                    }),
                    Err(err) => {
                        // TODO: Make leaders periodically update a timestamp field in the
                        // database and, if the leader is not reachable from the current instance,
                        // but inferred as alive from the timestamp, abort start-up. This avoids
                        // a potential scenario in which we have two controllers acting as leaders.
                        tracing::error!(
                            "Leader ({}) did not respond to step-down request: {}",
                            leader.address,
                            err
                        );
                        None
                    }
                }
            }
            None => {
                tracing::info!(
                    "No leader found to request step down from. Will build observed state."
                );
                None
            }
        }
    }
}