diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index 18c228ba54..543d4462ed 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -11,6 +11,7 @@ pub mod logger; pub mod catalog; pub mod compute; pub mod extension_server; +mod migration; pub mod monitor; pub mod params; pub mod pg_helpers; diff --git a/compute_tools/src/migration.rs b/compute_tools/src/migration.rs new file mode 100644 index 0000000000..61dcf01c84 --- /dev/null +++ b/compute_tools/src/migration.rs @@ -0,0 +1,100 @@ +use anyhow::{Context, Result}; +use postgres::Client; +use tracing::info; + +pub(crate) struct MigrationRunner<'m> { + client: &'m mut Client, + migrations: &'m [&'m str], +} + +impl<'m> MigrationRunner<'m> { + pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self { + Self { client, migrations } + } + + fn get_migration_id(&mut self) -> Result { + let query = "SELECT id FROM neon_migration.migration_id"; + let row = self + .client + .query_one(query, &[]) + .context("run_migrations get migration_id")?; + + Ok(row.get::<&str, i64>("id")) + } + + fn update_migration_id(&mut self) -> Result<()> { + let setval = format!( + "UPDATE neon_migration.migration_id SET id={}", + self.migrations.len() + ); + + self.client + .simple_query(&setval) + .context("run_migrations update id")?; + + Ok(()) + } + + fn prepare_migrations(&mut self) -> Result<()> { + let query = "CREATE SCHEMA IF NOT EXISTS neon_migration"; + self.client.simple_query(query)?; + + let query = "CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)"; + self.client.simple_query(query)?; + + let query = "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING"; + self.client.simple_query(query)?; + + let query = "ALTER SCHEMA neon_migration OWNER TO cloud_admin"; + self.client.simple_query(query)?; + + let query = "REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC"; + self.client.simple_query(query)?; + + Ok(()) + } + + pub fn run_migrations(mut self) -> Result<()> { + self.prepare_migrations()?; + + let mut current_migration: usize = self.get_migration_id()? as usize; + let starting_migration_id = current_migration; + + let query = "BEGIN"; + self.client + .simple_query(query) + .context("run_migrations begin")?; + + while current_migration < self.migrations.len() { + let migration = self.migrations[current_migration]; + + if migration.starts_with("-- SKIP") { + info!("Skipping migration id={}", current_migration); + } else { + info!( + "Running migration id={}:\n{}\n", + current_migration, migration + ); + self.client.simple_query(migration).with_context(|| { + format!("run_migration current_migration={}", current_migration) + })?; + } + + current_migration += 1; + } + + self.update_migration_id()?; + + let query = "COMMIT"; + self.client + .simple_query(query) + .context("run_migrations commit")?; + + info!( + "Ran {} migrations", + (self.migrations.len() - starting_migration_id) + ); + + Ok(()) + } +} diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 143f6c1e5f..37090b08fd 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -10,6 +10,7 @@ use tracing::{error, info, info_span, instrument, span_enabled, warn, Level}; use crate::config; use crate::logger::inlinify; +use crate::migration::MigrationRunner; use crate::params::PG_HBA_ALL_MD5; use crate::pg_helpers::*; @@ -791,69 +792,7 @@ pub fn handle_migrations(client: &mut Client) -> Result<()> { include_str!("./migrations/0008-revoke_replication_for_previously_allowed_roles.sql"), ]; - let mut func = || { - let query = "CREATE SCHEMA IF NOT EXISTS neon_migration"; - client.simple_query(query)?; - - let query = "CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)"; - client.simple_query(query)?; - - let query = "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING"; - client.simple_query(query)?; - - let query = "ALTER SCHEMA neon_migration OWNER TO cloud_admin"; - client.simple_query(query)?; - - let query = "REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC"; - client.simple_query(query)?; - Ok::<_, anyhow::Error>(()) - }; - func().context("handle_migrations prepare")?; - - let query = "SELECT id FROM neon_migration.migration_id"; - let row = client - .query_one(query, &[]) - .context("handle_migrations get migration_id")?; - let mut current_migration: usize = row.get::<&str, i64>("id") as usize; - let starting_migration_id = current_migration; - - let query = "BEGIN"; - client - .simple_query(query) - .context("handle_migrations begin")?; - - while current_migration < migrations.len() { - let migration = &migrations[current_migration]; - if migration.starts_with("-- SKIP") { - info!("Skipping migration id={}", current_migration); - } else { - info!( - "Running migration id={}:\n{}\n", - current_migration, migration - ); - client.simple_query(migration).with_context(|| { - format!("handle_migrations current_migration={}", current_migration) - })?; - } - current_migration += 1; - } - let setval = format!( - "UPDATE neon_migration.migration_id SET id={}", - migrations.len() - ); - client - .simple_query(&setval) - .context("handle_migrations update id")?; - - let query = "COMMIT"; - client - .simple_query(query) - .context("handle_migrations commit")?; - - info!( - "Ran {} migrations", - (migrations.len() - starting_migration_id) - ); + MigrationRunner::new(client, &migrations).run_migrations()?; Ok(()) }