Compare commits

...

1 Commits

Author SHA1 Message Date
Konstantin Knizhnik
9fd2f87d73 Add neon.ancestor_lsn GUC 2025-03-19 08:06:11 +02:00
3 changed files with 69 additions and 2 deletions

View File

@@ -331,7 +331,15 @@ where
.append(&header, data)
.await
.map_err(|e| BasebackupError::Client(e, "send_tarball,pg_hba.conf"))?;
} else {
} else if *filepath == "postgresql.auto.conf" {
let ancestor_lsn = format!("neon.ancestor_lsn='{}'", self.timeline.get_ancestor_lsn());
let data = ancestor_lsn.as_bytes();
let header = new_tar_header(filepath, data.len() as u64)?;
self.ar
.append(&header, data)
.await
.map_err(|e| BasebackupError::Client(e, "send_tarball,pg_hba.conf"))?;
} else {
let header = new_tar_header(filepath, 0)?;
self.ar
.append(&header, io::empty())

View File

@@ -497,6 +497,15 @@ _PG_init(void)
GUC_UNIT_KB,
NULL, NULL, NULL);
DefineCustomStringVariable("neon.ancestor_lsn",
"LSN of the ancestor timeline",
"Zero if there is no arent branch",
&neon_ancestor_lsn,
"0/0",
PGC_SIGHUP,
0,
NULL, NULL, NULL);
/*
* Important: This must happen after other parts of the extension are
* loaded, otherwise any settings to GUCs that were set before the

View File

@@ -433,7 +433,7 @@ def test_large_records(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgres):
#
# Check that slots are not inherited in brnach
# Check that slots are not inherited in branch
#
def test_slots_and_branching(neon_simple_env: NeonEnv):
env = neon_simple_env
@@ -624,3 +624,53 @@ def test_subscriber_synchronous_commit(neon_simple_env: NeonEnv, vanilla_pg: Van
log.info("waiting for sync after restart")
logical_replication_wait_flush_lsn_sync(vanilla_pg)
assert sub.safe_psql_scalar("SELECT count(*) FROM t") == 1000
def test_logical_replication_ancestor_lsn(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgres):
env = neon_simple_env
tenant, timeline = env.create_tenant()
main = env.endpoints.create_start("main", tenant_id=tenant)
client = env.pageserver.http_client()
main_conn = main.connect()
main_cur = main_conn.cursor()
main_cur.execute("create table t(pk integer primary key, payload integer)")
main_cur.execute("create publication pub1 for table t")
# insert some data...
main_cur.execute("insert into t values (generate_series(1,1000), 0)")
main_cur.execute("select pg_create_logical_replication_slot('myslot', 'pgoutput')")
wait_for_last_flush_lsn(env, main, tenant, timeline)
#remember WAL position
main_cur.execute("SELECT pg_current_wal_flush_lsn()")
main_lsn = Lsn(main_cur.fetchall()[0][0])
# insert more data...
main_cur.execute("insert into t values (generate_series(1001,2000), 0)")
# create branch
branch_timeline = env.create_branch("branch", ancestor_branch_name="main", tenant_id=tenant, ancestor_start_lsn=main_lsn)
#all_reparented = client.detach_ancestor(tenant, branch_timeline, detach_behavior="v1")
all_reparented = client.detach_ancestor(tenant, branch_timeline)
assert all_reparented == set()
branch = env.endpoints.create_start("branch", tenant_id=tenant)
branch_conn = branch.connect()
branch_cur = branch_conn.cursor()
connstr = branch.connstr().replace("'", "''")
log.info(f"ep connstr is {branch.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
# now start subscriber
vanilla_pg.start()
vanilla_pg.safe_psql("create table t(pk integer primary key, payload integer)")
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1 with (create_slot=false, slot_name='myslot')")
#vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
logical_replication_sync(vanilla_pg, branch, "sub1")
assert vanilla_pg.safe_psql("select count(*) from t")[0][0] == 1000