diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 61a2a54809..9a9f593bde 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -1923,12 +1923,14 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, XLogRecPtr commitLsn) if (hsFeedback.ts != 0 && memcmp(&hsFeedback, &quorumFeedback.hs, sizeof hsFeedback) != 0) { quorumFeedback.hs = hsFeedback; + elog(LOG, "ProcessStandbyHSFeedback(xmin=%d, catalog_xmin=%d", XidFromFullTransactionId(hsFeedback.xmin), XidFromFullTransactionId(hsFeedback.catalog_xmin)); ProcessStandbyHSFeedback(hsFeedback.ts, XidFromFullTransactionId(hsFeedback.xmin), EpochFromFullTransactionId(hsFeedback.xmin), XidFromFullTransactionId(hsFeedback.catalog_xmin), EpochFromFullTransactionId(hsFeedback.catalog_xmin)); - } + } else + elog(LOG, "Skip HSFeedback ts=%ld, xmin=%d, catalog_xmin=%d", hsFeedback.ts, XidFromFullTransactionId(hsFeedback.xmin), XidFromFullTransactionId(hsFeedback.catalog_xmin)); } static XLogRecPtr diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index bf7c6ccc14..a107558abb 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4087,6 +4087,19 @@ def tenant_get_shards( # Assume an unsharded tenant return [(TenantShardId(tenant_id, 0, 0), override_pageserver or env.pageserver)] +def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint): + primary_lsn = primary.safe_psql_scalar( + "SELECT pg_current_wal_flush_lsn()::text", log_query=False + ) + while True: + secondary_lsn = secondary.safe_psql_scalar( + "SELECT pg_last_wal_replay_lsn()", log_query=False + ) + caught_up = secondary_lsn >= primary_lsn + log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}") + if caught_up: + return + time.sleep(1) def wait_for_last_flush_lsn( env: NeonEnv, diff --git a/test_runner/regress/test_hot_standby.py b/test_runner/regress/test_hot_standby.py index 7822e29ed9..33859caab2 100644 --- a/test_runner/regress/test_hot_standby.py +++ b/test_runner/regress/test_hot_standby.py @@ -3,23 +3,7 @@ import re import time from fixtures.log_helper import log -from fixtures.neon_fixtures import Endpoint, NeonEnv - - -def wait_caughtup(primary: Endpoint, secondary: Endpoint): - primary_lsn = primary.safe_psql_scalar( - "SELECT pg_current_wal_insert_lsn()::text", log_query=False - ) - while True: - secondary_lsn = secondary.safe_psql_scalar( - "SELECT pg_last_wal_replay_lsn()", log_query=False - ) - caught_up = secondary_lsn >= primary_lsn - log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}") - if caught_up: - return - time.sleep(1) - +from fixtures.neon_fixtures import Endpoint, NeonEnv, wait_replica_caughtup # Check for corrupted WAL messages which might otherwise go unnoticed if # reconnection fixes this. @@ -79,7 +63,7 @@ def test_hot_standby(neon_simple_env: NeonEnv): primary.safe_psql("create table t(key int, value text)") primary.safe_psql("insert into t select generate_series(1, 100000), 'payload'") - wait_caughtup(primary, secondary) + wait_replica_caughtup(primary, secondary) with secondary.connect() as s_con: with s_con.cursor() as s_cur: diff --git a/test_runner/regress/test_replication_lag.py b/test_runner/regress/test_replication_lag.py index f167078512..01185e403e 100644 --- a/test_runner/regress/test_replication_lag.py +++ b/test_runner/regress/test_replication_lag.py @@ -1,8 +1,7 @@ import threading -import time from fixtures.log_helper import log -from fixtures.neon_fixtures import NeonEnv, PgBin +from fixtures.neon_fixtures import NeonEnv, PgBin, wait_replica_caughtup def test_replication_lag(neon_simple_env: NeonEnv, pg_bin: PgBin): @@ -34,7 +33,7 @@ def test_replication_lag(neon_simple_env: NeonEnv, pg_bin: PgBin): t.start() with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary: - time.sleep(2) + wait_replica_caughtup(primary, secondary) for _ in range(1, n_iterations): primary_lsn = primary.safe_psql_scalar( "SELECT pg_current_wal_flush_lsn()::text", log_query=False