From 9fd2f87d7364dfc1e989717e0b66b207adf6b285 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 19 Mar 2025 08:06:11 +0200 Subject: [PATCH] Add neon.ancestor_lsn GUC --- pageserver/src/basebackup.rs | 10 +++- pgxn/neon/neon.c | 9 ++++ .../regress/test_logical_replication.py | 52 ++++++++++++++++++- 3 files changed, 69 insertions(+), 2 deletions(-) diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index de527e307b..d3ca4ec479 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -331,7 +331,15 @@ where .append(&header, data) .await .map_err(|e| BasebackupError::Client(e, "send_tarball,pg_hba.conf"))?; - } else { + } else if *filepath == "postgresql.auto.conf" { + let ancestor_lsn = format!("neon.ancestor_lsn='{}'", self.timeline.get_ancestor_lsn()); + let data = ancestor_lsn.as_bytes(); + let header = new_tar_header(filepath, data.len() as u64)?; + self.ar + .append(&header, data) + .await + .map_err(|e| BasebackupError::Client(e, "send_tarball,pg_hba.conf"))?; + } else { let header = new_tar_header(filepath, 0)?; self.ar .append(&header, io::empty()) diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 0f226cc9e2..b14f6c034b 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -497,6 +497,15 @@ _PG_init(void) GUC_UNIT_KB, NULL, NULL, NULL); + DefineCustomStringVariable("neon.ancestor_lsn", + "LSN of the ancestor timeline", + "Zero if there is no arent branch", + &neon_ancestor_lsn, + "0/0", + PGC_SIGHUP, + 0, + NULL, NULL, NULL); + /* * Important: This must happen after other parts of the extension are * loaded, otherwise any settings to GUCs that were set before the diff --git a/test_runner/regress/test_logical_replication.py b/test_runner/regress/test_logical_replication.py index 3a92f0d1d1..bcb74962ee 100644 --- a/test_runner/regress/test_logical_replication.py +++ b/test_runner/regress/test_logical_replication.py @@ -433,7 +433,7 @@ def test_large_records(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgres): # -# Check that slots are not inherited in brnach +# Check that slots are not inherited in branch # def test_slots_and_branching(neon_simple_env: NeonEnv): env = neon_simple_env @@ -624,3 +624,53 @@ def test_subscriber_synchronous_commit(neon_simple_env: NeonEnv, vanilla_pg: Van log.info("waiting for sync after restart") logical_replication_wait_flush_lsn_sync(vanilla_pg) assert sub.safe_psql_scalar("SELECT count(*) FROM t") == 1000 + + + +def test_logical_replication_ancestor_lsn(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgres): + env = neon_simple_env + tenant, timeline = env.create_tenant() + + main = env.endpoints.create_start("main", tenant_id=tenant) + client = env.pageserver.http_client() + + main_conn = main.connect() + main_cur = main_conn.cursor() + + main_cur.execute("create table t(pk integer primary key, payload integer)") + main_cur.execute("create publication pub1 for table t") + + # insert some data... + main_cur.execute("insert into t values (generate_series(1,1000), 0)") + + main_cur.execute("select pg_create_logical_replication_slot('myslot', 'pgoutput')") + wait_for_last_flush_lsn(env, main, tenant, timeline) + + #remember WAL position + main_cur.execute("SELECT pg_current_wal_flush_lsn()") + main_lsn = Lsn(main_cur.fetchall()[0][0]) + + # insert more data... + main_cur.execute("insert into t values (generate_series(1001,2000), 0)") + + # create branch + branch_timeline = env.create_branch("branch", ancestor_branch_name="main", tenant_id=tenant, ancestor_start_lsn=main_lsn) + #all_reparented = client.detach_ancestor(tenant, branch_timeline, detach_behavior="v1") + all_reparented = client.detach_ancestor(tenant, branch_timeline) + assert all_reparented == set() + + branch = env.endpoints.create_start("branch", tenant_id=tenant) + branch_conn = branch.connect() + branch_cur = branch_conn.cursor() + connstr = branch.connstr().replace("'", "''") + log.info(f"ep connstr is {branch.connstr()}, subscriber connstr {vanilla_pg.connstr()}") + + # now start subscriber + vanilla_pg.start() + vanilla_pg.safe_psql("create table t(pk integer primary key, payload integer)") + vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1 with (create_slot=false, slot_name='myslot')") + #vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1") + + logical_replication_sync(vanilla_pg, branch, "sub1") + + assert vanilla_pg.safe_psql("select count(*) from t")[0][0] == 1000