From 0ad0db6ff8b5b491244d251fa09c8093f725a1d3 Mon Sep 17 00:00:00 2001
From: Anastasia Lubennikova
Date: Wed, 8 Jan 2025 18:55:04 +0000
Subject: [PATCH] compute: dropdb DROP SUBSCRIPTION fix (#10066)

## Problem

A project gets stuck if a database with subscriptions is deleted via the API / UI.
https://github.com/neondatabase/cloud/issues/18646

## Summary of changes

Before dropping the database, drop all the subscriptions in it. Do not drop the slot on the publisher, because we have no guarantee that the slot still exists or that the publisher is reachable.

Add a `DropSubscriptionsForDeletedDatabases` phase to run these operations in all databases we're about to delete. Ignore the error if the database does not exist.
---
 compute_tools/src/compute.rs                  |  95 ++++++++++++--
 compute_tools/src/spec_apply.rs               |  32 ++++-
 .../sql/drop_subscription_for_drop_dbs.sql    |  11 ++
 test_runner/fixtures/neon_fixtures.py         |  27 +++-
 test_runner/regress/test_compute_catalog.py   | 116 +++++++++++++++++-
 5 files changed, 262 insertions(+), 19 deletions(-)
 create mode 100644 compute_tools/src/sql/drop_subscription_for_drop_dbs.sql

diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index 78f6033429..1ac97a378b 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -15,7 +15,7 @@ use std::time::Instant;
 
 use anyhow::{Context, Result};
 use chrono::{DateTime, Utc};
-use compute_api::spec::{PgIdent, Role};
+use compute_api::spec::{Database, PgIdent, Role};
 use futures::future::join_all;
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
@@ -45,8 +45,10 @@ use crate::spec_apply::ApplySpecPhase::{
     DropInvalidDatabases, DropRoles, HandleNeonExtension, HandleOtherExtensions,
     RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase,
 };
+use crate::spec_apply::PerDatabasePhase;
 use crate::spec_apply::PerDatabasePhase::{
-    ChangeSchemaPerms, DeleteDBRoleReferences, HandleAnonExtension,
+    ChangeSchemaPerms, DeleteDBRoleReferences, DropSubscriptionsForDeletedDatabases,
+    HandleAnonExtension,
 };
 use crate::spec_apply::{apply_operations, MutableApplyContext, DB};
 use crate::sync_sk::{check_if_synced, ping_safekeeper};
@@ -834,7 +836,7 @@ impl ComputeNode {
         conf
     }
 
-    async fn get_maintenance_client(
+    pub async fn get_maintenance_client(
         conf: &tokio_postgres::Config,
     ) -> Result<Client> {
         let mut conf = conf.clone();
@@ -943,6 +945,78 @@ impl ComputeNode {
             dbs: databases,
         }));
 
+        // Apply the special pre-drop database phase.
+        // NOTE: we reuse the code of the RunInEachDatabase phase for parallelism
+        // and connection management, but we don't really run it in *each* database,
+        // only in the databases we're about to drop.
+        info!("Applying PerDatabase (pre-dropdb) phase");
+        let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
+
+        // Run the phase for each database that we're about to drop.
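+        // Delta operations with the "delete_db" action identify the databases
+        // being dropped; for each of them we spawn a task (bounded by the
+        // concurrency semaphore) that runs only the subscription-cleanup
+        // subphase in that database.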
+        let db_processes = spec
+            .delta_operations
+            .iter()
+            .flatten()
+            .filter_map(move |op| {
+                if op.action.as_str() == "delete_db" {
+                    Some(op.name.clone())
+                } else {
+                    None
+                }
+            })
+            .map(|dbname| {
+                let spec = spec.clone();
+                let ctx = ctx.clone();
+                let jwks_roles = jwks_roles.clone();
+                let mut conf = conf.as_ref().clone();
+                let concurrency_token = concurrency_token.clone();
+                // We only need the dbname field for this phase, so set the other
+                // fields to dummy values.
+                let db = DB::UserDB(Database {
+                    name: dbname.clone(),
+                    owner: "cloud_admin".to_string(),
+                    options: None,
+                    restrict_conn: false,
+                    invalid: false,
+                });
+
+                debug!("Applying per-database phases for Database {:?}", &db);
+
+                match &db {
+                    DB::SystemDB => {}
+                    DB::UserDB(db) => {
+                        conf.dbname(db.name.as_str());
+                    }
+                }
+
+                let conf = Arc::new(conf);
+                let fut = Self::apply_spec_sql_db(
+                    spec.clone(),
+                    conf,
+                    ctx.clone(),
+                    jwks_roles.clone(),
+                    concurrency_token.clone(),
+                    db,
+                    [DropSubscriptionsForDeletedDatabases].to_vec(),
+                );
+
+                Ok(spawn(fut))
+            })
+            .collect::<Vec<Result<_>>>();
+
+        for process in db_processes.into_iter() {
+            let handle = process?;
+            if let Err(e) = handle.await? {
+                // Handle the error case where the database does not exist.
+                // We do not check whether the DB exists in the deletion phase,
+                // so we shouldn't be strict about it in the pre-deletion cleanup either.
+                if e.to_string().contains("does not exist") {
+                    warn!("Error dropping subscription: {}", e);
+                } else {
+                    return Err(e);
+                }
+            };
+        }
+
         for phase in [
             CreateSuperUser,
             DropInvalidDatabases,
@@ -962,7 +1036,7 @@ impl ComputeNode {
             .await?;
         }
 
         info!("Applying RunInEachDatabase phase");
         let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
 
         let db_processes = spec
@@ -997,6 +1071,12 @@ impl ComputeNode {
                     jwks_roles.clone(),
                     concurrency_token.clone(),
                     db,
+                    [
+                        DeleteDBRoleReferences,
+                        ChangeSchemaPerms,
+                        HandleAnonExtension,
+                    ]
+                    .to_vec(),
                 );
 
                 Ok(spawn(fut))
@@ -1043,16 +1123,13 @@ impl ComputeNode {
         jwks_roles: Arc<HashSet<String>>,
         concurrency_token: Arc<tokio::sync::Semaphore>,
         db: DB,
+        subphases: Vec<PerDatabasePhase>,
     ) -> Result<()> {
         let _permit = concurrency_token.acquire().await?;
 
         let mut client_conn = None;
 
-        for subphase in [
-            DeleteDBRoleReferences,
-            ChangeSchemaPerms,
-            HandleAnonExtension,
-        ] {
+        for subphase in subphases {
             apply_operations(
                 spec.clone(),
                 ctx.clone(),
diff --git a/compute_tools/src/spec_apply.rs b/compute_tools/src/spec_apply.rs
index 7308d5d36e..695a722d6d 100644
--- a/compute_tools/src/spec_apply.rs
+++ b/compute_tools/src/spec_apply.rs
@@ -47,6 +47,7 @@ pub enum PerDatabasePhase {
     DeleteDBRoleReferences,
     ChangeSchemaPerms,
     HandleAnonExtension,
+    DropSubscriptionsForDeletedDatabases,
 }
 
 #[derive(Clone, Debug)]
@@ -326,13 +327,12 @@ async fn get_operations<'a>(
             // Use FORCE to drop database even if there are active connections.
             // We run this from `cloud_admin`, so it should have enough privileges.
+            //
             // NB: there could be other db states, which prevent us from dropping
             // the database. For example, if db is used by any active subscription
             // or replication slot.
-            // TODO: deal with it once we allow logical replication. Proper fix should
-            // involve returning an error code to the control plane, so it could
-            // figure out that this is a non-retryable error, return it to the user
-            // and fail operation permanently.
+            // Such cases are handled in the DropSubscriptionsForDeletedDatabases
+            // phase. We do all the cleanup before actually dropping the database.
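+            // For example, for a database named "mydb" (hypothetical name),
+            // the generated statement is:
+            //   DROP DATABASE IF EXISTS "mydb" WITH (FORCE)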
             let drop_db_query: String = format!(
                 "DROP DATABASE IF EXISTS {} WITH (FORCE)",
                 &op.name.pg_quote()
             );
@@ -444,6 +444,30 @@
         }
         ApplySpecPhase::RunInEachDatabase { db, subphase } => {
             match subphase {
+                PerDatabasePhase::DropSubscriptionsForDeletedDatabases => {
+                    match &db {
+                        DB::UserDB(db) => {
+                            let drop_subscription_query: String = format!(
+                                include_str!("sql/drop_subscription_for_drop_dbs.sql"),
+                                datname_str = escape_literal(&db.name),
+                            );
+
+                            let operations = vec![Operation {
+                                query: drop_subscription_query,
+                                comment: Some(format!(
+                                    "optionally dropping subscriptions for DB {}",
+                                    db.name,
+                                )),
+                            }]
+                            .into_iter();
+
+                            Ok(Box::new(operations))
+                        }
+                        // Skip this cleanup for the system databases,
+                        // because users can't drop them.
+                        DB::SystemDB => Ok(Box::new(empty())),
+                    }
+                }
                 PerDatabasePhase::DeleteDBRoleReferences => {
                     let ctx = ctx.read().await;
diff --git a/compute_tools/src/sql/drop_subscription_for_drop_dbs.sql b/compute_tools/src/sql/drop_subscription_for_drop_dbs.sql
new file mode 100644
index 0000000000..dfb925e48e
--- /dev/null
+++ b/compute_tools/src/sql/drop_subscription_for_drop_dbs.sql
@@ -0,0 +1,11 @@
+DO $$
+DECLARE
+    subname TEXT;
+BEGIN
+    FOR subname IN SELECT pg_subscription.subname FROM pg_subscription WHERE subdbid = (SELECT oid FROM pg_database WHERE datname = {datname_str}) LOOP
+        EXECUTE format('ALTER SUBSCRIPTION %I DISABLE;', subname);
+        EXECUTE format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname);
+        EXECUTE format('DROP SUBSCRIPTION %I;', subname);
+    END LOOP;
+END;
+$$;
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 00fdda2998..e22e452a52 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -4933,13 +4933,30 @@ def check_restored_datadir_content(
     assert (mismatch, error) == ([], [])
 
 
-def logical_replication_sync(subscriber: PgProtocol, publisher: PgProtocol) -> Lsn:
+def logical_replication_sync(
+    subscriber: PgProtocol,
+    publisher: PgProtocol,
+    sub_dbname: str | None = None,
+    pub_dbname: str | None = None,
+) -> Lsn:
     """Wait logical replication subscriber to sync with publisher."""
-    publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
+    if pub_dbname is not None:
+        publisher_lsn = Lsn(
+            publisher.safe_psql("SELECT pg_current_wal_flush_lsn()", dbname=pub_dbname)[0][0]
+        )
+    else:
+        publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
+
     while True:
-        res = subscriber.safe_psql("select latest_end_lsn from pg_catalog.pg_stat_subscription")[0][
-            0
-        ]
+        if sub_dbname is not None:
+            res = subscriber.safe_psql(
+                "select latest_end_lsn from pg_catalog.pg_stat_subscription", dbname=sub_dbname
+            )[0][0]
+        else:
+            res = subscriber.safe_psql(
+                "select latest_end_lsn from pg_catalog.pg_stat_subscription"
+            )[0][0]
+
         if res:
             log.info(f"subscriber_lsn={res}")
             subscriber_lsn = Lsn(res)
diff --git a/test_runner/regress/test_compute_catalog.py b/test_runner/regress/test_compute_catalog.py
index b3719a45ed..e411aad97d 100644
--- a/test_runner/regress/test_compute_catalog.py
+++ b/test_runner/regress/test_compute_catalog.py
@@ -1,7 +1,9 @@
 from __future__ import annotations
 
+import logging
+
 import requests
-from fixtures.neon_fixtures import NeonEnv
+from fixtures.neon_fixtures import NeonEnv, logical_replication_sync
 
 TEST_DB_NAMES = [
     {
@@ -136,3 +138,115 @@ def test_compute_create_databases(neon_simple_env: NeonEnv):
         assert curr_db is not None
         assert len(curr_db) == 1
         assert curr_db[0] == db["name"]
+
+
+def test_dropdb_with_subscription(neon_simple_env: NeonEnv):
+    """
+    Test that compute_ctl can drop a database that has a logical replication subscription.
+    """
+    env = neon_simple_env
+
+    # Create and start the endpoint so that neon_local puts all the generated
+    # stuff into the spec.json file.
+    endpoint = env.endpoints.create_start("main")
+
+    TEST_DB_NAMES = [
+        {
+            "name": "neondb",
+            "owner": "cloud_admin",
+        },
+        {
+            "name": "subscriber_db",
+            "owner": "cloud_admin",
+        },
+        {
+            "name": "publisher_db",
+            "owner": "cloud_admin",
+        },
+    ]
+
+    # Update the spec.json file to create the databases
+    # and reconfigure the endpoint to apply the changes.
+    endpoint.respec_deep(
+        **{
+            "skip_pg_catalog_updates": False,
+            "cluster": {
+                "databases": TEST_DB_NAMES,
+            },
+        }
+    )
+    endpoint.reconfigure()
+
+    # Connect to the publisher_db and create a publication.
+    with endpoint.cursor(dbname="publisher_db") as cursor:
+        cursor.execute("CREATE PUBLICATION mypub FOR ALL TABLES")
+        cursor.execute("select pg_catalog.pg_create_logical_replication_slot('mysub', 'pgoutput');")
+        cursor.execute("CREATE TABLE t(a int)")
+        cursor.execute("INSERT INTO t VALUES (1)")
+
+    # Connect to the subscriber_db and create a subscription.
+    # Note that we need to create the subscription with create_slot = false,
+    # because the replication slot was already created above.
+    connstr = endpoint.connstr(dbname="publisher_db").replace("'", "''")
+    with endpoint.cursor(dbname="subscriber_db") as cursor:
+        cursor.execute("CREATE TABLE t(a int)")
+        cursor.execute(
+            f"CREATE SUBSCRIPTION mysub CONNECTION '{connstr}' PUBLICATION mypub WITH (create_slot = false)"
+        )
+
+    # Wait for the subscription to become active.
+    logical_replication_sync(
+        endpoint, endpoint, sub_dbname="subscriber_db", pub_dbname="publisher_db"
+    )
+
+    # Check that replication is working.
+    with endpoint.cursor(dbname="subscriber_db") as cursor:
+        cursor.execute("SELECT * FROM t")
+        rows = cursor.fetchall()
+        assert len(rows) == 1
+        assert rows[0][0] == 1
+
+    # Drop the subscriber_db from the list.
+    TEST_DB_NAMES_NEW = [
+        {
+            "name": "neondb",
+            "owner": "cloud_admin",
+        },
+        {
+            "name": "publisher_db",
+            "owner": "cloud_admin",
+        },
+    ]
+
+    # Update the spec.json file to drop the database
+    # and reconfigure the endpoint to apply the changes.
+    endpoint.respec_deep(
+        **{
+            "skip_pg_catalog_updates": False,
+            "cluster": {
+                "databases": TEST_DB_NAMES_NEW,
+            },
+            "delta_operations": [
+                {"action": "delete_db", "name": "subscriber_db"},
+                # Also test the case when we try to delete a non-existent database.
+                # This shouldn't happen in normal operation,
+                # but it can occur when failed operations are retried.
+                {"action": "delete_db", "name": "nonexistent_db"},
+            ],
+        }
+    )
+
+    logging.info("Reconfiguring the endpoint to drop the subscriber_db")
+    endpoint.reconfigure()
+
+    # Check that the subscriber_db is dropped.
+    with endpoint.cursor() as cursor:
+        cursor.execute("SELECT datname FROM pg_database WHERE datname = %s", ("subscriber_db",))
+        catalog_db = cursor.fetchone()
+        assert catalog_db is None
+
+    # Check that we can still connect to the publisher_db.
+    with endpoint.cursor(dbname="publisher_db") as cursor:
+        cursor.execute("SELECT * FROM current_database()")
+        curr_db = cursor.fetchone()
+        assert curr_db is not None
+        assert len(curr_db) == 1
+        assert curr_db[0] == "publisher_db"
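
For context, the per-subscription cleanup in `drop_subscription_for_drop_dbs.sql` expands to the following statement sequence, shown here for a hypothetical subscription named `mysub`. The subscription has to be disabled before its slot can be detached, and once `slot_name = NONE` is set, `DROP SUBSCRIPTION` does not attempt to drop the slot on the publisher:

```sql
-- A sketch of the per-subscription cleanup; the subscription name is hypothetical.
ALTER SUBSCRIPTION mysub DISABLE;                -- stop the apply worker
ALTER SUBSCRIPTION mysub SET (slot_name = NONE); -- detach the remote slot (requires a disabled subscription)
DROP SUBSCRIPTION mysub;                         -- no slot attached, so the publisher is never contacted
```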