diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 78f6033429..1ac97a378b 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -15,7 +15,7 @@ use std::time::Instant; use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; -use compute_api::spec::{PgIdent, Role}; +use compute_api::spec::{Database, PgIdent, Role}; use futures::future::join_all; use futures::stream::FuturesUnordered; use futures::StreamExt; @@ -45,8 +45,10 @@ use crate::spec_apply::ApplySpecPhase::{ DropInvalidDatabases, DropRoles, HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase, }; +use crate::spec_apply::PerDatabasePhase; use crate::spec_apply::PerDatabasePhase::{ - ChangeSchemaPerms, DeleteDBRoleReferences, HandleAnonExtension, + ChangeSchemaPerms, DeleteDBRoleReferences, DropSubscriptionsForDeletedDatabases, + HandleAnonExtension, }; use crate::spec_apply::{apply_operations, MutableApplyContext, DB}; use crate::sync_sk::{check_if_synced, ping_safekeeper}; @@ -834,7 +836,7 @@ impl ComputeNode { conf } - async fn get_maintenance_client( + pub async fn get_maintenance_client( conf: &tokio_postgres::Config, ) -> Result { let mut conf = conf.clone(); @@ -943,6 +945,78 @@ impl ComputeNode { dbs: databases, })); + // Apply special pre drop database phase. + // NOTE: we use the code of RunInEachDatabase phase for parallelism + // and connection management, but we don't really run it in *each* database, + // only in databases, we're about to drop. + info!("Applying PerDatabase (pre-dropdb) phase"); + let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency)); + + // Run the phase for each database that we're about to drop. + let db_processes = spec + .delta_operations + .iter() + .flatten() + .filter_map(move |op| { + if op.action.as_str() == "delete_db" { + Some(op.name.clone()) + } else { + None + } + }) + .map(|dbname| { + let spec = spec.clone(); + let ctx = ctx.clone(); + let jwks_roles = jwks_roles.clone(); + let mut conf = conf.as_ref().clone(); + let concurrency_token = concurrency_token.clone(); + // We only need dbname field for this phase, so set other fields to dummy values + let db = DB::UserDB(Database { + name: dbname.clone(), + owner: "cloud_admin".to_string(), + options: None, + restrict_conn: false, + invalid: false, + }); + + debug!("Applying per-database phases for Database {:?}", &db); + + match &db { + DB::SystemDB => {} + DB::UserDB(db) => { + conf.dbname(db.name.as_str()); + } + } + + let conf = Arc::new(conf); + let fut = Self::apply_spec_sql_db( + spec.clone(), + conf, + ctx.clone(), + jwks_roles.clone(), + concurrency_token.clone(), + db, + [DropSubscriptionsForDeletedDatabases].to_vec(), + ); + + Ok(spawn(fut)) + }) + .collect::>>(); + + for process in db_processes.into_iter() { + let handle = process?; + if let Err(e) = handle.await? { + // Handle the error case where the database does not exist + // We do not check whether the DB exists or not in the deletion phase, + // so we shouldn't be strict about it in pre-deletion cleanup as well. + if e.to_string().contains("does not exist") { + warn!("Error dropping subscription: {}", e); + } else { + return Err(e); + } + }; + } + for phase in [ CreateSuperUser, DropInvalidDatabases, @@ -962,7 +1036,7 @@ impl ComputeNode { .await?; } - info!("Applying RunInEachDatabase phase"); + info!("Applying RunInEachDatabase2 phase"); let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency)); let db_processes = spec @@ -997,6 +1071,12 @@ impl ComputeNode { jwks_roles.clone(), concurrency_token.clone(), db, + [ + DeleteDBRoleReferences, + ChangeSchemaPerms, + HandleAnonExtension, + ] + .to_vec(), ); Ok(spawn(fut)) @@ -1043,16 +1123,13 @@ impl ComputeNode { jwks_roles: Arc>, concurrency_token: Arc, db: DB, + subphases: Vec, ) -> Result<()> { let _permit = concurrency_token.acquire().await?; let mut client_conn = None; - for subphase in [ - DeleteDBRoleReferences, - ChangeSchemaPerms, - HandleAnonExtension, - ] { + for subphase in subphases { apply_operations( spec.clone(), ctx.clone(), diff --git a/compute_tools/src/spec_apply.rs b/compute_tools/src/spec_apply.rs index 7308d5d36e..695a722d6d 100644 --- a/compute_tools/src/spec_apply.rs +++ b/compute_tools/src/spec_apply.rs @@ -47,6 +47,7 @@ pub enum PerDatabasePhase { DeleteDBRoleReferences, ChangeSchemaPerms, HandleAnonExtension, + DropSubscriptionsForDeletedDatabases, } #[derive(Clone, Debug)] @@ -326,13 +327,12 @@ async fn get_operations<'a>( // Use FORCE to drop database even if there are active connections. // We run this from `cloud_admin`, so it should have enough privileges. + // // NB: there could be other db states, which prevent us from dropping // the database. For example, if db is used by any active subscription // or replication slot. - // TODO: deal with it once we allow logical replication. Proper fix should - // involve returning an error code to the control plane, so it could - // figure out that this is a non-retryable error, return it to the user - // and fail operation permanently. + // Such cases are handled in the DropSubscriptionsForDeletedDatabases + // phase. We do all the cleanup before actually dropping the database. let drop_db_query: String = format!( "DROP DATABASE IF EXISTS {} WITH (FORCE)", &op.name.pg_quote() @@ -444,6 +444,30 @@ async fn get_operations<'a>( } ApplySpecPhase::RunInEachDatabase { db, subphase } => { match subphase { + PerDatabasePhase::DropSubscriptionsForDeletedDatabases => { + match &db { + DB::UserDB(db) => { + let drop_subscription_query: String = format!( + include_str!("sql/drop_subscription_for_drop_dbs.sql"), + datname_str = escape_literal(&db.name), + ); + + let operations = vec![Operation { + query: drop_subscription_query, + comment: Some(format!( + "optionally dropping subscriptions for DB {}", + db.name, + )), + }] + .into_iter(); + + Ok(Box::new(operations)) + } + // skip this cleanup for the system databases + // because users can't drop them + DB::SystemDB => Ok(Box::new(empty())), + } + } PerDatabasePhase::DeleteDBRoleReferences => { let ctx = ctx.read().await; diff --git a/compute_tools/src/sql/drop_subscription_for_drop_dbs.sql b/compute_tools/src/sql/drop_subscription_for_drop_dbs.sql new file mode 100644 index 0000000000..dfb925e48e --- /dev/null +++ b/compute_tools/src/sql/drop_subscription_for_drop_dbs.sql @@ -0,0 +1,11 @@ +DO $$ +DECLARE + subname TEXT; +BEGIN + FOR subname IN SELECT pg_subscription.subname FROM pg_subscription WHERE subdbid = (SELECT oid FROM pg_database WHERE datname = {datname_str}) LOOP + EXECUTE format('ALTER SUBSCRIPTION %I DISABLE;', subname); + EXECUTE format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname); + EXECUTE format('DROP SUBSCRIPTION %I;', subname); + END LOOP; +END; +$$; diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 00fdda2998..e22e452a52 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4933,13 +4933,30 @@ def check_restored_datadir_content( assert (mismatch, error) == ([], []) -def logical_replication_sync(subscriber: PgProtocol, publisher: PgProtocol) -> Lsn: +def logical_replication_sync( + subscriber: PgProtocol, + publisher: PgProtocol, + sub_dbname: str | None = None, + pub_dbname: str | None = None, +) -> Lsn: """Wait logical replication subscriber to sync with publisher.""" - publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) + if pub_dbname is not None: + publisher_lsn = Lsn( + publisher.safe_psql("SELECT pg_current_wal_flush_lsn()", dbname=pub_dbname)[0][0] + ) + else: + publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) + while True: - res = subscriber.safe_psql("select latest_end_lsn from pg_catalog.pg_stat_subscription")[0][ - 0 - ] + if sub_dbname is not None: + res = subscriber.safe_psql( + "select latest_end_lsn from pg_catalog.pg_stat_subscription", dbname=sub_dbname + )[0][0] + else: + res = subscriber.safe_psql( + "select latest_end_lsn from pg_catalog.pg_stat_subscription" + )[0][0] + if res: log.info(f"subscriber_lsn={res}") subscriber_lsn = Lsn(res) diff --git a/test_runner/regress/test_compute_catalog.py b/test_runner/regress/test_compute_catalog.py index b3719a45ed..e411aad97d 100644 --- a/test_runner/regress/test_compute_catalog.py +++ b/test_runner/regress/test_compute_catalog.py @@ -1,7 +1,9 @@ from __future__ import annotations +import logging + import requests -from fixtures.neon_fixtures import NeonEnv +from fixtures.neon_fixtures import NeonEnv, logical_replication_sync TEST_DB_NAMES = [ { @@ -136,3 +138,115 @@ def test_compute_create_databases(neon_simple_env: NeonEnv): assert curr_db is not None assert len(curr_db) == 1 assert curr_db[0] == db["name"] + + +def test_dropdb_with_subscription(neon_simple_env: NeonEnv): + """ + Test that compute_ctl can drop a database that has a logical replication subscription. + """ + env = neon_simple_env + + # Create and start endpoint so that neon_local put all the generated + # stuff into the spec.json file. + endpoint = env.endpoints.create_start("main") + + TEST_DB_NAMES = [ + { + "name": "neondb", + "owner": "cloud_admin", + }, + { + "name": "subscriber_db", + "owner": "cloud_admin", + }, + { + "name": "publisher_db", + "owner": "cloud_admin", + }, + ] + + # Update the spec.json file to create the databases + # and reconfigure the endpoint to apply the changes. + endpoint.respec_deep( + **{ + "skip_pg_catalog_updates": False, + "cluster": { + "databases": TEST_DB_NAMES, + }, + } + ) + endpoint.reconfigure() + + # connect to the publisher_db and create a publication + with endpoint.cursor(dbname="publisher_db") as cursor: + cursor.execute("CREATE PUBLICATION mypub FOR ALL TABLES") + cursor.execute("select pg_catalog.pg_create_logical_replication_slot('mysub', 'pgoutput');") + cursor.execute("CREATE TABLE t(a int)") + cursor.execute("INSERT INTO t VALUES (1)") + + # connect to the subscriber_db and create a subscription + # Note that we need to create subscription with + connstr = endpoint.connstr(dbname="publisher_db").replace("'", "''") + with endpoint.cursor(dbname="subscriber_db") as cursor: + cursor.execute("CREATE TABLE t(a int)") + cursor.execute( + f"CREATE SUBSCRIPTION mysub CONNECTION '{connstr}' PUBLICATION mypub WITH (create_slot = false) " + ) + + # wait for the subscription to be active + logical_replication_sync( + endpoint, endpoint, sub_dbname="subscriber_db", pub_dbname="publisher_db" + ) + + # Check that replication is working + with endpoint.cursor(dbname="subscriber_db") as cursor: + cursor.execute("SELECT * FROM t") + rows = cursor.fetchall() + assert len(rows) == 1 + assert rows[0][0] == 1 + + # drop the subscriber_db from the list + TEST_DB_NAMES_NEW = [ + { + "name": "neondb", + "owner": "cloud_admin", + }, + { + "name": "publisher_db", + "owner": "cloud_admin", + }, + ] + # Update the spec.json file to drop the database + # and reconfigure the endpoint to apply the changes. + endpoint.respec_deep( + **{ + "skip_pg_catalog_updates": False, + "cluster": { + "databases": TEST_DB_NAMES_NEW, + }, + "delta_operations": [ + {"action": "delete_db", "name": "subscriber_db"}, + # also test the case when we try to delete a non-existent database + # shouldn't happen in normal operation, + # but can occur when failed operations are retried + {"action": "delete_db", "name": "nonexistent_db"}, + ], + } + ) + + logging.info("Reconfiguring the endpoint to drop the subscriber_db") + endpoint.reconfigure() + + # Check that the subscriber_db is dropped + with endpoint.cursor() as cursor: + cursor.execute("SELECT datname FROM pg_database WHERE datname = %s", ("subscriber_db",)) + catalog_db = cursor.fetchone() + assert catalog_db is None + + # Check that we can still connect to the publisher_db + with endpoint.cursor(dbname="publisher_db") as cursor: + cursor.execute("SELECT * FROM current_database()") + curr_db = cursor.fetchone() + assert curr_db is not None + assert len(curr_db) == 1 + assert curr_db[0] == "publisher_db"