mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
Compare commits
8 Commits
release-pr
...
vlad/storc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
71ff8f2433 | ||
|
|
56c43c4fae | ||
|
|
4187657082 | ||
|
|
b690ba5838 | ||
|
|
dd7cafdd97 | ||
|
|
c501a10612 | ||
|
|
1fdbef9a44 | ||
|
|
3ad1221e55 |
13
Cargo.lock
generated
13
Cargo.lock
generated
@@ -1744,6 +1744,18 @@ dependencies = [
|
||||
"const-random",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dns-lookup"
|
||||
version = "2.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"socket2 0.5.5",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dsl_auto_type"
|
||||
version = "0.1.1"
|
||||
@@ -5724,6 +5736,7 @@ dependencies = [
|
||||
"control_plane",
|
||||
"diesel",
|
||||
"diesel_migrations",
|
||||
"dns-lookup",
|
||||
"fail",
|
||||
"futures",
|
||||
"git-version",
|
||||
|
||||
@@ -53,6 +53,7 @@ diesel = { version = "2.1.4", features = [
|
||||
] }
|
||||
diesel_migrations = { version = "2.1.0" }
|
||||
r2d2 = { version = "0.8.10" }
|
||||
dns-lookup = { version = "2.0.4" }
|
||||
|
||||
utils = { path = "../libs/utils/" }
|
||||
metrics = { path = "../libs/metrics/" }
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
DROP TABLE leader;
|
||||
@@ -0,0 +1,6 @@
|
||||
CREATE TABLE leader (
|
||||
hostname VARCHAR NOT NULL,
|
||||
port INTEGER NOT NULL,
|
||||
started_at TIMESTAMPTZ NOT NULL,
|
||||
PRIMARY KEY(hostname, port, started_at)
|
||||
);
|
||||
@@ -10,6 +10,7 @@ mod id_lock_map;
|
||||
pub mod metrics;
|
||||
mod node;
|
||||
mod pageserver_client;
|
||||
mod peer_client;
|
||||
pub mod persistence;
|
||||
mod reconciler;
|
||||
mod scheduler;
|
||||
|
||||
@@ -81,6 +81,9 @@ struct Cli {
|
||||
#[arg(long, default_value = "5s")]
|
||||
db_connect_timeout: humantime::Duration,
|
||||
|
||||
#[arg(long, default_value = "false")]
|
||||
start_as_candidate: bool,
|
||||
|
||||
/// `neon_local` sets this to the path of the neon_local repo dir.
|
||||
/// Only relevant for testing.
|
||||
// TODO: make `cfg(feature = "testing")`
|
||||
@@ -273,6 +276,8 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
.unwrap_or(RECONCILER_CONCURRENCY_DEFAULT),
|
||||
split_threshold: args.split_threshold,
|
||||
neon_local_repo_dir: args.neon_local_repo_dir,
|
||||
start_as_candidate: args.start_as_candidate,
|
||||
http_service_port: args.listen.port() as i32,
|
||||
};
|
||||
|
||||
// After loading secrets & config, but before starting anything else, apply database migrations
|
||||
|
||||
104
storage_controller/src/peer_client.rs
Normal file
104
storage_controller/src/peer_client.rs
Normal file
@@ -0,0 +1,104 @@
|
||||
use crate::tenant_shard::ObservedState;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use reqwest::{StatusCode, Url};
|
||||
use utils::{backoff, http::error::HttpErrorBody};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct PeerClient {
|
||||
hostname: String,
|
||||
port: i32,
|
||||
jwt: Option<String>,
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum StorageControllerPeerError {
|
||||
#[error("failed to deserialize error response with status code {0} at {1}: {2}")]
|
||||
DeserializationError(StatusCode, Url, reqwest::Error),
|
||||
#[error("storage controller peer API error ({0}): {1}")]
|
||||
ApiError(StatusCode, String),
|
||||
#[error("failed to send HTTP request: {0}")]
|
||||
SendError(reqwest::Error),
|
||||
#[error("Cancelled")]
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
pub(crate) type Result<T> = std::result::Result<T, StorageControllerPeerError>;
|
||||
|
||||
pub(crate) trait ResponseErrorMessageExt: Sized {
|
||||
fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
|
||||
}
|
||||
|
||||
impl ResponseErrorMessageExt for reqwest::Response {
|
||||
async fn error_from_body(self) -> Result<Self> {
|
||||
let status = self.status();
|
||||
if !(status.is_client_error() || status.is_server_error()) {
|
||||
return Ok(self);
|
||||
}
|
||||
|
||||
let url = self.url().to_owned();
|
||||
Err(match self.json::<HttpErrorBody>().await {
|
||||
Ok(HttpErrorBody { msg }) => StorageControllerPeerError::ApiError(status, msg),
|
||||
Err(err) => StorageControllerPeerError::DeserializationError(status, url, err),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||
pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);
|
||||
|
||||
impl PeerClient {
|
||||
pub(crate) fn new(hostname: String, port: i32, jwt: Option<String>) -> Self {
|
||||
Self {
|
||||
hostname,
|
||||
port,
|
||||
jwt,
|
||||
client: reqwest::Client::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn request_step_down(&self) -> Result<GlobalObservedState> {
|
||||
let uri = format!("{}:{}/control/v1/step_down", self.hostname, self.port);
|
||||
let req = self.client.put(uri);
|
||||
let req = if let Some(jwt) = &self.jwt {
|
||||
req.header(reqwest::header::AUTHORIZATION, format!("Bearer {jwt}"))
|
||||
} else {
|
||||
req
|
||||
};
|
||||
|
||||
let res = req
|
||||
.send()
|
||||
.await
|
||||
.map_err(StorageControllerPeerError::SendError)?;
|
||||
let response = res.error_from_body().await?;
|
||||
|
||||
let status = response.status();
|
||||
let url = response.url().to_owned();
|
||||
|
||||
response
|
||||
.json()
|
||||
.await
|
||||
.map_err(|err| StorageControllerPeerError::DeserializationError(status, url, err))
|
||||
}
|
||||
|
||||
pub(crate) async fn step_down(
|
||||
&self,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<GlobalObservedState> {
|
||||
backoff::retry(
|
||||
|| self.request_step_down(),
|
||||
|_e| false,
|
||||
4,
|
||||
8,
|
||||
"Send step down request",
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
.ok_or_else(|| StorageControllerPeerError::Cancelled)
|
||||
.and_then(|x| x)
|
||||
}
|
||||
}
|
||||
@@ -95,6 +95,8 @@ pub(crate) enum DatabaseOperation {
|
||||
ListMetadataHealth,
|
||||
ListMetadataHealthUnhealthy,
|
||||
ListMetadataHealthOutdated,
|
||||
GetLeader,
|
||||
UpdateLeader,
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
@@ -785,6 +787,71 @@ impl Persistence {
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Get the current entry from the `leader` table if one exists.
|
||||
/// It is an error for the table to contain more than one entry.
|
||||
pub(crate) async fn get_leader(&self) -> DatabaseResult<Option<LeaderPersistence>> {
|
||||
let mut leader: Vec<LeaderPersistence> = self
|
||||
.with_measured_conn(
|
||||
DatabaseOperation::GetLeader,
|
||||
move |conn| -> DatabaseResult<_> {
|
||||
Ok(crate::schema::leader::table.load::<LeaderPersistence>(conn)?)
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
if leader.len() > 1 {
|
||||
return Err(DatabaseError::Logical(format!(
|
||||
"More than one entry present in the leader table: {leader:?}"
|
||||
)));
|
||||
}
|
||||
|
||||
Ok(leader.pop())
|
||||
}
|
||||
|
||||
/// Update the new leader with compare-exchange semantics. If `prev` does not
|
||||
/// match the current leader entry, then the update is treated as a failure.
|
||||
/// When `prev` is not specified, the update is forced.
|
||||
pub(crate) async fn update_leader(
|
||||
&self,
|
||||
prev: Option<LeaderPersistence>,
|
||||
new: LeaderPersistence,
|
||||
) -> DatabaseResult<()> {
|
||||
use crate::schema::leader::dsl::*;
|
||||
|
||||
let updated = self
|
||||
.with_measured_conn(
|
||||
DatabaseOperation::UpdateLeader,
|
||||
move |conn| -> DatabaseResult<usize> {
|
||||
let updated = match &prev {
|
||||
Some(prev) => diesel::update(leader)
|
||||
.filter(hostname.eq(prev.hostname.clone()))
|
||||
.filter(port.eq(prev.port))
|
||||
.filter(started_at.eq(prev.started_at))
|
||||
.set((
|
||||
hostname.eq(new.hostname.clone()),
|
||||
port.eq(new.port),
|
||||
started_at.eq(new.started_at),
|
||||
))
|
||||
.execute(conn)?,
|
||||
None => diesel::insert_into(leader)
|
||||
.values(new.clone())
|
||||
.execute(conn)?,
|
||||
};
|
||||
|
||||
Ok(updated)
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
if updated == 0 {
|
||||
return Err(DatabaseError::Logical(
|
||||
"Leader table update failed".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
|
||||
@@ -910,3 +977,13 @@ impl From<MetadataHealthPersistence> for MetadataHealthRecord {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq, Debug, Clone,
|
||||
)]
|
||||
#[diesel(table_name = crate::schema::leader)]
|
||||
pub(crate) struct LeaderPersistence {
|
||||
pub(crate) hostname: String,
|
||||
pub(crate) port: i32,
|
||||
pub(crate) started_at: chrono::DateTime<chrono::Utc>,
|
||||
}
|
||||
|
||||
@@ -1,5 +1,13 @@
|
||||
// @generated automatically by Diesel CLI.
|
||||
|
||||
diesel::table! {
|
||||
leader (hostname, port, started_at) {
|
||||
hostname -> Varchar,
|
||||
port -> Int4,
|
||||
started_at -> Timestamptz,
|
||||
}
|
||||
}
|
||||
|
||||
diesel::table! {
|
||||
metadata_health (tenant_id, shard_number, shard_count) {
|
||||
tenant_id -> Varchar,
|
||||
@@ -36,4 +44,4 @@ diesel::table! {
|
||||
}
|
||||
}
|
||||
|
||||
diesel::allow_tables_to_appear_in_same_query!(metadata_health, nodes, tenant_shards,);
|
||||
diesel::allow_tables_to_appear_in_same_query!(leader, metadata_health, nodes, tenant_shards,);
|
||||
|
||||
@@ -16,7 +16,10 @@ use crate::{
|
||||
compute_hook::NotifyError,
|
||||
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
|
||||
metrics::LeadershipStatusGroup,
|
||||
persistence::{AbortShardSplitStatus, MetadataHealthPersistence, TenantFilter},
|
||||
peer_client::{GlobalObservedState, PeerClient},
|
||||
persistence::{
|
||||
AbortShardSplitStatus, LeaderPersistence, MetadataHealthPersistence, TenantFilter,
|
||||
},
|
||||
reconciler::{ReconcileError, ReconcileUnits},
|
||||
scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
|
||||
tenant_shard::{
|
||||
@@ -82,7 +85,6 @@ use crate::{
|
||||
ReconcilerWaiter, TenantShard,
|
||||
},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
// For operations that should be quick, like attaching a new tenant
|
||||
const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
@@ -223,6 +225,7 @@ impl ServiceState {
|
||||
tenants: BTreeMap<TenantShardId, TenantShard>,
|
||||
scheduler: Scheduler,
|
||||
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
|
||||
initial_leadership_status: LeadershipStatus,
|
||||
) -> Self {
|
||||
let status = &crate::metrics::METRICS_REGISTRY
|
||||
.metrics_group
|
||||
@@ -230,15 +233,13 @@ impl ServiceState {
|
||||
|
||||
status.set(
|
||||
LeadershipStatusGroup {
|
||||
status: LeadershipStatus::Leader,
|
||||
status: initial_leadership_status,
|
||||
},
|
||||
1,
|
||||
);
|
||||
|
||||
Self {
|
||||
// TODO: Starting up as Leader is a transient state. Once we enable rolling
|
||||
// upgrades on the k8s side, we should start up as Candidate.
|
||||
leadership_status: LeadershipStatus::Leader,
|
||||
leadership_status: initial_leadership_status,
|
||||
tenants,
|
||||
nodes: Arc::new(nodes),
|
||||
scheduler,
|
||||
@@ -287,6 +288,33 @@ impl ServiceState {
|
||||
0,
|
||||
);
|
||||
}
|
||||
|
||||
fn become_leader(&mut self) {
|
||||
self.leadership_status = LeadershipStatus::Leader;
|
||||
|
||||
let status = &crate::metrics::METRICS_REGISTRY
|
||||
.metrics_group
|
||||
.storage_controller_leadership_status;
|
||||
|
||||
status.set(
|
||||
LeadershipStatusGroup {
|
||||
status: LeadershipStatus::Leader,
|
||||
},
|
||||
1,
|
||||
);
|
||||
status.set(
|
||||
LeadershipStatusGroup {
|
||||
status: LeadershipStatus::SteppedDown,
|
||||
},
|
||||
0,
|
||||
);
|
||||
status.set(
|
||||
LeadershipStatusGroup {
|
||||
status: LeadershipStatus::Candidate,
|
||||
},
|
||||
0,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -323,6 +351,10 @@ pub struct Config {
|
||||
|
||||
// TODO: make this cfg(feature = "testing")
|
||||
pub neon_local_repo_dir: Option<PathBuf>,
|
||||
|
||||
pub start_as_candidate: bool,
|
||||
|
||||
pub http_service_port: i32,
|
||||
}
|
||||
|
||||
impl From<DatabaseError> for ApiError {
|
||||
@@ -490,9 +522,10 @@ pub(crate) enum ReconcileResultRequest {
|
||||
Stop,
|
||||
}
|
||||
|
||||
// TODO: move this into the storcon peer client when that gets added
|
||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||
pub(crate) struct GlobalObservedState(HashMap<TenantShardId, ObservedState>);
|
||||
struct LeaderStepDownState {
|
||||
observed: GlobalObservedState,
|
||||
leader: LeaderPersistence,
|
||||
}
|
||||
|
||||
impl Service {
|
||||
pub fn get_config(&self) -> &Config {
|
||||
@@ -504,15 +537,11 @@ impl Service {
|
||||
#[instrument(skip_all)]
|
||||
async fn startup_reconcile(
|
||||
self: &Arc<Service>,
|
||||
leader_step_down_state: Option<LeaderStepDownState>,
|
||||
bg_compute_notify_result_tx: tokio::sync::mpsc::Sender<
|
||||
Result<(), (TenantShardId, NotifyError)>,
|
||||
>,
|
||||
) {
|
||||
// For all tenant shards, a vector of observed states on nodes (where None means
|
||||
// indeterminate, same as in [`ObservedStateLocation`])
|
||||
let mut observed: HashMap<TenantShardId, Vec<(NodeId, Option<LocationConfig>)>> =
|
||||
HashMap::new();
|
||||
|
||||
// Startup reconciliation does I/O to other services: whether they
|
||||
// are responsive or not, we should aim to finish within our deadline, because:
|
||||
// - If we don't, a k8s readiness hook watching /ready will kill us.
|
||||
@@ -526,26 +555,29 @@ impl Service {
|
||||
.checked_add(STARTUP_RECONCILE_TIMEOUT / 2)
|
||||
.expect("Reconcile timeout is a modest constant");
|
||||
|
||||
let (observed, current_leader) = if let Some(state) = leader_step_down_state {
|
||||
tracing::info!(
|
||||
"Using observed received from leader at {}:{}",
|
||||
state.leader.hostname,
|
||||
state.leader.port
|
||||
);
|
||||
(state.observed, Some(state.leader))
|
||||
} else {
|
||||
(
|
||||
self.build_global_observed_state(node_scan_deadline).await,
|
||||
None,
|
||||
)
|
||||
};
|
||||
|
||||
// Accumulate a list of any tenant locations that ought to be detached
|
||||
let mut cleanup = Vec::new();
|
||||
|
||||
let node_listings = self.scan_node_locations(node_scan_deadline).await;
|
||||
// Send initial heartbeat requests to nodes that replied to the location listing above.
|
||||
let nodes_online = self.initial_heartbeat_round(node_listings.keys()).await;
|
||||
|
||||
for (node_id, list_response) in node_listings {
|
||||
let tenant_shards = list_response.tenant_shards;
|
||||
tracing::info!(
|
||||
"Received {} shard statuses from pageserver {}, setting it to Active",
|
||||
tenant_shards.len(),
|
||||
node_id
|
||||
);
|
||||
|
||||
for (tenant_shard_id, conf_opt) in tenant_shards {
|
||||
let shard_observations = observed.entry(tenant_shard_id).or_default();
|
||||
shard_observations.push((node_id, conf_opt));
|
||||
}
|
||||
}
|
||||
// Send initial heartbeat requests to all nodes loaded from the database
|
||||
let all_nodes = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
locked.nodes.clone()
|
||||
};
|
||||
let nodes_online = self.initial_heartbeat_round(all_nodes.keys()).await;
|
||||
|
||||
// List of tenants for which we will attempt to notify compute of their location at startup
|
||||
let mut compute_notifications = Vec::new();
|
||||
@@ -568,17 +600,16 @@ impl Service {
|
||||
}
|
||||
*nodes = Arc::new(new_nodes);
|
||||
|
||||
for (tenant_shard_id, shard_observations) in observed {
|
||||
for (node_id, observed_loc) in shard_observations {
|
||||
let Some(tenant_shard) = tenants.get_mut(&tenant_shard_id) else {
|
||||
cleanup.push((tenant_shard_id, node_id));
|
||||
continue;
|
||||
};
|
||||
tenant_shard
|
||||
.observed
|
||||
.locations
|
||||
.insert(node_id, ObservedStateLocation { conf: observed_loc });
|
||||
}
|
||||
for (tenant_shard_id, observed_state) in observed.0 {
|
||||
let Some(tenant_shard) = tenants.get_mut(&tenant_shard_id) else {
|
||||
for node_id in observed_state.locations.keys() {
|
||||
cleanup.push((tenant_shard_id, *node_id));
|
||||
}
|
||||
|
||||
continue;
|
||||
};
|
||||
|
||||
tenant_shard.observed = observed_state;
|
||||
}
|
||||
|
||||
// Populate each tenant's intent state
|
||||
@@ -612,6 +643,22 @@ impl Service {
|
||||
tenants.len()
|
||||
};
|
||||
|
||||
// Before making any obeservable changes to the cluster, persist self
|
||||
// as leader in database and memory.
|
||||
|
||||
let proposed_leader = self.get_proposed_leader_info();
|
||||
|
||||
if let Err(err) = self
|
||||
.persistence
|
||||
.update_leader(current_leader, proposed_leader)
|
||||
.await
|
||||
{
|
||||
tracing::error!("Failed to persist self as leader: {err}. Aborting start-up ...");
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
self.inner.write().unwrap().become_leader();
|
||||
|
||||
// TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that
|
||||
// generation_pageserver in the database.
|
||||
|
||||
@@ -777,6 +824,31 @@ impl Service {
|
||||
node_results
|
||||
}
|
||||
|
||||
async fn build_global_observed_state(&self, deadline: Instant) -> GlobalObservedState {
|
||||
let node_listings = self.scan_node_locations(deadline).await;
|
||||
let mut observed = GlobalObservedState::default();
|
||||
|
||||
for (node_id, location_confs) in node_listings {
|
||||
tracing::info!(
|
||||
"Received {} shard statuses from pageserver {}",
|
||||
location_confs.tenant_shards.len(),
|
||||
node_id
|
||||
);
|
||||
|
||||
for (tid, location_conf) in location_confs.tenant_shards {
|
||||
let entry = observed.0.entry(tid).or_default();
|
||||
entry.locations.insert(
|
||||
node_id,
|
||||
ObservedStateLocation {
|
||||
conf: location_conf,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
observed
|
||||
}
|
||||
|
||||
/// Used during [`Self::startup_reconcile`]: detach a list of unknown-to-us tenants from pageservers.
|
||||
///
|
||||
/// This is safe to run in the background, because if we don't have this TenantShardId in our map of
|
||||
@@ -1255,12 +1327,20 @@ impl Service {
|
||||
config.max_warming_up_interval,
|
||||
cancel.clone(),
|
||||
);
|
||||
|
||||
let initial_leadership_status = if config.start_as_candidate {
|
||||
LeadershipStatus::Candidate
|
||||
} else {
|
||||
LeadershipStatus::Leader
|
||||
};
|
||||
|
||||
let this = Arc::new(Self {
|
||||
inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
|
||||
nodes,
|
||||
tenants,
|
||||
scheduler,
|
||||
delayed_reconcile_rx,
|
||||
initial_leadership_status,
|
||||
))),
|
||||
config: config.clone(),
|
||||
persistence,
|
||||
@@ -1329,7 +1409,16 @@ impl Service {
|
||||
return;
|
||||
};
|
||||
|
||||
this.startup_reconcile(bg_compute_notify_result_tx).await;
|
||||
let leadership_status = this.inner.read().unwrap().get_leadership_status();
|
||||
let peer_observed_state = match leadership_status {
|
||||
LeadershipStatus::Candidate => this.request_step_down().await,
|
||||
LeadershipStatus::Leader => None,
|
||||
LeadershipStatus::SteppedDown => unreachable!(),
|
||||
};
|
||||
|
||||
this.startup_reconcile(peer_observed_state, bg_compute_notify_result_tx)
|
||||
.await;
|
||||
|
||||
drop(startup_completion);
|
||||
}
|
||||
});
|
||||
@@ -6179,4 +6268,88 @@ impl Service {
|
||||
|
||||
global_observed
|
||||
}
|
||||
|
||||
/// Collect the details for the current proccess wishing to become the storage controller
|
||||
/// leader.
|
||||
///
|
||||
/// On failures to discover and resolve the hostname the process is killed and we rely on k8s to retry.
|
||||
fn get_proposed_leader_info(&self) -> LeaderPersistence {
|
||||
let hostname = match dns_lookup::get_hostname() {
|
||||
Ok(name) => name,
|
||||
Err(err) => {
|
||||
tracing::error!("Failed to discover hostname: {err}. Aborting start-up ...");
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
let mut addrs = match dns_lookup::lookup_host(&hostname) {
|
||||
Ok(addrs) => addrs,
|
||||
Err(err) => {
|
||||
tracing::error!("Failed to resolve hostname: {err}. Aborting start-up ...");
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
let addr = addrs
|
||||
.pop()
|
||||
.expect("k8s configured hostname always resolves");
|
||||
|
||||
let proposed = LeaderPersistence {
|
||||
hostname: addr.to_string(),
|
||||
port: self.get_config().http_service_port,
|
||||
started_at: chrono::Utc::now(),
|
||||
};
|
||||
|
||||
tracing::info!("Proposed leader details are: {proposed:?}");
|
||||
|
||||
proposed
|
||||
}
|
||||
|
||||
/// Request step down from the currently registered leader in the database
|
||||
///
|
||||
/// If such an entry is persisted, the success path returns the observed
|
||||
/// state and details of the leader. Otherwise, None is returned indicating
|
||||
/// there is no leader currently.
|
||||
///
|
||||
/// On failures to query the database or step down error responses the process is killed
|
||||
/// and we rely on k8s to retry.
|
||||
async fn request_step_down(&self) -> Option<LeaderStepDownState> {
|
||||
let leader = match self.persistence.get_leader().await {
|
||||
Ok(leader) => leader,
|
||||
Err(err) => {
|
||||
tracing::error!(
|
||||
"Failed to query database for current leader: {err}. Aborting start-up ..."
|
||||
);
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
match leader {
|
||||
Some(leader) => {
|
||||
// TODO: jwt token
|
||||
let client = PeerClient::new(
|
||||
leader.hostname.to_owned(),
|
||||
leader.port,
|
||||
self.config.jwt_token.clone(),
|
||||
);
|
||||
let state = client.step_down(&self.cancel).await;
|
||||
match state {
|
||||
Ok(state) => Some(LeaderStepDownState {
|
||||
observed: state,
|
||||
leader: leader.clone(),
|
||||
}),
|
||||
Err(err) => {
|
||||
tracing::error!(
|
||||
"Leader ({}:{}) did not respond to step-down request: {}",
|
||||
leader.hostname,
|
||||
leader.port,
|
||||
err
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user