Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-04 20:12:54 +00:00
storcon: run db migrations after step down sequence (#8756)
## Problem

Previously, we ran db migrations before doing the step-down sequence. This meant that the current leader had to deal with the schema changes, which is generally not safe.

## Summary of changes

Move the step-down procedure earlier in start-up and run db migrations right after it (but before we load the in-memory state from the db).

Epic: https://github.com/neondatabase/cloud/issues/14701
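The diff below boils down to a reordering of the storage controller start-up path: ask the old leader to step down first, run migrations second, and only then read state from the database. Below is a minimal sketch of that ordering, assuming the `Leadership`, `Persistence`, and `Config` types used in this diff; the function name `startup_order_sketch` is hypothetical and error handling is simplified.

```rust
use std::sync::Arc;
use tokio_util::sync::CancellationToken;

// Sketch only: illustrates the ordering Service::spawn adopts in this change.
async fn startup_order_sketch(persistence: Arc<Persistence>, config: Config) -> anyhow::Result<()> {
    // 1. Ask the previously registered leader (if any) to step down, so it
    //    stops serving requests before the schema changes underneath it.
    let leadership = Leadership::new(persistence.clone(), config.clone(), CancellationToken::new());
    let (_leader, _observed_state) = leadership.step_down_current_leader().await?;

    // 2. Only now apply the embedded diesel migrations.
    persistence.migration_run().await?;

    // 3. Finally, load the in-memory state (nodes, tenant shards, ...) from
    //    the freshly migrated database.
    Ok(())
}
```

The only database read that happens before the migrations is looking up the current leader, which is why `Leadership::current_leader` in the new module tolerates a missing `controllers` table on a brand-new deployment.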
@@ -217,7 +217,7 @@ impl StorageController {
         Ok(exitcode.success())
     }

-    /// Create our database if it doesn't exist, and run migrations.
+    /// Create our database if it doesn't exist
     ///
     /// This function is equivalent to the `diesel setup` command in the diesel CLI. We implement
     /// the same steps by hand to avoid imposing a dependency on installing diesel-cli for developers
@@ -382,7 +382,6 @@ impl StorageController {
        )
        .await?;

-        // Run migrations on every startup, in case something changed.
        self.setup_database(postgres_port).await?;
    }

storage_controller/src/leadership.rs (new file, 136 lines)
@@ -0,0 +1,136 @@
use std::sync::Arc;

use hyper::Uri;
use tokio_util::sync::CancellationToken;

use crate::{
    peer_client::{GlobalObservedState, PeerClient},
    persistence::{ControllerPersistence, DatabaseError, DatabaseResult, Persistence},
    service::Config,
};

/// Helper for storage controller leadership acquisition
pub(crate) struct Leadership {
    persistence: Arc<Persistence>,
    config: Config,
    cancel: CancellationToken,
}

#[derive(thiserror::Error, Debug)]
pub(crate) enum Error {
    #[error(transparent)]
    Database(#[from] DatabaseError),
}

pub(crate) type Result<T> = std::result::Result<T, Error>;

impl Leadership {
    pub(crate) fn new(
        persistence: Arc<Persistence>,
        config: Config,
        cancel: CancellationToken,
    ) -> Self {
        Self {
            persistence,
            config,
            cancel,
        }
    }

    /// Find the current leader in the database and request it to step down if required.
    /// Should be called early on in the start-up sequence.
    ///
    /// Returns a tuple of two optionals: the current leader and its observed state
    pub(crate) async fn step_down_current_leader(
        &self,
    ) -> Result<(Option<ControllerPersistence>, Option<GlobalObservedState>)> {
        let leader = self.current_leader().await?;
        let leader_step_down_state = if let Some(ref leader) = leader {
            if self.config.start_as_candidate {
                self.request_step_down(leader).await
            } else {
                None
            }
        } else {
            tracing::info!("No leader found to request step down from. Will build observed state.");
            None
        };

        Ok((leader, leader_step_down_state))
    }

    /// Mark the current storage controller instance as the leader in the database
    pub(crate) async fn become_leader(
        &self,
        current_leader: Option<ControllerPersistence>,
    ) -> Result<()> {
        if let Some(address_for_peers) = &self.config.address_for_peers {
            // TODO: `address-for-peers` can become a mandatory cli arg
            // after we update the k8s setup
            let proposed_leader = ControllerPersistence {
                address: address_for_peers.to_string(),
                started_at: chrono::Utc::now(),
            };

            self.persistence
                .update_leader(current_leader, proposed_leader)
                .await
                .map_err(Error::Database)
        } else {
            tracing::info!("No address-for-peers provided. Skipping leader persistence.");
            Ok(())
        }
    }

    async fn current_leader(&self) -> DatabaseResult<Option<ControllerPersistence>> {
        let res = self.persistence.get_leader().await;
        if let Err(DatabaseError::Query(diesel::result::Error::DatabaseError(_kind, ref err))) = res
        {
            const REL_NOT_FOUND_MSG: &str = "relation \"controllers\" does not exist";
            if err.message().trim() == REL_NOT_FOUND_MSG {
                // Special case: if this is a brand new storage controller, migrations will not
                // have run at this point yet, and, hence, the controllers table does not exist.
                // Detect this case via the error string (diesel doesn't type it) and allow it.
                tracing::info!("Detected first storage controller start-up. Allowing missing controllers table ...");
                return Ok(None);
            }
        }

        res
    }

    /// Request step down from the currently registered leader in the database
    ///
    /// If such an entry is persisted, the success path returns the observed
    /// state and details of the leader. Otherwise, None is returned indicating
    /// there is no leader currently.
    async fn request_step_down(
        &self,
        leader: &ControllerPersistence,
    ) -> Option<GlobalObservedState> {
        tracing::info!("Sending step down request to {leader:?}");

        // TODO: jwt token
        let client = PeerClient::new(
            Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"),
            self.config.jwt_token.clone(),
        );
        let state = client.step_down(&self.cancel).await;
        match state {
            Ok(state) => Some(state),
            Err(err) => {
                // TODO: Make leaders periodically update a timestamp field in the
                // database and, if the leader is not reachable from the current instance,
                // but inferred as alive from the timestamp, abort start-up. This avoids
                // a potential scenario in which we have two controllers acting as leaders.
                tracing::error!(
                    "Leader ({}) did not respond to step-down request: {}",
                    leader.address,
                    err
                );

                None
            }
        }
    }
}
@@ -8,6 +8,7 @@ mod drain_utils;
 mod heartbeater;
 pub mod http;
 mod id_lock_map;
+mod leadership;
 pub mod metrics;
 mod node;
 mod pageserver_client;

@@ -1,6 +1,5 @@
 use anyhow::{anyhow, Context};
 use clap::Parser;
-use diesel::Connection;
 use hyper::Uri;
 use metrics::launch_timestamp::LaunchTimestamp;
 use metrics::BuildInfo;
@@ -27,9 +26,6 @@ use utils::{project_build_tag, project_git_version, tcp_listener};
 project_git_version!(GIT_VERSION);
 project_build_tag!(BUILD_TAG);

-use diesel_migrations::{embed_migrations, EmbeddedMigrations};
-pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
-
 #[derive(Parser)]
 #[command(author, version, about, long_about = None)]
 #[command(arg_required_else_help(true))]
@@ -181,20 +177,6 @@ impl Secrets {
     }
 }

-/// Execute the diesel migrations that are built into this binary
-async fn migration_run(database_url: &str) -> anyhow::Result<()> {
-    use diesel::PgConnection;
-    use diesel_migrations::{HarnessWithOutput, MigrationHarness};
-    let mut conn = PgConnection::establish(database_url)?;
-
-    HarnessWithOutput::write_to_stdout(&mut conn)
-        .run_pending_migrations(MIGRATIONS)
-        .map(|_| ())
-        .map_err(|e| anyhow::anyhow!(e))?;
-
-    Ok(())
-}
-
 fn main() -> anyhow::Result<()> {
     logging::init(
         LogFormat::Plain,
@@ -304,13 +286,9 @@ async fn async_main() -> anyhow::Result<()> {
         http_service_port: args.listen.port() as i32,
     };

-    // After loading secrets & config, but before starting anything else, apply database migrations
+    // Validate that we can connect to the database
    Persistence::await_connection(&secrets.database_url, args.db_connect_timeout.into()).await?;

-    migration_run(&secrets.database_url)
-        .await
-        .context("Running database migrations")?;
-
    let persistence = Arc::new(Persistence::new(secrets.database_url));

    let service = Service::spawn(config, persistence.clone()).await?;

@@ -230,6 +230,7 @@ pub(crate) enum DatabaseErrorLabel {
     Connection,
     ConnectionPool,
     Logical,
+    Migration,
 }

 impl DatabaseError {
@@ -239,6 +240,7 @@ impl DatabaseError {
             Self::Connection(_) => DatabaseErrorLabel::Connection,
             Self::ConnectionPool(_) => DatabaseErrorLabel::ConnectionPool,
             Self::Logical(_) => DatabaseErrorLabel::Logical,
+            Self::Migration(_) => DatabaseErrorLabel::Migration,
         }
     }
 }

@@ -25,6 +25,9 @@ use crate::metrics::{
 };
 use crate::node::Node;

+use diesel_migrations::{embed_migrations, EmbeddedMigrations};
+const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
+
 /// ## What do we store?
 ///
 /// The storage controller service does not store most of its state durably.
@@ -72,6 +75,8 @@ pub(crate) enum DatabaseError {
     ConnectionPool(#[from] r2d2::Error),
     #[error("Logical error: {0}")]
     Logical(String),
+    #[error("Migration error: {0}")]
+    Migration(String),
 }

 #[derive(measured::FixedCardinalityLabel, Copy, Clone)]
@@ -167,6 +172,19 @@ impl Persistence {
         }
     }

+    /// Execute the diesel migrations that are built into this binary
+    pub(crate) async fn migration_run(&self) -> DatabaseResult<()> {
+        use diesel_migrations::{HarnessWithOutput, MigrationHarness};
+
+        self.with_conn(move |conn| -> DatabaseResult<()> {
+            HarnessWithOutput::write_to_stdout(conn)
+                .run_pending_migrations(MIGRATIONS)
+                .map(|_| ())
+                .map_err(|e| DatabaseError::Migration(e.to_string()))
+        })
+        .await
+    }
+
     /// Wraps `with_conn` in order to collect latency and error metrics
     async fn with_measured_conn<F, R>(&self, op: DatabaseOperation, func: F) -> DatabaseResult<R>
     where

@@ -17,8 +17,9 @@ use crate::{
     compute_hook::NotifyError,
     drain_utils::{self, TenantShardDrain, TenantShardIterator},
     id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
+    leadership::Leadership,
     metrics,
-    peer_client::{GlobalObservedState, PeerClient},
+    peer_client::GlobalObservedState,
     persistence::{
         AbortShardSplitStatus, ControllerPersistence, DatabaseResult, MetadataHealthPersistence,
         TenantFilter,
@@ -333,7 +334,7 @@ impl From<DatabaseError> for ApiError {
             DatabaseError::Connection(_) | DatabaseError::ConnectionPool(_) => {
                 ApiError::ShuttingDown
             }
-            DatabaseError::Logical(reason) => {
+            DatabaseError::Logical(reason) | DatabaseError::Migration(reason) => {
                 ApiError::InternalServerError(anyhow::anyhow!(reason))
             }
         }
@@ -606,22 +607,15 @@ impl Service {

         // Before making any observable changes to the cluster, persist self
         // as leader in database and memory.
-        if let Some(address_for_peers) = &self.config.address_for_peers {
-            // TODO: `address-for-peers` can become a mandatory cli arg
-            // after we update the k8s setup
-            let proposed_leader = ControllerPersistence {
-                address: address_for_peers.to_string(),
-                started_at: chrono::Utc::now(),
-            };
+        let leadership = Leadership::new(
+            self.persistence.clone(),
+            self.config.clone(),
+            self.cancel.child_token(),
+        );

-            if let Err(err) = self
-                .persistence
-                .update_leader(current_leader, proposed_leader)
-                .await
-            {
-                tracing::error!("Failed to persist self as leader: {err}. Aborting start-up ...");
-                std::process::exit(1);
-            }
-        }
+        if let Err(e) = leadership.become_leader(current_leader).await {
+            tracing::error!("Failed to persist self as leader: {e}. Aborting start-up ...");
+            std::process::exit(1);
+        }

         self.inner.write().unwrap().become_leader();
@@ -1159,6 +1153,16 @@ impl Service {
         let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel();
         let (abort_tx, abort_rx) = tokio::sync::mpsc::unbounded_channel();

+        let leadership_cancel = CancellationToken::new();
+        let leadership = Leadership::new(persistence.clone(), config.clone(), leadership_cancel);
+        let (leader, leader_step_down_state) = leadership.step_down_current_leader().await?;
+
+        // Apply the migrations **after** the current leader has stepped down
+        // (or we've given up waiting for it), but **before** reading from the
+        // database. The only exception is reading the current leader before
+        // migrating.
+        persistence.migration_run().await?;
+
         tracing::info!("Loading nodes from database...");
         let nodes = persistence
             .list_nodes()
@@ -1376,32 +1380,6 @@ impl Service {
                 return;
             };

-            let leadership_status = this.inner.read().unwrap().get_leadership_status();
-            let leader = match this.get_leader().await {
-                Ok(ok) => ok,
-                Err(err) => {
-                    tracing::error!(
-                        "Failed to query database for current leader: {err}. Aborting start-up ..."
-                    );
-                    std::process::exit(1);
-                }
-            };
-
-            let leader_step_down_state = match leadership_status {
-                LeadershipStatus::Candidate => {
-                    if let Some(ref leader) = leader {
-                        this.request_step_down(leader).await
-                    } else {
-                        tracing::info!(
-                            "No leader found to request step down from. Will build observed state."
-                        );
-                        None
-                    }
-                }
-                LeadershipStatus::Leader => None,
-                LeadershipStatus::SteppedDown => unreachable!(),
-            };
-
             this.startup_reconcile(leader, leader_step_down_state, bg_compute_notify_result_tx)
                 .await;

@@ -6377,42 +6355,4 @@ impl Service {

         global_observed
     }
-
-    /// Request step down from the currently registered leader in the database
-    ///
-    /// If such an entry is persisted, the success path returns the observed
-    /// state and details of the leader. Otherwise, None is returned indicating
-    /// there is no leader currently.
-    ///
-    /// On failures to query the database or step down error responses the process is killed
-    /// and we rely on k8s to retry.
-    async fn request_step_down(
-        &self,
-        leader: &ControllerPersistence,
-    ) -> Option<GlobalObservedState> {
-        tracing::info!("Sending step down request to {leader:?}");
-
-        // TODO: jwt token
-        let client = PeerClient::new(
-            Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"),
-            self.config.jwt_token.clone(),
-        );
-        let state = client.step_down(&self.cancel).await;
-        match state {
-            Ok(state) => Some(state),
-            Err(err) => {
-                // TODO: Make leaders periodically update a timestamp field in the
-                // database and, if the leader is not reachable from the current instance,
-                // but inferred as alive from the timestamp, abort start-up. This avoids
-                // a potential scenario in which we have two controllers acting as leaders.
-                tracing::error!(
-                    "Leader ({}) did not respond to step-down request: {}",
-                    leader.address,
-                    err
-                );
-
-                None
-            }
-        }
-    }
 }