From a9c38ad137c62dde189b0d6e148796d48b9db3f2 Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Mon, 15 Jul 2024 16:00:32 -0500 Subject: [PATCH] Teach MigrationRunner about per-database migrations Up to this point, all of our migrations have run on the catalog, which is shared across Postgres databases, so we have tracked migrations in the "postgres" database. With the release of Postgres versions 14.12, 15.7, and 16.3, a CVE was disclosed for all clusters created prior to these latest point releases. The fix for the CVE is a SQL script that must run in every database in a cluster, including template0 and template1. This presents a little bit of a problem with the way we run migrations. We have a neon_migration.migration_id table which has one row that marks the last migration that was run. That table is stored in the postgres database. Running this migration isn't transactional. A typical migration is of the form: BEGIN -- Run migration COMMIT But transactions are not cluster-wide. _A_ solution to this is to run the fix on every database that isn't the "postgres" database, and then after all of those transactions are successful, "commit" that we've run the migration into the neon_migration.migration_id table of the "postgres" database. In addition, we have to pay attention to the connectability and validity of the databases when running per-database migrations. We can skip invalid databases (pg_database.datconnlimit = -2), but for databases that disallow connections we need to temporarily set ALLOW_CONNECTIONS to true, and then reset it back afterwards. This is preparatory work for the next commit. 
Link: https://www.postgresql.org/support/security/CVE-2024-4317/ Signed-off-by: Tristan Partin --- compute_tools/src/compute.rs | 21 +- compute_tools/src/migration.rs | 355 +++++++++++++++--- compute_tools/src/spec.rs | 63 ++-- .../regress/test_compute_migrations.py | 1 + 4 files changed, 346 insertions(+), 94 deletions(-) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 8834f0d63d..c6e4553a29 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -1500,24 +1500,9 @@ impl ComputeNode { let mut conf = conf.as_ref().clone(); conf.application_name("compute_ctl:migrations"); - match conf.connect(NoTls).await { - Ok((mut client, connection)) => { - tokio::spawn(async move { - if let Err(e) = connection.await { - eprintln!("connection error: {}", e); - } - }); - if let Err(e) = handle_migrations(&mut client).await { - error!("Failed to run migrations: {}", e); - } - } - Err(e) => { - error!( - "Failed to connect to the compute for running migrations: {}", - e - ); - } - }; + if let Err(e) = handle_migrations(conf).await { + error!("Failed to run migrations: {}", e); + } }); Ok::<(), anyhow::Error>(()) diff --git a/compute_tools/src/migration.rs b/compute_tools/src/migration.rs index c5e05822c0..28436eb8b9 100644 --- a/compute_tools/src/migration.rs +++ b/compute_tools/src/migration.rs @@ -1,29 +1,61 @@ use anyhow::{Context, Result}; use fail::fail_point; -use tokio_postgres::{Client, Transaction}; -use tracing::{error, info}; +use postgres::NoTls; +use tokio_postgres::{Client, Config, Transaction}; +use tracing::{error, info, warn}; use crate::metrics::DB_MIGRATION_FAILED; /// Runs a series of migrations on a target database +use compute_api::spec::{Database, PgIdent}; + +use crate::pg_helpers::{Escaping, get_existing_dbs_async}; + +pub(crate) enum Migration<'m> { + /// Cluster migrations are things like catalog updates, where they can be + /// run in the default Postgres database, but affect every database in the + 
/// cluster. + Cluster(&'m str), + + /// Per-database migrations will be run in every database of the cluster. + /// The migration will not be marked as completed until after it has been + /// run in every database. We will save the `postgres` database for last so + /// that we can commit the transaction as applied in the + /// neon_migration.migration_id table. + /// + /// Please be aware of the race condition that exists for this type of + /// migration. At the beginning of running the series of migrations, we get + /// the current list of databases. However, we run migrations in a separate + /// thread in order to not block connections to the compute. If after the + /// time we have gotten the list of databases in the cluster, a user creates + /// a new database, that database will not receive the migration, but we + /// will have marked the migration as completed successfully, assuming all + /// previous databases ran the migration to completion. + #[expect(dead_code)] + PerDatabase(&'m str), +} + pub(crate) struct MigrationRunner<'m> { - client: &'m mut Client, - migrations: &'m [&'m str], + /// Postgres client configuration. + config: Config, + + /// List of migrations to run. 
+ migrations: &'m [Migration<'m>], } impl<'m> MigrationRunner<'m> { /// Create a new migration runner - pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self { - // The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64 - assert!(migrations.len() + 1 < i64::MAX as usize); + pub fn new(config: Config, migrations: &'m [Migration<'m>]) -> Result { + // The neon_migration.migration_id::id column is a bigint, which is + // equivalent to an i64 + debug_assert!(migrations.len() + 1 < i64::MAX as usize); - Self { client, migrations } + Ok(Self { config, migrations }) } /// Get the current value neon_migration.migration_id - async fn get_migration_id(&mut self) -> Result { - let row = self - .client + async fn get_migration_id(client: &mut Client) -> Result { + let row = client .query_one("SELECT id FROM neon_migration.migration_id", &[]) .await?; @@ -36,9 +68,8 @@ impl<'m> MigrationRunner<'m> { /// used if you would like to fail the application of a series of migrations /// at some point. 
async fn update_migration_id(txn: &mut Transaction<'_>, migration_id: i64) -> Result<()> { - // We use this fail point in order to check that failing in the - // middle of applying a series of migrations fails in an expected - // manner + // We use this fail point in order to check that failing in the middle + // of applying a series of migrations fails in an expected manner if cfg!(feature = "testing") { let fail = (|| { fail_point!("compute-migration", |fail_migration_id| { @@ -67,48 +98,95 @@ impl<'m> MigrationRunner<'m> { } /// Prepare the migrations the target database for handling migrations - async fn prepare_database(&mut self) -> Result<()> { - self.client + async fn prepare_database(client: &mut Client) -> Result<()> { + client .simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration") .await?; - self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)").await?; - self.client + client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)").await?; + client .simple_query( "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING", ) .await?; - self.client + client .simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin") .await?; - self.client + client .simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC") .await?; Ok(()) } - /// Run an individual migration in a separate transaction block. - async fn run_migration(client: &mut Client, migration_id: i64, migration: &str) -> Result<()> { + /// Helper function for allowing/disallowing connections to a Postgres + /// database. 
+ async fn allow_connections_to_db( + client: &mut Client, + dbname: &PgIdent, + allow: bool, + ) -> Result<()> { + client + .simple_query( + format!( + "ALTER DATABASE {} WITH ALLOW_CONNECTIONS {}", + dbname.pg_quote(), + allow + ) + .as_str(), + ) + .await?; + + Ok(()) + } + + /// Connect to the configured Postgres database. Spawns a tokio task to + /// handle the connection. + async fn connect(config: &Config) -> Result { + let (client, connection) = config.connect(NoTls).await?; + + tokio::spawn(async move { + if let Err(e) = connection.await { + error!("connection error: {}", e); + } + }); + + Ok(client) + } + + async fn run_migration( + client: &mut Client, + db: &str, + migration_id: i64, + migration: &str, + update_migration_id: bool, + ) -> Result<()> { let mut txn = client .transaction() .await .with_context(|| format!("begin transaction for migration {migration_id}"))?; if migration.starts_with("-- SKIP") { - info!("Skipping migration id={}", migration_id); - - // Even though we are skipping the migration, updating the - // migration ID should help keep logic easy to understand when - // trying to understand the state of a cluster. 
- Self::update_migration_id(&mut txn, migration_id).await?; + info!("Skipping migration id={} db=\"{}\"", migration_id, db); } else { - info!("Running migration id={}:\n{}\n", migration_id, migration); + info!( + "Running migration id={} db=\"{}\":\n{}\n", + migration_id, db, migration + ); - txn.simple_query(migration) - .await - .with_context(|| format!("apply migration {migration_id}"))?; + if let Err(e) = txn.simple_query(migration).await { + error!("Failed to run the migration: {}", e); + return Err(anyhow::anyhow!(e)); + } + } - Self::update_migration_id(&mut txn, migration_id).await?; + if update_migration_id { + if let Err(e) = Self::update_migration_id(&mut txn, migration_id).await { + error!( + "Failed to update the migration id to {}: {}", + migration_id, e + ); + return Err(e); + } } txn.commit() @@ -118,31 +196,200 @@ impl<'m> MigrationRunner<'m> { Ok(()) } - /// Run the configured set of migrations - pub async fn run_migrations(mut self) -> Result<()> { - self.prepare_database() + /// Run the migration for the entire cluster. See [`Migration::Cluster`] for + /// more information. + async fn run_cluster_migration( + client: &mut Client, + db: &str, + migration_id: i64, + migration: &str, + ) -> Result<()> { + Self::run_migration(client, db, migration_id, migration, true).await + } + + /// Run the migration in the specified database. See + /// [`Migration::PerDatabase`] for more information. + async fn run_database_migration( + cluster_client: &mut Client, + config: Config, + db: &Database, + migration_id: i64, + migration: &str, + ) -> Result<()> { + // There are 2 race conditions here. Migrations get ran in a separate + // thread to not block the ability to connect to the compute. The race + // conditions are as follow: + // + // 1. 
If between the time we have retrieved the list of databases in + // the cluster and before we set ALLOW_CONNECTIONS back to false, + // the user has changed allowed connections to the database, we + // will have overwritten their change. + // + // This is not the end of the world, but an inconvenience, + // nonetheless. + // + // 2. If between the time we have allowed connections to the database + // and the time the migration is performed, the user disallows + // connections to the database, we will fail to connect to the + // database. + // + // This is not much of a problem since we will re-run the migration + // the next time we run migrations. + if db.restrict_conn { + info!("Allowing connections to \"{}\" for migrations", db.name); + + Self::allow_connections_to_db(cluster_client, &db.name, true) + .await + .context("Failed to allow connections to the database")?; + } + + let mut db_client = Self::connect(&config) .await - .context("prepare database to handle migrations")?; + .context("Failed to connect to the database")?; - let mut current_migration = self.get_migration_id().await? 
as usize; - while current_migration < self.migrations.len() { - // The index lags the migration ID by 1, so the current migration - // ID is also the next index - let migration_id = (current_migration + 1) as i64; - let migration = self.migrations[current_migration]; + let result = Self::run_migration(&mut db_client, &db.name, migration_id, migration, false) + .await + .context("Failed to run the migration"); - match Self::run_migration(self.client, migration_id, migration).await { - Ok(_) => { - info!("Finished migration id={}", migration_id); - } - Err(e) => { - error!("Failed to run migration id={}: {:?}", migration_id, e); - DB_MIGRATION_FAILED - .with_label_values(&[migration_id.to_string().as_str()]) - .inc(); - return Err(e); - } + // Reset the connection restriction + if db.restrict_conn { + info!( + "Disallowing connections to \"{}\" because migration {} is done", + db.name, migration_id + ); + + // Failing here is not the end of the world + if let Err(e) = Self::allow_connections_to_db(cluster_client, &db.name, false).await { + warn!( + "failed to reset ALLOW_CONNECTIONS on \"{}\": {}", + db.name, e + ) } + } + + result + } + + /// Run the configured set of migrations. + pub async fn run_migrations(self) -> Result<()> { + // Owns the connection to the database containing the + // neon_migration.migration_id table. In addition, all Cluster + // migrations will be run on this connection. + let mut cluster_client = Self::connect(&self.config) + .await + .context("failed to connect to cluster")?; + + Self::prepare_database(&mut cluster_client) + .await + .context("failed to prepare database to handle migrations")?; + + let mut current_migration = Self::get_migration_id(&mut cluster_client) + .await + .context("failed to get the current migration ID")? 
+ as usize; + + // All databases within the cluster + let dbs: Option> = { + // Then check if we actually need to run any, and if so, check if + // any need to run in each individual database + if current_migration < self.migrations.len() + && self.migrations[current_migration..] + .iter() + .any(|m| matches!(m, Migration::PerDatabase(_))) + { + match get_existing_dbs_async(&cluster_client).await { + Ok(dbs) => Some( + // Filter out invalid database (datconnectivity = -2) + dbs.into_values().filter(|d| !d.invalid).collect::>(), + ), + Err(e) => { + error!("Failed to collect the existing databases: {}", e); + return Err(e); + } + } + } else { + None::> + } + }; + + let admin_db = self.config.get_dbname().unwrap(); + + while current_migration < self.migrations.len() { + let migration_id = (current_migration + 1) as i64; + + let result: Result<()> = match &self.migrations[current_migration] { + Migration::Cluster(migration) => { + Self::run_cluster_migration( + &mut cluster_client, + admin_db, + migration_id, + migration, + ) + .await + } + Migration::PerDatabase(migration) => { + let mut result: Result<()> = Ok(()); + for db in dbs.as_ref().unwrap() { + // Once all the databases have run the migration, then we can run it in the + // admin database to mark the migration as complete. See the run for the + // admin database outside this loop. 
+ if db.name == admin_db { + continue; + } + + let mut config = self.config.clone(); + config.dbname(&db.name); + + // If we failed to run the migration in the current + // database, stop trying to run this migration + if let Err(e) = Self::run_database_migration( + &mut cluster_client, + config, + db, + migration_id, + migration, + ) + .await + { + result = Err(e); + break; + } + } + + match result { + Ok(_) => { + // Finally, run the migration for the admin database, + // and update the migration ID + Self::run_migration( + &mut cluster_client, + admin_db, + migration_id, + migration, + true, + ) + .await + .map_err(|e| { + error!("failed to commit the per-database migration: {}", e); + e + }) + } + Err(e) => Err(e), + } + } + }; + + // If failed, mark the metric and return + if let Err(e) = result { + DB_MIGRATION_FAILED + .with_label_values(&[migration_id.to_string().as_str()]) + .inc(); + + return Err(anyhow::anyhow!(format!( + "failed at migration {migration_id}: {e}" + ))); + } + + info!("Finished migration id={}", migration_id); current_migration += 1; } diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 4b38e6e29c..e512288353 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -6,12 +6,12 @@ use compute_api::responses::{ ComputeConfig, ControlPlaneComputeStatus, ControlPlaneConfigResponse, }; use reqwest::StatusCode; -use tokio_postgres::Client; +use tokio_postgres::{Client, Config}; use tracing::{error, info, instrument}; use crate::config; use crate::metrics::{CPLANE_REQUESTS_TOTAL, CPlaneRequestRPC, UNKNOWN_HTTP_STATUS}; -use crate::migration::MigrationRunner; +use crate::migration::{Migration, MigrationRunner}; use crate::params::PG_HBA_ALL_MD5; // Do control plane request and return response if any. 
In case of error it @@ -169,7 +169,7 @@ pub async fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> { } #[instrument(skip_all)] -pub async fn handle_migrations(client: &mut Client) -> Result<()> { +pub async fn handle_migrations(config: Config) -> Result<()> { info!("handle migrations"); // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! @@ -178,30 +178,49 @@ pub async fn handle_migrations(client: &mut Client) -> Result<()> { // Add new migrations in numerical order. let migrations = [ - include_str!("./migrations/0001-neon_superuser_bypass_rls.sql"), - include_str!("./migrations/0002-alter_roles.sql"), - include_str!("./migrations/0003-grant_pg_create_subscription_to_neon_superuser.sql"), - include_str!("./migrations/0004-grant_pg_monitor_to_neon_superuser.sql"), - include_str!("./migrations/0005-grant_all_on_tables_to_neon_superuser.sql"), - include_str!("./migrations/0006-grant_all_on_sequences_to_neon_superuser.sql"), - include_str!( + Migration::Cluster(include_str!( + "./migrations/0001-neon_superuser_bypass_rls.sql" + )), + Migration::Cluster(include_str!("./migrations/0002-alter_roles.sql")), + Migration::Cluster(include_str!( + "./migrations/0003-grant_pg_create_subscription_to_neon_superuser.sql" + )), + Migration::Cluster(include_str!( + "./migrations/0004-grant_pg_monitor_to_neon_superuser.sql" + )), + Migration::Cluster(include_str!( + "./migrations/0005-grant_all_on_tables_to_neon_superuser.sql" + )), + Migration::Cluster(include_str!( + "./migrations/0006-grant_all_on_sequences_to_neon_superuser.sql" + )), + Migration::Cluster(include_str!( "./migrations/0007-grant_all_on_tables_to_neon_superuser_with_grant_option.sql" - ), - include_str!( + )), + Migration::Cluster(include_str!( "./migrations/0008-grant_all_on_sequences_to_neon_superuser_with_grant_option.sql" - ), - include_str!("./migrations/0009-revoke_replication_for_previously_allowed_roles.sql"), - include_str!( + )), 
+ Migration::Cluster(include_str!( + "./migrations/0009-revoke_replication_for_previously_allowed_roles.sql" + )), + Migration::Cluster(include_str!( "./migrations/0010-grant_snapshot_synchronization_funcs_to_neon_superuser.sql" - ), - include_str!( + )), + Migration::Cluster(include_str!( "./migrations/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql" - ), + )), ]; - MigrationRunner::new(client, &migrations) - .run_migrations() - .await?; + let runner = match MigrationRunner::new(config, &migrations) { + Ok(runner) => runner, + Err(e) => { + error!("Failed to construct a migration runner: {}", e); + return Err(e); + } + }; - Ok(()) + runner.run_migrations().await.map_err(|e| { + error!("Failed to run the migrations: {}", e); + e + }) } diff --git a/test_runner/regress/test_compute_migrations.py b/test_runner/regress/test_compute_migrations.py index dc555417b4..d66774f5fd 100644 --- a/test_runner/regress/test_compute_migrations.py +++ b/test_runner/regress/test_compute_migrations.py @@ -75,6 +75,7 @@ def test_compute_migrations_retry(neon_simple_env: NeonEnv, compute_migrations_d pattern = rf"Running migration id={i}" endpoint.log_contains(pattern) + endpoint.log_contains(rf"Finished migration id={i}") @pytest.mark.parametrize(