diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c
index 7c23ff569f..c6188d7f30 100644
--- a/pgxn/neon/walproposer_pg.c
+++ b/pgxn/neon/walproposer_pg.c
@@ -1843,7 +1843,6 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback *hs, WalProposer *wp)
 
 	for (int i = 0; i < wp->n_safekeepers; i++)
 	{
-		elog(LOG, "hs.ts=%ld hs.xmin=%d", wp->safekeeper[i].appendResponse.hs.ts, XidFromFullTransactionId(wp->safekeeper[i].appendResponse.hs.xmin));
 		if (wp->safekeeper[i].appendResponse.hs.ts != 0)
 		{
 			HotStandbyFeedback *skhs = &wp->safekeeper[i].appendResponse.hs;
@@ -1919,7 +1918,7 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, XLogRecPtr commitLsn)
 	if (hsFeedback.ts != 0 && memcmp(&hsFeedback, &quorumFeedback.hs, sizeof hsFeedback) != 0)
 	{
 		quorumFeedback.hs = hsFeedback;
-		elog(LOG, "ProcessStandbyHSFeedback(xmin=%d, catalog_xmin=%d", XidFromFullTransactionId(hsFeedback.xmin), XidFromFullTransactionId(hsFeedback.catalog_xmin));
+		elog(DEBUG2, "ProcessStandbyHSFeedback(xmin=%d, catalog_xmin=%d", XidFromFullTransactionId(hsFeedback.xmin), XidFromFullTransactionId(hsFeedback.catalog_xmin));
 		ProcessStandbyHSFeedback(hsFeedback.ts,
 								 XidFromFullTransactionId(hsFeedback.xmin),
 								 EpochFromFullTransactionId(hsFeedback.xmin),
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index a107558abb..4361bb0eb7 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -4087,6 +4087,7 @@ def tenant_get_shards(
         # Assume an unsharded tenant
         return [(TenantShardId(tenant_id, 0, 0), override_pageserver or env.pageserver)]
 
+
 def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint):
     primary_lsn = primary.safe_psql_scalar(
         "SELECT pg_current_wal_flush_lsn()::text", log_query=False
@@ -4101,6 +4102,7 @@ def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint):
             return
         time.sleep(1)
 
+
 def wait_for_last_flush_lsn(
     env: NeonEnv,
     endpoint: Endpoint,
diff --git a/test_runner/regress/test_replication_lag.py b/test_runner/regress/test_replication_lag.py
index 01185e403e..c386d6e50f 100644
--- a/test_runner/regress/test_replication_lag.py
+++ b/test_runner/regress/test_replication_lag.py
@@ -1,3 +1,4 @@
+import time
 import threading
 
 from fixtures.log_helper import log
@@ -7,6 +8,7 @@ from fixtures.neon_fixtures import NeonEnv, PgBin, wait_replica_caughtup
 def test_replication_lag(neon_simple_env: NeonEnv, pg_bin: PgBin):
     env = neon_simple_env
     n_iterations = 10
+    max_retries = 10
 
     # Use aggressive GC and checkpoint settings
     tenant, _ = env.neon_cli.create_tenant(
@@ -34,6 +36,7 @@ def test_replication_lag(neon_simple_env: NeonEnv, pg_bin: PgBin):
     with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
         wait_replica_caughtup(primary, secondary)
+        time.sleep(1)  # Without this sleep the replica sometimes fails to find a relation: could not open relation with OID 16404
 
         for _ in range(1, n_iterations):
             primary_lsn = primary.safe_psql_scalar(
                 "SELECT pg_current_wal_flush_lsn()::text", log_query=False
@@ -41,7 +44,19 @@ def test_replication_lag(neon_simple_env: NeonEnv, pg_bin: PgBin):
             secondary_lsn = secondary.safe_psql_scalar(
                 "SELECT pg_last_wal_replay_lsn()", log_query=False
             )
-            balance = secondary.safe_psql_scalar("select sum(abalance) from pgbench_accounts")
+            retries = 0
+            while True:
+                try:
+                    balance = secondary.safe_psql_scalar(
+                        "select sum(abalance) from pgbench_accounts"
+                    )
+                    break
+                except Exception as error:
+                    print(f"Query failed: {error}")
+                    if retries < max_retries:
+                        retries += 1
+                    else:
+                        raise
             log.info(
                 f"primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}, balance={balance}"
             )
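
Side note on the retry loop added to test_replication_lag.py above: the same bounded-retry pattern can be exercised on its own. The sketch below is illustrative only and is not part of the patch; query_with_retries, run_query and flaky_query are made-up names standing in for secondary.safe_psql_scalar(...), and the failure simulation is an assumption used purely for demonstration.

import random


def query_with_retries(run_query, max_retries: int = 10):
    """Run run_query(), retrying up to max_retries times before re-raising.

    Mirrors the retry loop in test_replication_lag: report the failure,
    retry while the budget lasts, and re-raise the last error once the
    budget is exhausted.
    """
    retries = 0
    while True:
        try:
            return run_query()
        except Exception as error:
            print(f"Query failed: {error}")
            if retries < max_retries:
                retries += 1
            else:
                raise


def flaky_query():
    # Stand-in for secondary.safe_psql_scalar(...): fails roughly half the
    # time, the way a replica that has not yet replayed a CREATE TABLE might.
    if random.random() < 0.5:
        raise RuntimeError("could not open relation with OID 16404")
    return 42


if __name__ == "__main__":
    print(query_with_retries(flaky_query))

Re-raising once max_retries is exceeded keeps a genuinely broken replica from hanging the test forever, while transient "could not open relation" errors during WAL replay are absorbed.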