mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-25 09:00:37 +00:00
Add wait_replica_caughtup to neon_fixtures
This commit is contained in:
@@ -1923,12 +1923,14 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, XLogRecPtr commitLsn)
|
||||
if (hsFeedback.ts != 0 && memcmp(&hsFeedback, &quorumFeedback.hs, sizeof hsFeedback) != 0)
|
||||
{
|
||||
quorumFeedback.hs = hsFeedback;
|
||||
elog(LOG, "ProcessStandbyHSFeedback(xmin=%d, catalog_xmin=%d", XidFromFullTransactionId(hsFeedback.xmin), XidFromFullTransactionId(hsFeedback.catalog_xmin));
|
||||
ProcessStandbyHSFeedback(hsFeedback.ts,
|
||||
XidFromFullTransactionId(hsFeedback.xmin),
|
||||
EpochFromFullTransactionId(hsFeedback.xmin),
|
||||
XidFromFullTransactionId(hsFeedback.catalog_xmin),
|
||||
EpochFromFullTransactionId(hsFeedback.catalog_xmin));
|
||||
}
|
||||
} else
|
||||
elog(LOG, "Skip HSFeedback ts=%ld, xmin=%d, catalog_xmin=%d", hsFeedback.ts, XidFromFullTransactionId(hsFeedback.xmin), XidFromFullTransactionId(hsFeedback.catalog_xmin));
|
||||
}
|
||||
|
||||
static XLogRecPtr
|
||||
|
||||
@@ -4087,6 +4087,19 @@ def tenant_get_shards(
|
||||
# Assume an unsharded tenant
|
||||
return [(TenantShardId(tenant_id, 0, 0), override_pageserver or env.pageserver)]
|
||||
|
||||
def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint):
|
||||
primary_lsn = primary.safe_psql_scalar(
|
||||
"SELECT pg_current_wal_flush_lsn()::text", log_query=False
|
||||
)
|
||||
while True:
|
||||
secondary_lsn = secondary.safe_psql_scalar(
|
||||
"SELECT pg_last_wal_replay_lsn()", log_query=False
|
||||
)
|
||||
caught_up = secondary_lsn >= primary_lsn
|
||||
log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}")
|
||||
if caught_up:
|
||||
return
|
||||
time.sleep(1)
|
||||
|
||||
def wait_for_last_flush_lsn(
|
||||
env: NeonEnv,
|
||||
|
||||
@@ -3,23 +3,7 @@ import re
|
||||
import time
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv
|
||||
|
||||
|
||||
def wait_caughtup(primary: Endpoint, secondary: Endpoint):
|
||||
primary_lsn = primary.safe_psql_scalar(
|
||||
"SELECT pg_current_wal_insert_lsn()::text", log_query=False
|
||||
)
|
||||
while True:
|
||||
secondary_lsn = secondary.safe_psql_scalar(
|
||||
"SELECT pg_last_wal_replay_lsn()", log_query=False
|
||||
)
|
||||
caught_up = secondary_lsn >= primary_lsn
|
||||
log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}")
|
||||
if caught_up:
|
||||
return
|
||||
time.sleep(1)
|
||||
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, wait_replica_caughtup
|
||||
|
||||
# Check for corrupted WAL messages which might otherwise go unnoticed if
|
||||
# reconnection fixes this.
|
||||
@@ -79,7 +63,7 @@ def test_hot_standby(neon_simple_env: NeonEnv):
|
||||
primary.safe_psql("create table t(key int, value text)")
|
||||
primary.safe_psql("insert into t select generate_series(1, 100000), 'payload'")
|
||||
|
||||
wait_caughtup(primary, secondary)
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
|
||||
with secondary.connect() as s_con:
|
||||
with s_con.cursor() as s_cur:
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
import threading
|
||||
import time
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, PgBin
|
||||
from fixtures.neon_fixtures import NeonEnv, PgBin, wait_replica_caughtup
|
||||
|
||||
|
||||
def test_replication_lag(neon_simple_env: NeonEnv, pg_bin: PgBin):
|
||||
@@ -34,7 +33,7 @@ def test_replication_lag(neon_simple_env: NeonEnv, pg_bin: PgBin):
|
||||
t.start()
|
||||
|
||||
with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
|
||||
time.sleep(2)
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
for _ in range(1, n_iterations):
|
||||
primary_lsn = primary.safe_psql_scalar(
|
||||
"SELECT pg_current_wal_flush_lsn()::text", log_query=False
|
||||
|
||||
Reference in New Issue
Block a user