mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 02:12:56 +00:00
Fix LR tests waiting for synced data.
Even if pg_stat_subscription.latest_end_lsn is caughtup, some tables might not be synced because until sync worker finishes main apply worker continues to advance.
This commit is contained in:
@@ -3983,18 +3983,27 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint
|
||||
|
||||
def logical_replication_sync(subscriber: VanillaPostgres, publisher: Endpoint) -> Lsn:
|
||||
"""Wait logical replication subscriber to sync with publisher."""
|
||||
publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
while True:
|
||||
|
||||
def is_synced(publisher_lsn):
|
||||
# Even if pg_stat_subscription.latest_end_lsn is caughtup, some tables
|
||||
# might not be synced because until sync worker finishes main apply
|
||||
# continues to advance.
|
||||
rels_synced = subscriber.safe_psql(
|
||||
"select count(*) = 0 from pg_subscription_rel where srsubstate != 'r'"
|
||||
)[0][0]
|
||||
log.info(f"number of not synced rels: {rels_synced}")
|
||||
assert rels_synced
|
||||
res = subscriber.safe_psql("select latest_end_lsn from pg_catalog.pg_stat_subscription")[0][
|
||||
0
|
||||
]
|
||||
if res:
|
||||
log.info(f"subscriber_lsn={res}")
|
||||
subscriber_lsn = Lsn(res)
|
||||
log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={ publisher_lsn}")
|
||||
if subscriber_lsn >= publisher_lsn:
|
||||
return subscriber_lsn
|
||||
time.sleep(0.5)
|
||||
log.info(f"subscriber_lsn={res}")
|
||||
subscriber_lsn = Lsn(res)
|
||||
log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={ publisher_lsn}")
|
||||
assert subscriber_lsn >= publisher_lsn
|
||||
|
||||
publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
wait_until(30, 0.5, partial(is_synced, publisher_lsn))
|
||||
return publisher_lsn
|
||||
|
||||
|
||||
def tenant_get_shards(
|
||||
|
||||
Reference in New Issue
Block a user