mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 23:20:40 +00:00
Add neon.ancestor_lsn GUC
This commit is contained in:
@@ -331,7 +331,15 @@ where
|
||||
.append(&header, data)
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "send_tarball,pg_hba.conf"))?;
|
||||
} else {
|
||||
} else if *filepath == "postgresql.auto.conf" {
|
||||
let ancestor_lsn = format!("neon.ancestor_lsn='{}'", self.timeline.get_ancestor_lsn());
|
||||
let data = ancestor_lsn.as_bytes();
|
||||
let header = new_tar_header(filepath, data.len() as u64)?;
|
||||
self.ar
|
||||
.append(&header, data)
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "send_tarball,pg_hba.conf"))?;
|
||||
} else {
|
||||
let header = new_tar_header(filepath, 0)?;
|
||||
self.ar
|
||||
.append(&header, io::empty())
|
||||
|
||||
@@ -497,6 +497,15 @@ _PG_init(void)
|
||||
GUC_UNIT_KB,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomStringVariable("neon.ancestor_lsn",
|
||||
"LSN of the ancestor timeline",
|
||||
"Zero if there is no arent branch",
|
||||
&neon_ancestor_lsn,
|
||||
"0/0",
|
||||
PGC_SIGHUP,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
/*
|
||||
* Important: This must happen after other parts of the extension are
|
||||
* loaded, otherwise any settings to GUCs that were set before the
|
||||
|
||||
@@ -433,7 +433,7 @@ def test_large_records(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgres):
|
||||
|
||||
|
||||
#
|
||||
# Check that slots are not inherited in brnach
|
||||
# Check that slots are not inherited in branch
|
||||
#
|
||||
def test_slots_and_branching(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
@@ -624,3 +624,53 @@ def test_subscriber_synchronous_commit(neon_simple_env: NeonEnv, vanilla_pg: Van
|
||||
log.info("waiting for sync after restart")
|
||||
logical_replication_wait_flush_lsn_sync(vanilla_pg)
|
||||
assert sub.safe_psql_scalar("SELECT count(*) FROM t") == 1000
|
||||
|
||||
|
||||
|
||||
def test_logical_replication_ancestor_lsn(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgres):
|
||||
env = neon_simple_env
|
||||
tenant, timeline = env.create_tenant()
|
||||
|
||||
main = env.endpoints.create_start("main", tenant_id=tenant)
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
main_conn = main.connect()
|
||||
main_cur = main_conn.cursor()
|
||||
|
||||
main_cur.execute("create table t(pk integer primary key, payload integer)")
|
||||
main_cur.execute("create publication pub1 for table t")
|
||||
|
||||
# insert some data...
|
||||
main_cur.execute("insert into t values (generate_series(1,1000), 0)")
|
||||
|
||||
main_cur.execute("select pg_create_logical_replication_slot('myslot', 'pgoutput')")
|
||||
wait_for_last_flush_lsn(env, main, tenant, timeline)
|
||||
|
||||
#remember WAL position
|
||||
main_cur.execute("SELECT pg_current_wal_flush_lsn()")
|
||||
main_lsn = Lsn(main_cur.fetchall()[0][0])
|
||||
|
||||
# insert more data...
|
||||
main_cur.execute("insert into t values (generate_series(1001,2000), 0)")
|
||||
|
||||
# create branch
|
||||
branch_timeline = env.create_branch("branch", ancestor_branch_name="main", tenant_id=tenant, ancestor_start_lsn=main_lsn)
|
||||
#all_reparented = client.detach_ancestor(tenant, branch_timeline, detach_behavior="v1")
|
||||
all_reparented = client.detach_ancestor(tenant, branch_timeline)
|
||||
assert all_reparented == set()
|
||||
|
||||
branch = env.endpoints.create_start("branch", tenant_id=tenant)
|
||||
branch_conn = branch.connect()
|
||||
branch_cur = branch_conn.cursor()
|
||||
connstr = branch.connstr().replace("'", "''")
|
||||
log.info(f"ep connstr is {branch.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
|
||||
|
||||
# now start subscriber
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.safe_psql("create table t(pk integer primary key, payload integer)")
|
||||
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1 with (create_slot=false, slot_name='myslot')")
|
||||
#vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
|
||||
|
||||
logical_replication_sync(vanilla_pg, branch, "sub1")
|
||||
|
||||
assert vanilla_pg.safe_psql("select count(*) from t")[0][0] == 1000
|
||||
|
||||
Reference in New Issue
Block a user