mirror of https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00

storcon: add start-up sequence utilities

Cargo.lock (generated)

@@ -1744,6 +1744,18 @@ dependencies = [
 "const-random",
]

[[package]]
name = "dns-lookup"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc"
dependencies = [
 "cfg-if",
 "libc",
 "socket2 0.5.5",
 "windows-sys 0.48.0",
]

[[package]]
name = "dsl_auto_type"
version = "0.1.1"

@@ -5724,6 +5736,7 @@ dependencies = [
 "control_plane",
 "diesel",
 "diesel_migrations",
 "dns-lookup",
 "fail",
 "futures",
 "git-version",

@@ -53,6 +53,7 @@ diesel = { version = "2.1.4", features = [
] }
diesel_migrations = { version = "2.1.0" }
r2d2 = { version = "0.8.10" }
dns-lookup = { version = "2.0.4" }

utils = { path = "../libs/utils/" }
metrics = { path = "../libs/metrics/" }

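The dns-lookup crate added above is what the service code later in this commit uses to work out the process's own address. As a minimal standalone sketch of those two calls (the resolve_self wrapper name is ours, not part of the commit; the real code aborts the process on error rather than returning io::Result):

use std::net::IpAddr;

// Sketch of the dns-lookup calls this diff relies on.
fn resolve_self() -> std::io::Result<(String, Vec<IpAddr>)> {
    // Hostname as reported by the OS (in k8s, the pod hostname).
    let hostname = dns_lookup::get_hostname()?;
    // Addresses that hostname resolves to; the service picks one of them.
    let addrs = dns_lookup::lookup_host(&hostname)?;
    Ok((hostname, addrs))
}
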
@@ -277,6 +277,7 @@ async fn async_main() -> anyhow::Result<()> {
        split_threshold: args.split_threshold,
        neon_local_repo_dir: args.neon_local_repo_dir,
        start_as_candidate: args.start_as_candidate,
        http_service_port: args.listen.port() as i32,
    };

    // After loading secrets & config, but before starting anything else, apply database migrations

@@ -16,8 +16,10 @@ use crate::{
    compute_hook::NotifyError,
    id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
    metrics::LeadershipStatusGroup,
    peer_client::GlobalObservedState,
    persistence::{AbortShardSplitStatus, MetadataHealthPersistence, TenantFilter},
    peer_client::{GlobalObservedState, PeerClient},
    persistence::{
        AbortShardSplitStatus, LeaderPersistence, MetadataHealthPersistence, TenantFilter,
    },
    reconciler::{ReconcileError, ReconcileUnits},
    scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
    tenant_shard::{

@@ -324,6 +326,8 @@ pub struct Config {
    pub neon_local_repo_dir: Option<PathBuf>,

    pub start_as_candidate: bool,

    pub http_service_port: i32,
}

impl From<DatabaseError> for ApiError {

@@ -491,6 +495,11 @@ pub(crate) enum ReconcileResultRequest {
    Stop,
}

struct LeaderStepDownState {
    observed: GlobalObservedState,
    leader: LeaderPersistence,
}

impl Service {
    pub fn get_config(&self) -> &Config {
        &self.config

@@ -6194,4 +6203,88 @@ impl Service {

        global_observed
    }

    /// Collect the details for the current process wishing to become the storage controller
    /// leader.
    ///
    /// On failures to discover or resolve the hostname, the process is killed and we rely on k8s to retry.
    fn get_proposed_leader_info(&self) -> LeaderPersistence {
        let hostname = match dns_lookup::get_hostname() {
            Ok(name) => name,
            Err(err) => {
                tracing::error!("Failed to discover hostname: {err}. Aborting start-up ...");
                std::process::exit(1);
            }
        };

        let mut addrs = match dns_lookup::lookup_host(&hostname) {
            Ok(addrs) => addrs,
            Err(err) => {
                tracing::error!("Failed to resolve hostname: {err}. Aborting start-up ...");
                std::process::exit(1);
            }
        };

        let addr = addrs
            .pop()
            .expect("k8s configured hostname always resolves");

        let proposed = LeaderPersistence {
            hostname: addr.to_string(),
            port: self.get_config().http_service_port,
            started_at: chrono::Utc::now(),
        };

        tracing::info!("Proposed leader details are: {proposed:?}");

        proposed
    }

    /// Request step down from the currently registered leader in the database.
    ///
    /// If such an entry is persisted, the success path returns the observed
    /// state and details of the leader. Otherwise, None is returned indicating
    /// there is no leader currently.
    ///
    /// On failures to query the database or on step-down error responses, the process is killed
    /// and we rely on k8s to retry.
    async fn request_step_down(&self) -> Option<LeaderStepDownState> {
        let leader = match self.persistence.get_leader().await {
            Ok(leader) => leader,
            Err(err) => {
                tracing::error!(
                    "Failed to query database for current leader: {err}. Aborting start-up ..."
                );
                std::process::exit(1);
            }
        };

        match leader {
            Some(leader) => {
                // TODO: jwt token
                let client = PeerClient::new(
                    leader.hostname.to_owned(),
                    leader.port,
                    self.config.jwt_token.clone(),
                );
                let state = client.step_down(&self.cancel).await;
                match state {
                    Ok(state) => Some(LeaderStepDownState {
                        observed: state,
                        leader: leader.clone(),
                    }),
                    Err(err) => {
                        tracing::error!(
                            "Leader ({}:{}) did not respond to step-down request: {}",
                            leader.hostname,
                            leader.port,
                            err
                        );
                        None
                    }
                }
            }
            None => None,
        }
    }
}
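
The commit adds these utilities but does not show how the start-up sequence chains them. A hedged sketch of one way a helper inside impl Service could combine them (the method name, the ordering, and the steps after the two calls are our assumptions, not part of this diff):

// Sketch only: one possible ordering of the two new helpers at start-up.
async fn startup_sequence_sketch(&self) {
    // Ask whichever leader is currently persisted in the database to step down
    // and hand over its observed state (None if there is no leader row or the
    // leader did not respond).
    let stepped_down: Option<LeaderStepDownState> = self.request_step_down().await;

    // Describe this process as the proposed new leader: resolved hostname plus
    // the configured HTTP service port.
    let proposed: LeaderPersistence = self.get_proposed_leader_info();

    // Not part of this diff: persist `proposed` as the new leader row and seed
    // reconciliation from the stepped-down leader's observed state.
    let _ = (stepped_down, proposed);
}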