diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index b3f460c7fe..5ab9100974 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -3983,18 +3983,27 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint def logical_replication_sync(subscriber: VanillaPostgres, publisher: Endpoint) -> Lsn: """Wait logical replication subscriber to sync with publisher.""" - publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) - while True: + + def is_synced(publisher_lsn): + # Even if pg_stat_subscription.latest_end_lsn is caughtup, some tables + # might not be synced because until sync worker finishes main apply + # continues to advance. + rels_synced = subscriber.safe_psql( + "select count(*) = 0 from pg_subscription_rel where srsubstate != 'r'" + )[0][0] + log.info(f"number of not synced rels: {rels_synced}") + assert rels_synced res = subscriber.safe_psql("select latest_end_lsn from pg_catalog.pg_stat_subscription")[0][ 0 ] - if res: - log.info(f"subscriber_lsn={res}") - subscriber_lsn = Lsn(res) - log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={ publisher_lsn}") - if subscriber_lsn >= publisher_lsn: - return subscriber_lsn - time.sleep(0.5) + log.info(f"subscriber_lsn={res}") + subscriber_lsn = Lsn(res) + log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={ publisher_lsn}") + assert subscriber_lsn >= publisher_lsn + + publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) + wait_until(30, 0.5, partial(is_synced, publisher_lsn)) + return publisher_lsn def tenant_get_shards(