Fix logical_replication_sync test fixture (#10531)

Fixes the flaky `test_lr_with_slow_safekeeper` test (#10242)

Fix the query against the `pg_catalog.pg_stat_subscription` catalog so that table
synchronization and parallel logical replication (LR) are handled correctly.
Anastasia Lubennikova
2025-02-03 12:44:47 +00:00
committed by GitHub
parent b1e451091a
commit b1bc33eb4d
7 changed files with 65 additions and 39 deletions
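
After this change the subscription name is a required positional argument of the fixture, so every call site in the diffs below passes it explicitly. A minimal usage sketch (the two wrapper functions are illustrative only, not part of the commit; the import path follows the `fixtures.neon_fixtures` module edited below):

```python
from fixtures.neon_fixtures import PgProtocol, logical_replication_sync

# Illustrative wrappers around the updated fixture; "sub1" and "mysub" are the
# subscription names created by the tests touched in this commit.
def wait_for_sub1(subscriber: PgProtocol, publisher: PgProtocol) -> None:
    # Single-database case: most tests wait on a subscription named "sub1".
    logical_replication_sync(subscriber, publisher, "sub1")


def wait_for_mysub(endpoint: PgProtocol) -> None:
    # Cross-database case from test_dropdb_with_subscription: publisher and
    # subscriber are different databases on the same endpoint.
    logical_replication_sync(
        endpoint,
        endpoint,
        "mysub",
        sub_dbname="subscriber_db",
        pub_dbname="publisher_db",
    )
```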

@@ -4996,13 +4996,35 @@ def check_restored_datadir_content(
     assert (mismatch, error) == ([], [])
 # wait for subscriber to catch up with publisher
 def logical_replication_sync(
     subscriber: PgProtocol,
     publisher: PgProtocol,
+    # pass subname explicitly to avoid confusion
+    # when multiple subscriptions are present
+    subname: str,
     sub_dbname: str | None = None,
     pub_dbname: str | None = None,
-) -> Lsn:
+):
     """Wait logical replication subscriber to sync with publisher."""
+    def initial_sync():
+        # first check if the subscription is active `s`=`synchronized`, `r` = `ready`
+        query = f"""SELECT 1 FROM pg_subscription_rel join pg_catalog.pg_subscription
+            on pg_subscription_rel.srsubid = pg_subscription.oid
+            WHERE srsubstate NOT IN ('r', 's') and subname='{subname}'"""
+        if sub_dbname is not None:
+            res = subscriber.safe_psql(query, dbname=sub_dbname)
+        else:
+            res = subscriber.safe_psql(query)
+        assert (res is None) or (len(res) == 0)
+    wait_until(initial_sync)
     # wait for the subscription to catch up with current state of publisher
     # caller is responsible to call checkpoint before calling this function
     if pub_dbname is not None:
         publisher_lsn = Lsn(
             publisher.safe_psql("SELECT pg_current_wal_flush_lsn()", dbname=pub_dbname)[0][0]
@@ -5010,23 +5032,23 @@ def logical_replication_sync(
     else:
         publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
-    while True:
-        if sub_dbname is not None:
-            res = subscriber.safe_psql(
-                "select latest_end_lsn from pg_catalog.pg_stat_subscription", dbname=sub_dbname
-            )[0][0]
-        else:
-            res = subscriber.safe_psql(
-                "select latest_end_lsn from pg_catalog.pg_stat_subscription"
-            )[0][0]
+    def subscriber_catch_up():
+        query = f"select latest_end_lsn from pg_catalog.pg_stat_subscription where latest_end_lsn is NOT NULL and subname='{subname}'"
-        if res:
-            log.info(f"subscriber_lsn={res}")
-            subscriber_lsn = Lsn(res)
-            log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={publisher_lsn}")
-            if subscriber_lsn >= publisher_lsn:
-                return subscriber_lsn
-        time.sleep(0.5)
+        if sub_dbname is not None:
+            res = subscriber.safe_psql(query, dbname=sub_dbname)
+        else:
+            res = subscriber.safe_psql(query)
+        assert res is not None
+        res_lsn = res[0][0]
+        log.info(f"subscriber_lsn={res_lsn}")
+        subscriber_lsn = Lsn(res_lsn)
+        log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={publisher_lsn}")
+        assert subscriber_lsn >= publisher_lsn
+    wait_until(subscriber_catch_up)
 def tenant_get_shards(
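
Both waiting phases in the rewritten fixture are inner closures that assert instead of returning a value, which fits a retry helper such as `wait_until` from `fixtures.utils` (imported elsewhere in this change) that keeps calling a function until it stops raising. A rough sketch of that retry pattern, with an illustrative name, timeout, and poll interval rather than the project's actual defaults:

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def wait_until_sketch(check: Callable[[], T], timeout: float = 60.0, interval: float = 0.5) -> T:
    """Retry `check` until it returns without raising, or until the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # Success as soon as the closure runs without an AssertionError.
            return check()
        except Exception:
            if time.monotonic() >= deadline:
                # Give up and surface the last failure to the test.
                raise
            time.sleep(interval)
```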

@@ -44,13 +44,13 @@ def test_logical_replication(neon_simple_env: NeonEnv, pg_bin: PgBin, vanilla_pg
     vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
     # Wait logical replication channel to be established
-    logical_replication_sync(vanilla_pg, endpoint)
+    logical_replication_sync(vanilla_pg, endpoint, "sub1")
     pg_bin.run_capture(["pgbench", "-c10", "-T100", "-Mprepared", endpoint.connstr()])
     # Wait logical replication to sync
     start = time.time()
-    logical_replication_sync(vanilla_pg, endpoint)
+    logical_replication_sync(vanilla_pg, endpoint, "sub1")
     log.info(f"Sync with master took {time.time() - start} seconds")
     sum_master = cast("int", endpoint.safe_psql("select sum(abalance) from pgbench_accounts")[0][0])

@@ -183,6 +183,7 @@ def test_dropdb_with_subscription(neon_simple_env: NeonEnv):
     cursor.execute("select pg_catalog.pg_create_logical_replication_slot('mysub', 'pgoutput');")
     cursor.execute("CREATE TABLE t(a int)")
     cursor.execute("INSERT INTO t VALUES (1)")
+    cursor.execute("CHECKPOINT")
     # connect to the subscriber_db and create a subscription
     # Note that we need to create subscription with
@@ -195,7 +196,11 @@ def test_dropdb_with_subscription(neon_simple_env: NeonEnv):
     # wait for the subscription to be active
     logical_replication_sync(
-        endpoint, endpoint, sub_dbname="subscriber_db", pub_dbname="publisher_db"
+        endpoint,
+        endpoint,
+        "mysub",
+        sub_dbname="subscriber_db",
+        pub_dbname="publisher_db",
     )
     # Check that replication is working

@@ -63,7 +63,7 @@ def test_layer_bloating(neon_env_builder: NeonEnvBuilder, vanilla_pg):
     cur.execute("set statement_timeout=0")
     cur.execute("select create_snapshots(10000)")
     # Wait logical replication to sync
-    logical_replication_sync(vanilla_pg, endpoint)
+    logical_replication_sync(vanilla_pg, endpoint, "sub1")
     wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, timeline)
     env.pageserver.http_client().timeline_checkpoint(env.initial_tenant, timeline, compact=False)

@@ -55,13 +55,13 @@ def test_logical_replication(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgr
     vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
     # Wait logical replication channel to be established
-    logical_replication_sync(vanilla_pg, endpoint)
+    logical_replication_sync(vanilla_pg, endpoint, "sub1")
     # insert some data
     cur.execute("insert into t values (generate_series(1,1000), 0)")
     # Wait logical replication to sync
-    logical_replication_sync(vanilla_pg, endpoint)
+    logical_replication_sync(vanilla_pg, endpoint, "sub1")
     assert vanilla_pg.safe_psql("select count(*) from t")[0][0] == 1000
     # now stop subscriber...
@@ -78,7 +78,7 @@ def test_logical_replication(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgr
     vanilla_pg.start()
     # Wait logical replication to sync
-    logical_replication_sync(vanilla_pg, endpoint)
+    logical_replication_sync(vanilla_pg, endpoint, "sub1")
     # Check that subscribers receives all data
     assert vanilla_pg.safe_psql("select count(*) from t")[0][0] == 2000
@@ -148,7 +148,7 @@ COMMIT;
     endpoint.start()
     vanilla_pg.start()
-    logical_replication_sync(vanilla_pg, endpoint)
+    logical_replication_sync(vanilla_pg, endpoint, "sub1")
     eq_q = "select testcolumn1, testcolumn2, testcolumn3 from replication_example order by 1, 2, 3"
     assert vanilla_pg.safe_psql(eq_q) == endpoint.safe_psql(eq_q)
     log.info("rewriteheap synced")
@@ -285,7 +285,7 @@ FROM generate_series(1, 16384) AS seq; -- Inserts enough rows to exceed 16MB of
     vanilla_pg.safe_psql("create table t(a int)")
     connstr = endpoint.connstr().replace("'", "''")
     vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub")
-    logical_replication_sync(vanilla_pg, endpoint)
+    logical_replication_sync(vanilla_pg, endpoint, "sub1")
     vanilla_pg.stop()
@@ -321,13 +321,13 @@ FROM generate_series(1, 16384) AS seq; -- Inserts enough rows to exceed 16MB of
     sk_http = sk.http_client()
     sk_http.configure_failpoints([("sk-pause-send", "off")])
-    logical_replication_sync(vanilla_pg, endpoint)
+    logical_replication_sync(vanilla_pg, endpoint, "sub1")
     assert [r[0] for r in vanilla_pg.safe_psql("select * from t")] == [1, 2]
     # Check that local reads also work
     with endpoint.connect().cursor() as cur:
         cur.execute("insert into t values (3)")
-    logical_replication_sync(vanilla_pg, endpoint)
+    logical_replication_sync(vanilla_pg, endpoint, "sub1")
     assert [r[0] for r in vanilla_pg.safe_psql("select * from t")] == [1, 2, 3]
     log_path = vanilla_pg.pgdatadir / "pg.log"
@@ -365,7 +365,7 @@ def test_restart_endpoint(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgres)
     log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
     connstr = endpoint.connstr().replace("'", "''")
     vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
-    logical_replication_sync(vanilla_pg, endpoint)
+    logical_replication_sync(vanilla_pg, endpoint, "sub1")
     vanilla_pg.stop()
     wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
@@ -375,7 +375,7 @@ def test_restart_endpoint(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgres)
     # this should flush current wal page
     cur.execute("insert into replication_example values (3, 4)")
     vanilla_pg.start()
-    logical_replication_sync(vanilla_pg, endpoint)
+    logical_replication_sync(vanilla_pg, endpoint, "sub1")
     assert vanilla_pg.safe_psql(
         "select sum(somedata) from replication_example"
     ) == endpoint.safe_psql("select sum(somedata) from replication_example")
@@ -409,18 +409,18 @@ def test_large_records(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgres):
     # Test simple insert, update, delete. But with very large values
     value = random_string(10_000_000)
    cur.execute(f"INSERT INTO reptbl VALUES (1, '{value}')")
-    logical_replication_sync(vanilla_pg, endpoint)
+    logical_replication_sync(vanilla_pg, endpoint, "sub1")
     assert vanilla_pg.safe_psql("select id, largeval from reptbl") == [(1, value)]
     # Test delete, and reinsert another value
     cur.execute("DELETE FROM reptbl WHERE id = 1")
     cur.execute(f"INSERT INTO reptbl VALUES (2, '{value}')")
-    logical_replication_sync(vanilla_pg, endpoint)
+    logical_replication_sync(vanilla_pg, endpoint, "sub1")
     assert vanilla_pg.safe_psql("select id, largeval from reptbl") == [(2, value)]
     value = random_string(10_000_000)
     cur.execute(f"UPDATE reptbl SET largeval='{value}'")
-    logical_replication_sync(vanilla_pg, endpoint)
+    logical_replication_sync(vanilla_pg, endpoint, "sub1")
     assert vanilla_pg.safe_psql("select id, largeval from reptbl") == [(2, value)]
     endpoint.stop()
@@ -428,7 +428,7 @@ def test_large_records(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgres):
     cur = endpoint.connect().cursor()
     value = random_string(10_000_000)
     cur.execute(f"UPDATE reptbl SET largeval='{value}'")
-    logical_replication_sync(vanilla_pg, endpoint)
+    logical_replication_sync(vanilla_pg, endpoint, "sub1")
     assert vanilla_pg.safe_psql("select id, largeval from reptbl") == [(2, value)]
@@ -608,7 +608,7 @@ def test_subscriber_synchronous_commit(neon_simple_env: NeonEnv, vanilla_pg: Van
     for i in range(0, 1000):
         pcur.execute("INSERT into t values (%s, random()*100000)", (i,))
     # wait until sub receives all data
-    logical_replication_sync(sub, vanilla_pg)
+    logical_replication_sync(sub, vanilla_pg, "sub")
     # Update confirmed_flush_lsn of the slot. If subscriber ack'ed received data
     # as flushed we'll now lose it if subscriber restarts. That's why
     # logical_replication_wait_flush_lsn_sync is expected to hang while

@@ -43,7 +43,7 @@ def test_physical_and_logical_replication_slot_not_copied(neon_simple_env: NeonE
     s_cur.execute("select count(*) from t")
     assert s_cur.fetchall()[0][0] == n_records
-    logical_replication_sync(vanilla_pg, primary)
+    logical_replication_sync(vanilla_pg, primary, "sub1")
     assert vanilla_pg.safe_psql("select count(*) from t")[0][0] == n_records
     # Check that LR slot is not copied to replica
@@ -87,7 +87,7 @@ def test_aux_not_logged_at_replica(neon_simple_env: NeonEnv, vanilla_pg):
     s_con = secondary.connect()
     s_cur = s_con.cursor()
-    logical_replication_sync(vanilla_pg, primary)
+    logical_replication_sync(vanilla_pg, primary, "sub1")
     assert vanilla_pg.safe_psql("select count(*) from t")[0][0] == n_records
     s_cur.execute("select count(*) from t")

@@ -3,7 +3,7 @@ from __future__ import annotations
 import time
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnv, logical_replication_sync
+from fixtures.neon_fixtures import NeonEnv
 from fixtures.utils import query_scalar, wait_until
@@ -208,7 +208,6 @@ def test_subscriber_branching(neon_simple_env: NeonEnv):
     # wake the sub and ensure that it catches up with the new data
     sub.start(create_test_user=True)
     with sub.cursor(dbname="neondb", user="test", password="testpwd") as scur:
-        logical_replication_sync(sub, pub)
         wait_until(check_that_changes_propagated)
         scur.execute("SELECT count(*) FROM t")
         res = scur.fetchall()