From 8e8df1b4539403b294a02332bbc14252b81b3cc9 Mon Sep 17 00:00:00 2001
From: Anastasia Lubennikova
Date: Thu, 23 Jan 2025 11:02:15 +0000
Subject: [PATCH] Disable logical replication subscribers (#10249)

Drop logical replication subscriptions before compute starts on a non-main
branch.

Add a new compute_ctl spec flag: drop_subscriptions_before_start.
If it is set, drop all subscriptions from the compute node before it starts.

To avoid a race on compute start, use the new GUC
neon.disable_logical_replication_subscribers to temporarily disable logical
replication workers until we drop the subscriptions.

Ensure that we drop subscriptions exactly once, when the endpoint starts on a
new branch. This is essential because otherwise we may drop not only
inherited, but also newly created subscriptions.

We cannot rely only on the spec.drop_subscriptions_before_start flag, because
if for some reason compute restarts inside the VM, it will start again with
the same spec and flag value.

To handle this, we save the fact of the operation in the database, in the
neon.drop_subscriptions_done table. If the table does not exist, we assume
that the operation was never performed, so we must do it. If the table
exists, we check whether the operation was performed on the current timeline.

fixes: https://github.com/neondatabase/neon/issues/8790
---
 compute_tools/src/compute.rs                  | 104 ++++++--
 compute_tools/src/config.rs                   |   7 +
 compute_tools/src/spec_apply.rs               |  24 +-
 ...or_drop_dbs.sql => drop_subscriptions.sql} |   0
 .../src/sql/finalize_drop_subscriptions.sql   |  21 ++
 control_plane/src/bin/neon_local.rs           |   1 +
 control_plane/src/endpoint.rs                 |   7 +
 libs/compute_api/src/spec.rs                  |   7 +
 pgxn/neon/neon.c                              |  10 +
 test_runner/fixtures/neon_cli.py              |   3 +
 test_runner/fixtures/neon_fixtures.py         |   2 +
 .../regress/test_subscriber_branching.py      | 242 ++++++++++++++++++
 vendor/postgres-v14                           |   2 +-
 vendor/postgres-v15                           |   2 +-
 vendor/postgres-v16                           |   2 +-
 vendor/postgres-v17                           |   2 +-
 vendor/revisions.json                         |   8 +-
 17 files changed, 413 insertions(+), 31 deletions(-)
 rename compute_tools/src/sql/{drop_subscription_for_drop_dbs.sql => drop_subscriptions.sql} (100%)
 create mode 100644 compute_tools/src/sql/finalize_drop_subscriptions.sql
 create mode 100644 test_runner/regress/test_subscriber_branching.py

diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index 1ac97a378b..fd76e404c6 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -41,14 +41,14 @@ use crate::local_proxy;
 use crate::pg_helpers::*;
 use crate::spec::*;
 use crate::spec_apply::ApplySpecPhase::{
-    CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateSuperUser,
-    DropInvalidDatabases, DropRoles, HandleNeonExtension, HandleOtherExtensions,
-    RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase,
+    CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateSchemaNeon,
+    CreateSuperUser, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions,
+    HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles,
+    RunInEachDatabase,
 };
 use crate::spec_apply::PerDatabasePhase;
 use crate::spec_apply::PerDatabasePhase::{
-    ChangeSchemaPerms, DeleteDBRoleReferences, DropSubscriptionsForDeletedDatabases,
-    HandleAnonExtension,
+    ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, HandleAnonExtension,
 };
 use crate::spec_apply::{apply_operations, MutableApplyContext, DB};
 use crate::sync_sk::{check_if_synced, ping_safekeeper};
@@ -340,6 +340,15 @@ impl ComputeNode {
         self.state.lock().unwrap().status
     }
 
+    pub fn get_timeline_id(&self) -> Option<TimelineId> {
+        self.state
+            .lock()
+            .unwrap()
+            .pspec
+            .as_ref()
+            .map(|s| s.timeline_id)
+    }
+
     // Remove `pgdata` directory and create it again with right permissions.
     fn create_pgdata(&self) -> Result<()> {
         // Ignore removal error, likely it is a 'No such file or directory (os error 2)'.
@@ -929,6 +938,48 @@ impl ComputeNode {
             .map(|role| (role.name.clone(), role))
             .collect::<HashMap<String, _>>();
 
+        // Check if we need to drop subscriptions before starting the endpoint.
+        //
+        // It is important to do this operation exactly once when the endpoint
+        // starts on a new branch. Otherwise, we may drop not only inherited,
+        // but also newly created subscriptions.
+        //
+        // We cannot rely only on the spec.drop_subscriptions_before_start flag,
+        // because if for some reason compute restarts inside the VM,
+        // it will start again with the same spec and flag value.
+        //
+        // To handle this, we save the fact of the operation in the database
+        // in the neon.drop_subscriptions_done table.
+        // If the table does not exist, we assume that the operation was never
+        // performed, so we must do it.
+        // If the table exists, we check if the operation was performed on the
+        // current timeline.
+        let mut drop_subscriptions_done = false;
+
+        if spec.drop_subscriptions_before_start {
+            let timeline_id = self.get_timeline_id().context("timeline_id must be set")?;
+            let query = format!(
+                "select 1 from neon.drop_subscriptions_done where timeline_id = '{}'",
+                timeline_id
+            );
+
+            info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id);
+
+            drop_subscriptions_done = match client.simple_query(&query).await {
+                Ok(result) => matches!(&result[0], postgres::SimpleQueryMessage::Row(_)),
+                Err(e) => match e.code() {
+                    Some(&SqlState::UNDEFINED_TABLE) => false,
+                    _ => {
+                        // We don't expect any other error here, except for the schema/table not existing
+                        error!("Error checking if drop subscription operation was already performed: {}", e);
+                        return Err(e.into());
+                    }
+                },
+            }
+        };
+
         let jwks_roles = Arc::new(
             spec.as_ref()
                 .local_proxy_config
@@ -996,7 +1047,7 @@ impl ComputeNode {
                     jwks_roles.clone(),
                     concurrency_token.clone(),
                     db,
-                    [DropSubscriptionsForDeletedDatabases].to_vec(),
+                    [DropLogicalSubscriptions].to_vec(),
                 );
 
                 Ok(spawn(fut))
@@ -1024,6 +1075,7 @@
             CreateAndAlterRoles,
             RenameAndDeleteDatabases,
             CreateAndAlterDatabases,
+            CreateSchemaNeon,
         ] {
             info!("Applying phase {:?}", &phase);
             apply_operations(
@@ -1064,6 +1116,17 @@ impl ComputeNode {
                 }
                 let conf = Arc::new(conf);
 
+                let mut phases = vec![
+                    DeleteDBRoleReferences,
+                    ChangeSchemaPerms,
+                    HandleAnonExtension,
+                ];
+
+                if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
+                    info!("Adding DropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
+                    phases.push(DropLogicalSubscriptions);
+                }
+
                 let fut = Self::apply_spec_sql_db(
                     spec.clone(),
                     conf,
@@ -1071,12 +1134,7 @@ impl ComputeNode {
                     jwks_roles.clone(),
                     concurrency_token.clone(),
                     db,
-                    [
-                        DeleteDBRoleReferences,
-                        ChangeSchemaPerms,
-                        HandleAnonExtension,
-                    ]
-                    .to_vec(),
+                    phases,
                 );
 
                 Ok(spawn(fut))
@@ -1088,12 +1146,20 @@ impl ComputeNode {
             handle.await??;
         }
 
-        for phase in vec![
+        let mut phases = vec![
             HandleOtherExtensions,
-            HandleNeonExtension,
+            HandleNeonExtension, // This step depends on CreateSchemaNeon
             CreateAvailabilityCheck,
             DropRoles,
-        ] {
+        ];
+
+        // This step depends on CreateSchemaNeon
+        if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
+            info!("Adding FinalizeDropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
+            phases.push(FinalizeDropLogicalSubscriptions);
+        }
+
+        for phase in phases {
             debug!("Applying phase {:?}", &phase);
             apply_operations(
                 spec.clone(),
@@ -1463,6 +1529,14 @@ impl ComputeNode {
                 Ok(())
             },
         )?;
+
+        let postgresql_conf_path = pgdata_path.join("postgresql.conf");
+        if config::line_in_file(
+            &postgresql_conf_path,
+            "neon.disable_logical_replication_subscribers=false",
+        )? {
+            info!("updated postgresql.conf to set neon.disable_logical_replication_subscribers=false");
+        }
         self.pg_reload_conf()?;
     }
     self.post_apply_config()?;
diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs
index b257c8a68f..e1bdfffa54 100644
--- a/compute_tools/src/config.rs
+++ b/compute_tools/src/config.rs
@@ -129,6 +129,13 @@ pub fn write_postgres_conf(
     writeln!(file, "neon.extension_server_port={}", extension_server_port)?;
 
+    if spec.drop_subscriptions_before_start {
+        writeln!(file, "neon.disable_logical_replication_subscribers=true")?;
+    } else {
+        // be explicit about the default value
+        writeln!(file, "neon.disable_logical_replication_subscribers=false")?;
+    }
+
     // This is essential to keep this line at the end of the file,
     // because it is intended to override any settings above.
     writeln!(file, "include_if_exists = 'compute_ctl_temp_override.conf'")?;
diff --git a/compute_tools/src/spec_apply.rs b/compute_tools/src/spec_apply.rs
index 7401de2e60..5ee9c5fbd8 100644
--- a/compute_tools/src/spec_apply.rs
+++ b/compute_tools/src/spec_apply.rs
@@ -47,7 +47,7 @@ pub enum PerDatabasePhase {
     DeleteDBRoleReferences,
     ChangeSchemaPerms,
     HandleAnonExtension,
-    DropSubscriptionsForDeletedDatabases,
+    DropLogicalSubscriptions,
 }
 
 #[derive(Clone, Debug)]
@@ -58,11 +58,13 @@ pub enum ApplySpecPhase {
     CreateAndAlterRoles,
     RenameAndDeleteDatabases,
     CreateAndAlterDatabases,
+    CreateSchemaNeon,
     RunInEachDatabase { db: DB, subphase: PerDatabasePhase },
     HandleOtherExtensions,
     HandleNeonExtension,
     CreateAvailabilityCheck,
     DropRoles,
+    FinalizeDropLogicalSubscriptions,
 }
 
 pub struct Operation {
@@ -331,7 +333,7 @@ async fn get_operations<'a>(
             // NB: there could be other db states, which prevent us from dropping
             // the database. For example, if db is used by any active subscription
             // or replication slot.
-            // Such cases are handled in the DropSubscriptionsForDeletedDatabases
+            // Such cases are handled in the DropLogicalSubscriptions
             // phase. We do all the cleanup before actually dropping the database.
             let drop_db_query: String = format!(
                 "DROP DATABASE IF EXISTS {} WITH (FORCE)",
@@ -442,13 +444,19 @@ async fn get_operations<'a>(
 
             Ok(Box::new(operations))
         }
+        ApplySpecPhase::CreateSchemaNeon => Ok(Box::new(once(Operation {
+            query: String::from("CREATE SCHEMA IF NOT EXISTS neon"),
+            comment: Some(String::from(
+                "create schema for neon extension and utils tables",
+            )),
+        }))),
         ApplySpecPhase::RunInEachDatabase { db, subphase } => {
             match subphase {
-                PerDatabasePhase::DropSubscriptionsForDeletedDatabases => {
+                PerDatabasePhase::DropLogicalSubscriptions => {
                     match &db {
                         DB::UserDB(db) => {
                             let drop_subscription_query: String = format!(
-                                include_str!("sql/drop_subscription_for_drop_dbs.sql"),
+                                include_str!("sql/drop_subscriptions.sql"),
                                 datname_str = escape_literal(&db.name),
                             );
 
@@ -666,10 +674,6 @@ async fn get_operations<'a>(
         }
         ApplySpecPhase::HandleNeonExtension => {
             let operations = vec![
-                Operation {
-                    query: String::from("CREATE SCHEMA IF NOT EXISTS neon"),
-                    comment: Some(String::from("init: add schema for extension")),
-                },
                 Operation {
                     query: String::from("CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon"),
                     comment: Some(String::from(
@@ -712,5 +716,9 @@ async fn get_operations<'a>(
 
             Ok(Box::new(operations))
         }
+        ApplySpecPhase::FinalizeDropLogicalSubscriptions => Ok(Box::new(once(Operation {
+            query: String::from(include_str!("sql/finalize_drop_subscriptions.sql")),
+            comment: None,
+        }))),
     }
 }
diff --git a/compute_tools/src/sql/drop_subscription_for_drop_dbs.sql b/compute_tools/src/sql/drop_subscriptions.sql
similarity index 100%
rename from compute_tools/src/sql/drop_subscription_for_drop_dbs.sql
rename to compute_tools/src/sql/drop_subscriptions.sql
diff --git a/compute_tools/src/sql/finalize_drop_subscriptions.sql b/compute_tools/src/sql/finalize_drop_subscriptions.sql
new file mode 100644
index 0000000000..4bb291876f
--- /dev/null
+++ b/compute_tools/src/sql/finalize_drop_subscriptions.sql
@@ -0,0 +1,21 @@
+DO $$
+BEGIN
+    IF NOT EXISTS(
+        SELECT 1
+        FROM pg_catalog.pg_tables
+        WHERE tablename = 'drop_subscriptions_done'
+        AND schemaname = 'neon'
+    )
+    THEN
+        CREATE TABLE neon.drop_subscriptions_done
+        (id serial primary key, timeline_id text);
+    END IF;
+
+    -- preserve the timeline_id of the last drop_subscriptions run
+    -- to ensure that the cleanup of a timeline is executed only once.
+    -- use upsert to avoid the table bloat in case of cascade branching (branch of a branch)
+    INSERT INTO neon.drop_subscriptions_done VALUES (1, current_setting('neon.timeline_id'))
+    ON CONFLICT (id) DO UPDATE
+    SET timeline_id = current_setting('neon.timeline_id');
+END
+$$
diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs
index c73debae4c..ba67ffa2dd 100644
--- a/control_plane/src/bin/neon_local.rs
+++ b/control_plane/src/bin/neon_local.rs
@@ -1357,6 +1357,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
                 args.pg_version,
                 mode,
                 !args.update_catalog,
+                false,
             )?;
         }
         EndpointCmd::Start(args) => {
diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs
index b8027abf7c..bc86d09103 100644
--- a/control_plane/src/endpoint.rs
+++ b/control_plane/src/endpoint.rs
@@ -76,6 +76,7 @@ pub struct EndpointConf {
     http_port: u16,
     pg_version: u32,
     skip_pg_catalog_updates: bool,
+    drop_subscriptions_before_start: bool,
     features: Vec<ComputeFeature>,
 }
 
@@ -143,6 +144,7 @@ impl ComputeControlPlane {
         pg_version: u32,
         mode: ComputeMode,
         skip_pg_catalog_updates: bool,
+        drop_subscriptions_before_start: bool,
     ) -> Result<Arc<Endpoint>> {
         let pg_port = pg_port.unwrap_or_else(|| self.get_port());
         let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
@@ -162,6 +164,7 @@ impl ComputeControlPlane {
             // with this we basically test a case of waking up an idle compute, where
             // we also skip catalog updates in the cloud.
             skip_pg_catalog_updates,
+            drop_subscriptions_before_start,
             features: vec![],
         });
 
@@ -177,6 +180,7 @@ impl ComputeControlPlane {
                 pg_port,
                 pg_version,
                 skip_pg_catalog_updates,
+                drop_subscriptions_before_start,
                 features: vec![],
             })?,
        )?;
@@ -240,6 +244,7 @@ pub struct Endpoint {
 
     // Optimizations
     skip_pg_catalog_updates: bool,
+    drop_subscriptions_before_start: bool,
 
     // Feature flags
     features: Vec<ComputeFeature>,
 }
@@ -291,6 +296,7 @@ impl Endpoint {
             tenant_id: conf.tenant_id,
             pg_version: conf.pg_version,
             skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
+            drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
             features: conf.features,
         })
     }
@@ -625,6 +631,7 @@ impl Endpoint {
             shard_stripe_size: Some(shard_stripe_size),
             local_proxy_config: None,
             reconfigure_concurrency: 1,
+            drop_subscriptions_before_start: self.drop_subscriptions_before_start,
         };
         let spec_path = self.endpoint_path().join("spec.json");
         std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs
index 54d6a1d38f..b3f18dc6da 100644
--- a/libs/compute_api/src/spec.rs
+++ b/libs/compute_api/src/spec.rs
@@ -138,6 +138,13 @@ pub struct ComputeSpec {
     /// enough spare connections for reconfiguration process to succeed.
     #[serde(default = "default_reconfigure_concurrency")]
     pub reconfigure_concurrency: usize,
+
+    /// If set to true, compute_ctl will drop all subscriptions before starting
+    /// the compute. This is needed when we start an endpoint on a branch, so
+    /// that the child does not compete with the parent branch's subscriptions
+    /// for the same replication content from the publisher.
+    #[serde(default)] // Default false
+    pub drop_subscriptions_before_start: bool,
 }
 
 /// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
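
To summarize the mechanism in one place: when drop_subscriptions_before_start is set, start-up reduces to two SQL steps. This is a sketch, not part of the patch; the statements are the ones from compute.rs and finalize_drop_subscriptions.sql above, and '<timeline_id>' is a placeholder for the value taken from the spec.

    -- 1. Has the cleanup already run on this timeline? An UNDEFINED_TABLE
    --    error or an empty result means it has not, so the
    --    DropLogicalSubscriptions phase is scheduled for each database.
    select 1 from neon.drop_subscriptions_done where timeline_id = '<timeline_id>';

    -- 2. After dropping, record the fact (FinalizeDropLogicalSubscriptions).
    --    The upsert keeps the table at a single row even under cascade
    --    branching (branch of a branch).
    INSERT INTO neon.drop_subscriptions_done VALUES (1, current_setting('neon.timeline_id'))
    ON CONFLICT (id) DO UPDATE
    SET timeline_id = current_setting('neon.timeline_id');
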
diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c
index ff08f9164d..ce2938cfd5 100644
--- a/pgxn/neon/neon.c
+++ b/pgxn/neon/neon.c
@@ -19,6 +19,7 @@
 #include "access/xlogrecovery.h"
 #endif
 #include "replication/logical.h"
+#include "replication/logicallauncher.h"
 #include "replication/slot.h"
 #include "replication/walsender.h"
 #include "storage/proc.h"
@@ -434,6 +435,15 @@ _PG_init(void)
 
 	restore_running_xacts_callback = RestoreRunningXactsFromClog;
 
+	DefineCustomBoolVariable(
+		"neon.disable_logical_replication_subscribers",
+		"Disables incoming logical replication",
+		NULL,
+		&disable_logical_replication_subscribers,
+		false,
+		PGC_SIGHUP,
+		0,
+		NULL, NULL, NULL);
 
 	DefineCustomBoolVariable(
 		"neon.allow_replica_misconfig",
diff --git a/test_runner/fixtures/neon_cli.py b/test_runner/fixtures/neon_cli.py
index adbd6414a7..33d422c590 100644
--- a/test_runner/fixtures/neon_cli.py
+++ b/test_runner/fixtures/neon_cli.py
@@ -523,6 +523,7 @@ class NeonLocalCli(AbstractNeonCli):
         remote_ext_config: str | None = None,
         pageserver_id: int | None = None,
         allow_multiple: bool = False,
+        create_test_user: bool = False,
         basebackup_request_tries: int | None = None,
         env: dict[str, str] | None = None,
     ) -> subprocess.CompletedProcess[str]:
@@ -544,6 +545,8 @@ class NeonLocalCli(AbstractNeonCli):
             args.extend(["--pageserver-id", str(pageserver_id)])
         if allow_multiple:
             args.extend(["--allow-multiple"])
+        if create_test_user:
+            args.extend(["--create-test-user"])
 
         res = self.raw_cli(args, extra_env_vars)
         res.check_returncode()
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index af427b92d2..388c1eb046 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -3918,6 +3918,7 @@ class Endpoint(PgProtocol, LogUtils):
         pageserver_id: int | None = None,
         safekeepers: list[int] | None = None,
         allow_multiple: bool = False,
+        create_test_user: bool = False,
         basebackup_request_tries: int | None = None,
         env: dict[str, str] | None = None,
     ) -> Self:
@@ -3939,6 +3940,7 @@ class Endpoint(PgProtocol, LogUtils):
             remote_ext_config=remote_ext_config,
             pageserver_id=pageserver_id,
             allow_multiple=allow_multiple,
+            create_test_user=create_test_user,
             basebackup_request_tries=basebackup_request_tries,
             env=env,
         )
diff --git a/test_runner/regress/test_subscriber_branching.py b/test_runner/regress/test_subscriber_branching.py
new file mode 100644
index 0000000000..645572da8e
--- /dev/null
+++ b/test_runner/regress/test_subscriber_branching.py
@@ -0,0 +1,242 @@
+from __future__ import annotations
+
+import time
+
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import NeonEnv, logical_replication_sync
+from fixtures.utils import query_scalar, wait_until
+
+
+# This test checks that branching of a timeline with logical subscriptions
+# does not affect logical replication for the parent.
+# An endpoint on a new branch will drop all existing subscriptions at start,
+# so it will not receive any changes.
+# If needed, the user can create new subscriptions on the child branch.
+def test_subscriber_branching(neon_simple_env: NeonEnv):
+    env = neon_simple_env
+    env.create_branch("publisher")
+    pub = env.endpoints.create("publisher")
+    pub.respec(
+        skip_pg_catalog_updates=False,
+        create_test_user=True,
+    )
+    pub.start(create_test_user=True)
+
+    env.create_branch("subscriber")
+    sub = env.endpoints.create("subscriber")
+    # Pass the create_test_user flag to get properly filled spec.users and spec.databases fields.
+    #
+    # This test checks the per-database operations that happen at compute start,
+    # and these operations are applied to the databases that are present in the spec.
+    sub.respec(
+        skip_pg_catalog_updates=False,
+        create_test_user=True,
+    )
+    sub.start(create_test_user=True)
+
+    pub.wait_for_migrations()
+    sub.wait_for_migrations()
+
+    n_records = 1000
+
+    def check_that_changes_propagated():
+        scur.execute("SELECT count(*) FROM t")
+        res = scur.fetchall()
+        assert res[0][0] == n_records
+
+    def insert_data(pub, start):
+        with pub.cursor(dbname="neondb", user="test", password="pubtestpwd") as pcur:
+            for i in range(start, start + n_records):
+                pcur.execute("INSERT into t values (%s,random()*100000)", (i,))
+
+    # create_test_user creates a user without a password,
+    # but psycopg2 execute() requires a password
+    with sub.cursor() as scur:
+        scur.execute("ALTER USER test WITH PASSWORD 'testpwd'")
+    with pub.cursor() as pcur:
+        # Create a test user to avoid using superuser
+        pcur.execute("ALTER USER test WITH PASSWORD 'pubtestpwd'")
+        # If we don't do this, creating the subscription will fail
+        pub.edit_hba(["host all test 0.0.0.0/0 md5"])
+
+    with pub.cursor(dbname="neondb", user="test", password="pubtestpwd") as pcur:
+        pcur.execute("CREATE TABLE t (pk integer primary key, sk integer)")
+        pcur.execute("CREATE PUBLICATION pub FOR TABLE t")
+
+    with sub.cursor(dbname="neondb", user="test", password="testpwd") as scur:
+        scur.execute("CREATE TABLE t (pk integer primary key, sk integer)")
+        pub_conn = (
+            f"host=localhost port={pub.pg_port} dbname=neondb user=test password=pubtestpwd"
+        )
+        query = f"CREATE SUBSCRIPTION sub CONNECTION '{pub_conn}' PUBLICATION pub"
+        scur.execute(query)
+        time.sleep(2)  # let initial table sync complete
+
+        insert_data(pub, 0)
+
+    with sub.cursor(dbname="neondb", user="test", password="testpwd") as scur:
+        wait_until(check_that_changes_propagated)
+        latest_end_lsn = query_scalar(
+            scur, "select latest_end_lsn from pg_catalog.pg_stat_subscription; "
+        )
+        last_insert_lsn = query_scalar(scur, "select pg_current_wal_insert_lsn();")
+
+        log.info(f"latest_end_lsn = {latest_end_lsn}")
+        log.info(f"last_insert_lsn = {last_insert_lsn}")
+
+    # stop the parent subscriber so that it doesn't interfere with the test
+    sub.stop()
+
+    # 1. good scenario:
+    # create subscriber_child_1
+    # it will not get changes from the publisher, because drop_subscriptions_before_start is set to True
+    sub_child_1_timeline_id = env.create_branch(
+        "subscriber_child_1",
+        ancestor_branch_name="subscriber",
+        ancestor_start_lsn=last_insert_lsn,
+    )
+    sub_child_1 = env.endpoints.create("subscriber_child_1")
+    # Pass the drop_subscriptions_before_start flag
+    sub_child_1.respec(
+        skip_pg_catalog_updates=False,
+        create_test_user=True,
+        drop_subscriptions_before_start=True,
+    )
+    sub_child_1.start(create_test_user=True)
+
+    # ensure that subscriber_child_1 sees all the data
+    with sub_child_1.cursor(dbname="neondb", user="test", password="testpwd") as scur:
+        scur.execute("SELECT count(*) FROM t")
+        res = scur.fetchall()
+        assert res[0][0] == n_records
+
+        # ensure that there are no subscriptions in this database
+        scur.execute("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = 'sub'")
+        assert len(scur.fetchall()) == 0
+
+    # ensure that drop_subscriptions_done happened on this timeline
+    with sub_child_1.cursor() as scur_postgres:
+        scur_postgres.execute("SELECT timeline_id from neon.drop_subscriptions_done")
+        res = scur_postgres.fetchall()
+        assert len(res) == 1
+        assert str(sub_child_1_timeline_id) == res[0][0]
+
+    old_n_records = n_records
+    # insert more data on the publisher
+    insert_data(pub, n_records)
+    n_records += n_records
+
+    pcur.execute("SELECT count(*) FROM t")
+    res = pcur.fetchall()
+    assert res[0][0] == n_records
+
+    # ensure that subscriber_child_1 doesn't see the new data
+    with sub_child_1.cursor(dbname="neondb", user="test", password="testpwd") as scur:
+        scur.execute("SELECT count(*) FROM t")
+        res = scur.fetchall()
+        assert res[0][0] == old_n_records
+
+    # re-enable logical replication on subscriber_child_1
+    # using a new publication
+    # ensure that the new publication works as expected
+    with sub_child_1.cursor(dbname="neondb", user="test", password="testpwd") as scur:
+        scur.execute("TRUNCATE t")
+
+        # create a new subscription
+        # with a new pub name
+        pcur.execute("CREATE PUBLICATION pub_new FOR TABLE t")
+        query = f"CREATE SUBSCRIPTION sub_new CONNECTION '{pub_conn}' PUBLICATION pub_new"
+        scur.execute(query)
+
+        wait_until(check_that_changes_propagated)
+
+        scur.execute("SELECT count(*) FROM t")
+        res = scur.fetchall()
+        assert res[0][0] == n_records
+
+    # ensure that the new publication works as expected after compute restart
+    # first restart with drop_subscriptions_before_start=True
+    # to emulate the case when compute restarts within the VM with a stale spec
+    sub_child_1.stop()
+    sub_child_1.respec(
+        skip_pg_catalog_updates=False,
+        create_test_user=True,
+        drop_subscriptions_before_start=True,
+    )
+    sub_child_1.start(create_test_user=True)
+
+    with sub_child_1.cursor(dbname="neondb", user="test", password="testpwd") as scur:
+        # ensure that even though the flag is set, we didn't drop the new subscription
+        scur.execute("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = 'sub_new'")
+        assert len(scur.fetchall()) == 1
+
+    # ensure that drop_subscriptions_done happened on this timeline
+    with sub_child_1.cursor() as scur_postgres:
+        scur_postgres.execute("SELECT timeline_id from neon.drop_subscriptions_done")
+        res = scur_postgres.fetchall()
+        assert len(res) == 1
+        assert str(sub_child_1_timeline_id) == res[0][0]
+
+    sub_child_1.stop()
+    sub_child_1.respec(
+        skip_pg_catalog_updates=False,
+        create_test_user=True,
+        drop_subscriptions_before_start=False,
+    )
+    sub_child_1.start(create_test_user=True)
+
+    # insert more data on the publisher
+    insert_data(pub, n_records)
+    n_records += n_records
+    with sub_child_1.cursor(dbname="neondb", user="test", password="testpwd") as scur:
+        # ensure that there is a subscription in this database
+        scur.execute("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = 'sub_new'")
+        assert len(scur.fetchall()) == 1
+
+        wait_until(check_that_changes_propagated)
+        scur.execute("SELECT count(*) FROM t")
+        res = scur.fetchall()
+        assert res[0][0] == n_records
+
+    # ensure that drop_subscriptions_done happened on this timeline
+    with sub_child_1.cursor() as scur_postgres:
+        scur_postgres.execute("SELECT timeline_id from neon.drop_subscriptions_done")
+        res = scur_postgres.fetchall()
+        assert len(res) == 1
+        assert str(sub_child_1_timeline_id) == res[0][0]
+
+    # wake the sub and ensure that it catches up with the new data
+    sub.start(create_test_user=True)
+    with sub.cursor(dbname="neondb", user="test", password="testpwd") as scur:
+        logical_replication_sync(sub, pub)
+        wait_until(check_that_changes_propagated)
+        scur.execute("SELECT count(*) FROM t")
+        res = scur.fetchall()
+        assert res[0][0] == n_records
+
+    # test that we can create a branch of a branch
+    sub_child_2_timeline_id = env.create_branch(
+        "subscriber_child_2",
+        ancestor_branch_name="subscriber_child_1",
+    )
+    sub_child_2 = env.endpoints.create("subscriber_child_2")
+    # Pass the drop_subscriptions_before_start flag
+    sub_child_2.respec(
+        skip_pg_catalog_updates=False,
+        drop_subscriptions_before_start=True,
+    )
+    sub_child_2.start(create_test_user=True)
+
+    # ensure that subscriber_child_2 does not inherit the subscription from child_1
+    with sub_child_2.cursor(dbname="neondb", user="test", password="testpwd") as scur:
+        # ensure that there are no subscriptions in this database
+        scur.execute("SELECT count(*) FROM pg_catalog.pg_subscription")
+        res = scur.fetchall()
+        assert res[0][0] == 0
+
+    # ensure that drop_subscriptions_done happened on this timeline
+    with sub_child_2.cursor() as scur_postgres:
+        scur_postgres.execute("SELECT timeline_id from neon.drop_subscriptions_done")
+        res = scur_postgres.fetchall()
+        assert len(res) == 1
+        assert str(sub_child_2_timeline_id) == res[0][0]
diff --git a/vendor/postgres-v14 b/vendor/postgres-v14
index 46082f2088..5f3b3afdd7 160000
--- a/vendor/postgres-v14
+++ b/vendor/postgres-v14
@@ -1 +1 @@
-Subproject commit 46082f20884f087a2d974b33ac65d63af26142bd
+Subproject commit 5f3b3afdd7c24b4a0fd63ecb3288fab472fcc633
diff --git a/vendor/postgres-v15 b/vendor/postgres-v15
index dd0b28d6fb..935292e883 160000
--- a/vendor/postgres-v15
+++ b/vendor/postgres-v15
@@ -1 +1 @@
-Subproject commit dd0b28d6fbad39e227f3b77296fcca879af8b3a9
+Subproject commit 935292e883298187f112db6e9c7f765037ddcf64
diff --git a/vendor/postgres-v16 b/vendor/postgres-v16
index d674efd776..061d563779 160000
--- a/vendor/postgres-v16
+++ b/vendor/postgres-v16
@@ -1 +1 @@
-Subproject commit d674efd776f59d78e8fa1535bd2f95c3e6984fca
+Subproject commit 061d56377961ba56998e41b7d5d5e975919ad301
diff --git a/vendor/postgres-v17 b/vendor/postgres-v17
index a8dd6e779d..4276717f6e 160000
--- a/vendor/postgres-v17
+++ b/vendor/postgres-v17
@@ -1 +1 @@
-Subproject commit a8dd6e779dde907778006adb436b557ad652fb97
+Subproject commit 4276717f6e91023e504de355f4f21d4824074de8
diff --git a/vendor/revisions.json b/vendor/revisions.json
index c899dbaa5a..a104be8ae0 100644
--- a/vendor/revisions.json
+++ b/vendor/revisions.json
@@ -1,18 +1,18 @@
 {
     "v17": [
         "17.2",
-        "a8dd6e779dde907778006adb436b557ad652fb97"
+        "4276717f6e91023e504de355f4f21d4824074de8"
     ],
     "v16": [
         "16.6",
-        "d674efd776f59d78e8fa1535bd2f95c3e6984fca"
+        "061d56377961ba56998e41b7d5d5e975919ad301"
     ],
     "v15": [
         "15.10",
-        "dd0b28d6fbad39e227f3b77296fcca879af8b3a9"
+        "935292e883298187f112db6e9c7f765037ddcf64"
     ],
     "v14": [
         "14.15",
-        "46082f20884f087a2d974b33ac65d63af26142bd"
+        "5f3b3afdd7c24b4a0fd63ecb3288fab472fcc633"
     ]
 }