diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs
index 2c077595a1..f6539ad5b0 100644
--- a/control_plane/src/storage_controller.rs
+++ b/control_plane/src/storage_controller.rs
@@ -217,7 +217,7 @@ impl StorageController {
         Ok(exitcode.success())
     }

-    /// Create our database if it doesn't exist, and run migrations.
+    /// Create our database if it doesn't exist
     ///
     /// This function is equivalent to the `diesel setup` command in the diesel CLI. We implement
     /// the same steps by hand to avoid imposing a dependency on installing diesel-cli for developers
@@ -382,7 +382,6 @@ impl StorageController {
             )
             .await?;

-            // Run migrations on every startup, in case something changed.
             self.setup_database(postgres_port).await?;
         }

diff --git a/storage_controller/src/leadership.rs b/storage_controller/src/leadership.rs
new file mode 100644
index 0000000000..a171bab451
--- /dev/null
+++ b/storage_controller/src/leadership.rs
@@ -0,0 +1,136 @@
+use std::sync::Arc;
+
+use hyper::Uri;
+use tokio_util::sync::CancellationToken;
+
+use crate::{
+    peer_client::{GlobalObservedState, PeerClient},
+    persistence::{ControllerPersistence, DatabaseError, DatabaseResult, Persistence},
+    service::Config,
+};
+
+/// Helper for storage controller leadership acquisition
+pub(crate) struct Leadership {
+    persistence: Arc<Persistence>,
+    config: Config,
+    cancel: CancellationToken,
+}
+
+#[derive(thiserror::Error, Debug)]
+pub(crate) enum Error {
+    #[error(transparent)]
+    Database(#[from] DatabaseError),
+}
+
+pub(crate) type Result<T> = std::result::Result<T, Error>;
+
+impl Leadership {
+    pub(crate) fn new(
+        persistence: Arc<Persistence>,
+        config: Config,
+        cancel: CancellationToken,
+    ) -> Self {
+        Self {
+            persistence,
+            config,
+            cancel,
+        }
+    }
+
+    /// Find the current leader in the database and request it to step down if required.
+    /// Should be called early on in the start-up sequence.
+    ///
+    /// Returns a tuple of two optionals: the current leader and its observed state
+    pub(crate) async fn step_down_current_leader(
+        &self,
+    ) -> Result<(Option<ControllerPersistence>, Option<GlobalObservedState>)> {
+        let leader = self.current_leader().await?;
+        let leader_step_down_state = if let Some(ref leader) = leader {
+            if self.config.start_as_candidate {
+                self.request_step_down(leader).await
+            } else {
+                None
+            }
+        } else {
+            tracing::info!("No leader found to request step down from. Will build observed state.");
+            None
+        };
+
+        Ok((leader, leader_step_down_state))
+    }
+
+    /// Mark the current storage controller instance as the leader in the database
+    pub(crate) async fn become_leader(
+        &self,
+        current_leader: Option<ControllerPersistence>,
+    ) -> Result<()> {
+        if let Some(address_for_peers) = &self.config.address_for_peers {
+            // TODO: `address-for-peers` can become a mandatory cli arg
+            // after we update the k8s setup
+            let proposed_leader = ControllerPersistence {
+                address: address_for_peers.to_string(),
+                started_at: chrono::Utc::now(),
+            };
+
+            self.persistence
+                .update_leader(current_leader, proposed_leader)
+                .await
+                .map_err(Error::Database)
+        } else {
+            tracing::info!("No address-for-peers provided. Skipping leader persistence.");
+            Ok(())
+        }
+    }
+
+    async fn current_leader(&self) -> DatabaseResult<Option<ControllerPersistence>> {
+        let res = self.persistence.get_leader().await;
+        if let Err(DatabaseError::Query(diesel::result::Error::DatabaseError(_kind, ref err))) = res
+        {
+            const REL_NOT_FOUND_MSG: &str = "relation \"controllers\" does not exist";
+            if err.message().trim() == REL_NOT_FOUND_MSG {
+                // Special case: if this is a brand new storage controller, migrations will not
+                // have run at this point yet, and, hence, the controllers table does not exist.
+                // Detect this case via the error string (diesel doesn't type it) and allow it.
+                tracing::info!("Detected first storage controller start-up. Allowing missing controllers table ...");
+                return Ok(None);
+            }
+        }
+
+        res
+    }
+
+    /// Request step down from the currently registered leader in the database
+    ///
+    /// If such an entry is persisted, the success path returns the observed
+    /// state and details of the leader. Otherwise, None is returned indicating
+    /// there is no leader currently.
+    async fn request_step_down(
+        &self,
+        leader: &ControllerPersistence,
+    ) -> Option<GlobalObservedState> {
+        tracing::info!("Sending step down request to {leader:?}");
+
+        // TODO: jwt token
+        let client = PeerClient::new(
+            Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"),
+            self.config.jwt_token.clone(),
+        );
+        let state = client.step_down(&self.cancel).await;
+        match state {
+            Ok(state) => Some(state),
+            Err(err) => {
+                // TODO: Make leaders periodically update a timestamp field in the
+                // database and, if the leader is not reachable from the current instance,
+                // but inferred as alive from the timestamp, abort start-up. This avoids
+                // a potential scenario in which we have two controllers acting as leaders.
+                tracing::error!(
+                    "Leader ({}) did not respond to step-down request: {}",
+                    leader.address,
+                    err
+                );
+
+                None
+            }
+        }
+    }
+}
diff --git a/storage_controller/src/lib.rs b/storage_controller/src/lib.rs
index 2034addbe1..60e613bb5c 100644
--- a/storage_controller/src/lib.rs
+++ b/storage_controller/src/lib.rs
@@ -8,6 +8,7 @@ mod drain_utils;
 mod heartbeater;
 pub mod http;
 mod id_lock_map;
+mod leadership;
 pub mod metrics;
 mod node;
 mod pageserver_client;
diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs
index 7387d36690..17685b1140 100644
--- a/storage_controller/src/main.rs
+++ b/storage_controller/src/main.rs
@@ -1,6 +1,5 @@
 use anyhow::{anyhow, Context};
 use clap::Parser;
-use diesel::Connection;
 use hyper::Uri;
 use metrics::launch_timestamp::LaunchTimestamp;
 use metrics::BuildInfo;
@@ -27,9 +26,6 @@ use utils::{project_build_tag, project_git_version, tcp_listener};
 project_git_version!(GIT_VERSION);
 project_build_tag!(BUILD_TAG);

-use diesel_migrations::{embed_migrations, EmbeddedMigrations};
-pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
-
 #[derive(Parser)]
 #[command(author, version, about, long_about = None)]
 #[command(arg_required_else_help(true))]
@@ -181,20 +177,6 @@ impl Secrets {
     }
 }

-/// Execute the diesel migrations that are built into this binary
-async fn migration_run(database_url: &str) -> anyhow::Result<()> {
-    use diesel::PgConnection;
-    use diesel_migrations::{HarnessWithOutput, MigrationHarness};
-    let mut conn = PgConnection::establish(database_url)?;
-
-    HarnessWithOutput::write_to_stdout(&mut conn)
-        .run_pending_migrations(MIGRATIONS)
-        .map(|_| ())
-        .map_err(|e| anyhow::anyhow!(e))?;
-
-    Ok(())
-}
-
 fn main() -> anyhow::Result<()> {
     logging::init(
         LogFormat::Plain,
@@ -304,13 +286,9 @@ async fn async_main() -> anyhow::Result<()> {
         http_service_port: args.listen.port() as i32,
     };

-    // After loading secrets & config, but before starting anything else, apply database migrations
+    // Validate that we can connect to the database
     Persistence::await_connection(&secrets.database_url, args.db_connect_timeout.into()).await?;

-    migration_run(&secrets.database_url)
-        .await
-        .context("Running database migrations")?;
-
     let persistence = Arc::new(Persistence::new(secrets.database_url));

     let service = Service::spawn(config, persistence.clone()).await?;
diff --git a/storage_controller/src/metrics.rs b/storage_controller/src/metrics.rs
index c2303e7a7f..5cfcfb4b1f 100644
--- a/storage_controller/src/metrics.rs
+++ b/storage_controller/src/metrics.rs
@@ -230,6 +230,7 @@ pub(crate) enum DatabaseErrorLabel {
     Connection,
     ConnectionPool,
     Logical,
+    Migration,
 }

 impl DatabaseError {
@@ -239,6 +240,7 @@ impl DatabaseError {
             Self::Connection(_) => DatabaseErrorLabel::Connection,
             Self::ConnectionPool(_) => DatabaseErrorLabel::ConnectionPool,
             Self::Logical(_) => DatabaseErrorLabel::Logical,
+            Self::Migration(_) => DatabaseErrorLabel::Migration,
         }
     }
 }
diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs
index aebbdec0d1..16df19026c 100644
--- a/storage_controller/src/persistence.rs
+++ b/storage_controller/src/persistence.rs
@@ -25,6 +25,9 @@ use crate::metrics::{
 };
 use crate::node::Node;

+use diesel_migrations::{embed_migrations, EmbeddedMigrations};
+const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
+
 /// ## What do we store?
 ///
 /// The storage controller service does not store most of its state durably.
@@ -72,6 +75,8 @@ pub(crate) enum DatabaseError {
     ConnectionPool(#[from] r2d2::Error),
     #[error("Logical error: {0}")]
     Logical(String),
+    #[error("Migration error: {0}")]
+    Migration(String),
 }

 #[derive(measured::FixedCardinalityLabel, Copy, Clone)]
@@ -167,6 +172,19 @@ impl Persistence {
         }
     }

+    /// Execute the diesel migrations that are built into this binary
+    pub(crate) async fn migration_run(&self) -> DatabaseResult<()> {
+        use diesel_migrations::{HarnessWithOutput, MigrationHarness};
+
+        self.with_conn(move |conn| -> DatabaseResult<()> {
+            HarnessWithOutput::write_to_stdout(conn)
+                .run_pending_migrations(MIGRATIONS)
+                .map(|_| ())
+                .map_err(|e| DatabaseError::Migration(e.to_string()))
+        })
+        .await
+    }
+
     /// Wraps `with_conn` in order to collect latency and error metrics
     async fn with_measured_conn<F, R>(&self, op: DatabaseOperation, func: F) -> DatabaseResult<R>
     where
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 3459b44774..780f4a7ee5 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -17,8 +17,9 @@ use crate::{
     compute_hook::NotifyError,
     drain_utils::{self, TenantShardDrain, TenantShardIterator},
     id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
+    leadership::Leadership,
     metrics,
-    peer_client::{GlobalObservedState, PeerClient},
+    peer_client::GlobalObservedState,
     persistence::{
         AbortShardSplitStatus, ControllerPersistence, DatabaseResult, MetadataHealthPersistence,
         TenantFilter,
@@ -333,7 +334,7 @@ impl From<DatabaseError> for ApiError {
             DatabaseError::Connection(_) | DatabaseError::ConnectionPool(_) => {
                 ApiError::ShuttingDown
             }
-            DatabaseError::Logical(reason) => {
+            DatabaseError::Logical(reason) | DatabaseError::Migration(reason) => {
                 ApiError::InternalServerError(anyhow::anyhow!(reason))
             }
         }
@@ -606,22 +607,15 @@ impl Service {

         // Before making any observable changes to the cluster, persist self
         // as leader in database and memory.
-        if let Some(address_for_peers) = &self.config.address_for_peers {
-            // TODO: `address-for-peers` can become a mandatory cli arg
-            // after we update the k8s setup
-            let proposed_leader = ControllerPersistence {
-                address: address_for_peers.to_string(),
-                started_at: chrono::Utc::now(),
-            };
+        let leadership = Leadership::new(
+            self.persistence.clone(),
+            self.config.clone(),
+            self.cancel.child_token(),
+        );

-            if let Err(err) = self
-                .persistence
-                .update_leader(current_leader, proposed_leader)
-                .await
-            {
-                tracing::error!("Failed to persist self as leader: {err}. Aborting start-up ...");
-                std::process::exit(1);
-            }
+        if let Err(e) = leadership.become_leader(current_leader).await {
+            tracing::error!("Failed to persist self as leader: {e}. Aborting start-up ...");
+            std::process::exit(1);
         }

         self.inner.write().unwrap().become_leader();
@@ -1159,6 +1153,16 @@ impl Service {
         let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel();
         let (abort_tx, abort_rx) = tokio::sync::mpsc::unbounded_channel();

+        let leadership_cancel = CancellationToken::new();
+        let leadership = Leadership::new(persistence.clone(), config.clone(), leadership_cancel);
+        let (leader, leader_step_down_state) = leadership.step_down_current_leader().await?;
+
+        // Apply the migrations **after** the current leader has stepped down
+        // (or we've given up waiting for it), but **before** reading from the
+        // database. The only exception is reading the current leader before
+        // migrating.
+        persistence.migration_run().await?;
+
         tracing::info!("Loading nodes from database...");
         let nodes = persistence
             .list_nodes()
@@ -1376,32 +1380,6 @@ impl Service {
                 return;
             };

-            let leadership_status = this.inner.read().unwrap().get_leadership_status();
-            let leader = match this.get_leader().await {
-                Ok(ok) => ok,
-                Err(err) => {
-                    tracing::error!(
-                        "Failed to query database for current leader: {err}. Aborting start-up ..."
-                    );
-                    std::process::exit(1);
-                }
-            };
-
-            let leader_step_down_state = match leadership_status {
-                LeadershipStatus::Candidate => {
-                    if let Some(ref leader) = leader {
-                        this.request_step_down(leader).await
-                    } else {
-                        tracing::info!(
-                            "No leader found to request step down from. Will build observed state."
-                        );
-                        None
-                    }
-                }
-                LeadershipStatus::Leader => None,
-                LeadershipStatus::SteppedDown => unreachable!(),
-            };
-
             this.startup_reconcile(leader, leader_step_down_state, bg_compute_notify_result_tx)
                 .await;

@@ -6377,42 +6355,4 @@ impl Service {

         global_observed
     }
-
-    /// Request step down from the currently registered leader in the database
-    ///
-    /// If such an entry is persisted, the success path returns the observed
-    /// state and details of the leader. Otherwise, None is returned indicating
-    /// there is no leader currently.
-    ///
-    /// On failures to query the database or step down error responses the process is killed
-    /// and we rely on k8s to retry.
-    async fn request_step_down(
-        &self,
-        leader: &ControllerPersistence,
-    ) -> Option<GlobalObservedState> {
-        tracing::info!("Sending step down request to {leader:?}");
-
-        // TODO: jwt token
-        let client = PeerClient::new(
-            Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"),
-            self.config.jwt_token.clone(),
-        );
-        let state = client.step_down(&self.cancel).await;
-        match state {
-            Ok(state) => Some(state),
-            Err(err) => {
-                // TODO: Make leaders periodically update a timestamp field in the
-                // database and, if the leader is not reachable from the current instance,
-                // but inferred as alive from the timestamp, abort start-up. This avoids
-                // a potential scenario in which we have two controllers acting as leaders.
-                tracing::error!(
-                    "Leader ({}) did not respond to step-down request: {}",
-                    leader.address,
-                    err
-                );
-
-                None
-            }
-        }
-    }
 }
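
For reference, the start-up ordering that this patch converges on can be sketched roughly as follows. This is a minimal illustration, not part of the patch: the function name `startup_outline` is hypothetical, it assumes the crate-internal `Leadership`, `Persistence`, and `Config` types shown above, elides the rest of `Service::spawn` (node/shard loading, observed-state construction, reconciliation), and collapses error handling into `anyhow`.

use std::sync::Arc;
use tokio_util::sync::CancellationToken;

use crate::leadership::Leadership;
use crate::persistence::Persistence;
use crate::service::Config;

// Hypothetical outline of the new start-up ordering; not the actual Service::spawn body.
async fn startup_outline(persistence: Arc<Persistence>, config: Config) -> anyhow::Result<()> {
    // 1. Ask the previously persisted leader (if any) to step down and capture
    //    its observed state before touching anything else.
    let leadership = Leadership::new(persistence.clone(), config.clone(), CancellationToken::new());
    let (current_leader, _leader_step_down_state) = leadership.step_down_current_leader().await?;

    // 2. Only now run the embedded diesel migrations: the old leader has stepped
    //    down (or we gave up waiting for it), and nothing except the leader lookup
    //    has read the database yet.
    persistence.migration_run().await?;

    // 3. ... load nodes and shards from the database, build the in-memory and
    //    observed state ...

    // 4. Before making any observable changes to the cluster, persist this
    //    instance as the new leader; startup reconciliation follows.
    leadership.become_leader(current_leader).await?;

    Ok(())
}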