diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 7e3cc19829..8909f7f249 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4996,13 +4996,35 @@ def check_restored_datadir_content( assert (mismatch, error) == ([], []) +# wait for subscriber to catch up with publisher def logical_replication_sync( subscriber: PgProtocol, publisher: PgProtocol, + # pass subname explicitly to avoid confusion + # when multiple subscriptions are present + subname: str, sub_dbname: str | None = None, pub_dbname: str | None = None, -) -> Lsn: +): """Wait logical replication subscriber to sync with publisher.""" + + def initial_sync(): + # first check if the subscription is active `s`=`synchronized`, `r` = `ready` + query = f"""SELECT 1 FROM pg_subscription_rel join pg_catalog.pg_subscription + on pg_subscription_rel.srsubid = pg_subscription.oid + WHERE srsubstate NOT IN ('r', 's') and subname='{subname}'""" + + if sub_dbname is not None: + res = subscriber.safe_psql(query, dbname=sub_dbname) + else: + res = subscriber.safe_psql(query) + + assert (res is None) or (len(res) == 0) + + wait_until(initial_sync) + + # wait for the subscription to catch up with current state of publisher + # caller is responsible to call checkpoint before calling this function if pub_dbname is not None: publisher_lsn = Lsn( publisher.safe_psql("SELECT pg_current_wal_flush_lsn()", dbname=pub_dbname)[0][0] @@ -5010,23 +5032,23 @@ def logical_replication_sync( else: publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) - while True: - if sub_dbname is not None: - res = subscriber.safe_psql( - "select latest_end_lsn from pg_catalog.pg_stat_subscription", dbname=sub_dbname - )[0][0] - else: - res = subscriber.safe_psql( - "select latest_end_lsn from pg_catalog.pg_stat_subscription" - )[0][0] + def subscriber_catch_up(): + query = f"select latest_end_lsn from pg_catalog.pg_stat_subscription where latest_end_lsn is NOT NULL and subname='{subname}'" - if res: - log.info(f"subscriber_lsn={res}") - subscriber_lsn = Lsn(res) - log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={publisher_lsn}") - if subscriber_lsn >= publisher_lsn: - return subscriber_lsn - time.sleep(0.5) + if sub_dbname is not None: + res = subscriber.safe_psql(query, dbname=sub_dbname) + else: + res = subscriber.safe_psql(query) + + assert res is not None + + res_lsn = res[0][0] + log.info(f"subscriber_lsn={res_lsn}") + subscriber_lsn = Lsn(res_lsn) + log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={publisher_lsn}") + assert subscriber_lsn >= publisher_lsn + + wait_until(subscriber_catch_up) def tenant_get_shards( diff --git a/test_runner/performance/test_logical_replication.py b/test_runner/performance/test_logical_replication.py index 9d653d1a1e..fdc56cc496 100644 --- a/test_runner/performance/test_logical_replication.py +++ b/test_runner/performance/test_logical_replication.py @@ -44,13 +44,13 @@ def test_logical_replication(neon_simple_env: NeonEnv, pg_bin: PgBin, vanilla_pg vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1") # Wait logical replication channel to be established - logical_replication_sync(vanilla_pg, endpoint) + logical_replication_sync(vanilla_pg, endpoint, "sub1") pg_bin.run_capture(["pgbench", "-c10", "-T100", "-Mprepared", endpoint.connstr()]) # Wait logical replication to sync start = time.time() - logical_replication_sync(vanilla_pg, endpoint) + logical_replication_sync(vanilla_pg, endpoint, "sub1") log.info(f"Sync with master took {time.time() - start} seconds") sum_master = cast("int", endpoint.safe_psql("select sum(abalance) from pgbench_accounts")[0][0]) diff --git a/test_runner/regress/test_compute_catalog.py b/test_runner/regress/test_compute_catalog.py index f0878b2631..50a922a616 100644 --- a/test_runner/regress/test_compute_catalog.py +++ b/test_runner/regress/test_compute_catalog.py @@ -183,6 +183,7 @@ def test_dropdb_with_subscription(neon_simple_env: NeonEnv): cursor.execute("select pg_catalog.pg_create_logical_replication_slot('mysub', 'pgoutput');") cursor.execute("CREATE TABLE t(a int)") cursor.execute("INSERT INTO t VALUES (1)") + cursor.execute("CHECKPOINT") # connect to the subscriber_db and create a subscription # Note that we need to create subscription with @@ -195,7 +196,11 @@ def test_dropdb_with_subscription(neon_simple_env: NeonEnv): # wait for the subscription to be active logical_replication_sync( - endpoint, endpoint, sub_dbname="subscriber_db", pub_dbname="publisher_db" + endpoint, + endpoint, + "mysub", + sub_dbname="subscriber_db", + pub_dbname="publisher_db", ) # Check that replication is working diff --git a/test_runner/regress/test_layer_bloating.py b/test_runner/regress/test_layer_bloating.py index d9043fef7f..0260704ebf 100644 --- a/test_runner/regress/test_layer_bloating.py +++ b/test_runner/regress/test_layer_bloating.py @@ -63,7 +63,7 @@ def test_layer_bloating(neon_env_builder: NeonEnvBuilder, vanilla_pg): cur.execute("set statement_timeout=0") cur.execute("select create_snapshots(10000)") # Wait logical replication to sync - logical_replication_sync(vanilla_pg, endpoint) + logical_replication_sync(vanilla_pg, endpoint, "sub1") wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, timeline) env.pageserver.http_client().timeline_checkpoint(env.initial_tenant, timeline, compact=False) diff --git a/test_runner/regress/test_logical_replication.py b/test_runner/regress/test_logical_replication.py index 8908763109..3a92f0d1d1 100644 --- a/test_runner/regress/test_logical_replication.py +++ b/test_runner/regress/test_logical_replication.py @@ -55,13 +55,13 @@ def test_logical_replication(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgr vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1") # Wait logical replication channel to be established - logical_replication_sync(vanilla_pg, endpoint) + logical_replication_sync(vanilla_pg, endpoint, "sub1") # insert some data cur.execute("insert into t values (generate_series(1,1000), 0)") # Wait logical replication to sync - logical_replication_sync(vanilla_pg, endpoint) + logical_replication_sync(vanilla_pg, endpoint, "sub1") assert vanilla_pg.safe_psql("select count(*) from t")[0][0] == 1000 # now stop subscriber... @@ -78,7 +78,7 @@ def test_logical_replication(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgr vanilla_pg.start() # Wait logical replication to sync - logical_replication_sync(vanilla_pg, endpoint) + logical_replication_sync(vanilla_pg, endpoint, "sub1") # Check that subscribers receives all data assert vanilla_pg.safe_psql("select count(*) from t")[0][0] == 2000 @@ -148,7 +148,7 @@ COMMIT; endpoint.start() vanilla_pg.start() - logical_replication_sync(vanilla_pg, endpoint) + logical_replication_sync(vanilla_pg, endpoint, "sub1") eq_q = "select testcolumn1, testcolumn2, testcolumn3 from replication_example order by 1, 2, 3" assert vanilla_pg.safe_psql(eq_q) == endpoint.safe_psql(eq_q) log.info("rewriteheap synced") @@ -285,7 +285,7 @@ FROM generate_series(1, 16384) AS seq; -- Inserts enough rows to exceed 16MB of vanilla_pg.safe_psql("create table t(a int)") connstr = endpoint.connstr().replace("'", "''") vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub") - logical_replication_sync(vanilla_pg, endpoint) + logical_replication_sync(vanilla_pg, endpoint, "sub1") vanilla_pg.stop() @@ -321,13 +321,13 @@ FROM generate_series(1, 16384) AS seq; -- Inserts enough rows to exceed 16MB of sk_http = sk.http_client() sk_http.configure_failpoints([("sk-pause-send", "off")]) - logical_replication_sync(vanilla_pg, endpoint) + logical_replication_sync(vanilla_pg, endpoint, "sub1") assert [r[0] for r in vanilla_pg.safe_psql("select * from t")] == [1, 2] # Check that local reads also work with endpoint.connect().cursor() as cur: cur.execute("insert into t values (3)") - logical_replication_sync(vanilla_pg, endpoint) + logical_replication_sync(vanilla_pg, endpoint, "sub1") assert [r[0] for r in vanilla_pg.safe_psql("select * from t")] == [1, 2, 3] log_path = vanilla_pg.pgdatadir / "pg.log" @@ -365,7 +365,7 @@ def test_restart_endpoint(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgres) log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}") connstr = endpoint.connstr().replace("'", "''") vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1") - logical_replication_sync(vanilla_pg, endpoint) + logical_replication_sync(vanilla_pg, endpoint, "sub1") vanilla_pg.stop() wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) @@ -375,7 +375,7 @@ def test_restart_endpoint(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgres) # this should flush current wal page cur.execute("insert into replication_example values (3, 4)") vanilla_pg.start() - logical_replication_sync(vanilla_pg, endpoint) + logical_replication_sync(vanilla_pg, endpoint, "sub1") assert vanilla_pg.safe_psql( "select sum(somedata) from replication_example" ) == endpoint.safe_psql("select sum(somedata) from replication_example") @@ -409,18 +409,18 @@ def test_large_records(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgres): # Test simple insert, update, delete. But with very large values value = random_string(10_000_000) cur.execute(f"INSERT INTO reptbl VALUES (1, '{value}')") - logical_replication_sync(vanilla_pg, endpoint) + logical_replication_sync(vanilla_pg, endpoint, "sub1") assert vanilla_pg.safe_psql("select id, largeval from reptbl") == [(1, value)] # Test delete, and reinsert another value cur.execute("DELETE FROM reptbl WHERE id = 1") cur.execute(f"INSERT INTO reptbl VALUES (2, '{value}')") - logical_replication_sync(vanilla_pg, endpoint) + logical_replication_sync(vanilla_pg, endpoint, "sub1") assert vanilla_pg.safe_psql("select id, largeval from reptbl") == [(2, value)] value = random_string(10_000_000) cur.execute(f"UPDATE reptbl SET largeval='{value}'") - logical_replication_sync(vanilla_pg, endpoint) + logical_replication_sync(vanilla_pg, endpoint, "sub1") assert vanilla_pg.safe_psql("select id, largeval from reptbl") == [(2, value)] endpoint.stop() @@ -428,7 +428,7 @@ def test_large_records(neon_simple_env: NeonEnv, vanilla_pg: VanillaPostgres): cur = endpoint.connect().cursor() value = random_string(10_000_000) cur.execute(f"UPDATE reptbl SET largeval='{value}'") - logical_replication_sync(vanilla_pg, endpoint) + logical_replication_sync(vanilla_pg, endpoint, "sub1") assert vanilla_pg.safe_psql("select id, largeval from reptbl") == [(2, value)] @@ -608,7 +608,7 @@ def test_subscriber_synchronous_commit(neon_simple_env: NeonEnv, vanilla_pg: Van for i in range(0, 1000): pcur.execute("INSERT into t values (%s, random()*100000)", (i,)) # wait until sub receives all data - logical_replication_sync(sub, vanilla_pg) + logical_replication_sync(sub, vanilla_pg, "sub") # Update confirmed_flush_lsn of the slot. If subscriber ack'ed recevied data # as flushed we'll now lose it if subscriber restars. That's why # logical_replication_wait_flush_lsn_sync is expected to hang while diff --git a/test_runner/regress/test_physical_and_logical_replicaiton.py b/test_runner/regress/test_physical_and_logical_replicaiton.py index 3f9824ee67..229439106b 100644 --- a/test_runner/regress/test_physical_and_logical_replicaiton.py +++ b/test_runner/regress/test_physical_and_logical_replicaiton.py @@ -43,7 +43,7 @@ def test_physical_and_logical_replication_slot_not_copied(neon_simple_env: NeonE s_cur.execute("select count(*) from t") assert s_cur.fetchall()[0][0] == n_records - logical_replication_sync(vanilla_pg, primary) + logical_replication_sync(vanilla_pg, primary, "sub1") assert vanilla_pg.safe_psql("select count(*) from t")[0][0] == n_records # Check that LR slot is not copied to replica @@ -87,7 +87,7 @@ def test_aux_not_logged_at_replica(neon_simple_env: NeonEnv, vanilla_pg): s_con = secondary.connect() s_cur = s_con.cursor() - logical_replication_sync(vanilla_pg, primary) + logical_replication_sync(vanilla_pg, primary, "sub1") assert vanilla_pg.safe_psql("select count(*) from t")[0][0] == n_records s_cur.execute("select count(*) from t") diff --git a/test_runner/regress/test_subscriber_branching.py b/test_runner/regress/test_subscriber_branching.py index 645572da8e..849d4f024d 100644 --- a/test_runner/regress/test_subscriber_branching.py +++ b/test_runner/regress/test_subscriber_branching.py @@ -3,7 +3,7 @@ from __future__ import annotations import time from fixtures.log_helper import log -from fixtures.neon_fixtures import NeonEnv, logical_replication_sync +from fixtures.neon_fixtures import NeonEnv from fixtures.utils import query_scalar, wait_until @@ -208,7 +208,6 @@ def test_subscriber_branching(neon_simple_env: NeonEnv): # wake the sub and ensure that it catches up with the new data sub.start(create_test_user=True) with sub.cursor(dbname="neondb", user="test", password="testpwd") as scur: - logical_replication_sync(sub, pub) wait_until(check_that_changes_propagated) scur.execute("SELECT count(*) FROM t") res = scur.fetchall()