diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index 74b2f2657f..844ef3ebe1 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -14,13 +14,43 @@ from contextlib import closing from dataclasses import dataclass, field from multiprocessing import Process, Value from pathlib import Path -from fixtures.neon_fixtures import PgBin, Etcd, Postgres, RemoteStorageUsers, Safekeeper, NeonEnv, NeonEnvBuilder, PortDistributor, SafekeeperPort, neon_binpath, PgProtocol +from fixtures.neon_fixtures import NeonPageserver, PgBin, Etcd, Postgres, RemoteStorageUsers, Safekeeper, NeonEnv, NeonEnvBuilder, PortDistributor, SafekeeperPort, neon_binpath, PgProtocol, wait_for_last_record_lsn, wait_for_upload from fixtures.utils import get_dir_size, lsn_to_hex, lsn_from_hex from fixtures.log_helper import log from typing import List, Optional, Any from uuid import uuid4 +def wait_lsn_force_checkpoint(tenant_id: str, + timeline_id: str, + pg: Postgres, + ps: NeonPageserver, + pageserver_conn_options={}): + lsn = lsn_from_hex(pg.safe_psql('SELECT pg_current_wal_flush_lsn()')[0][0]) + log.info(f"pg_current_wal_flush_lsn is {lsn_to_hex(lsn)}, waiting for it on pageserver") + + auth_token = None + if 'password' in pageserver_conn_options: + auth_token = pageserver_conn_options['password'] + + # wait for the pageserver to catch up + wait_for_last_record_lsn(ps.http_client(auth_token=auth_token), + uuid.UUID(hex=tenant_id), + uuid.UUID(hex=timeline_id), + lsn) + + # force checkpoint to advance remote_consistent_lsn + with closing(ps.connect(**pageserver_conn_options)) as psconn: + with psconn.cursor() as pscur: + pscur.execute(f"checkpoint {tenant_id} {timeline_id}") + + # ensure that remote_consistent_lsn is advanced + wait_for_upload(ps.http_client(auth_token=auth_token), + uuid.UUID(hex=tenant_id), + uuid.UUID(hex=timeline_id), + lsn) + + @dataclass class TimelineMetrics: timeline_id: str @@ -223,10 +253,10 @@ def test_broker(neon_env_builder: NeonEnvBuilder): log.info(f"statuses is {stat_before}") pg.safe_psql("INSERT INTO t SELECT generate_series(1,100), 'payload'") - # force checkpoint to advance remote_consistent_lsn - with closing(env.pageserver.connect()) as psconn: - with psconn.cursor() as pscur: - pscur.execute(f"checkpoint {tenant_id} {timeline_id}") + + # force checkpoint in pageserver to advance remote_consistent_lsn + wait_lsn_force_checkpoint(tenant_id, timeline_id, pg, env.pageserver) + # and wait till remote_consistent_lsn propagates to all safekeepers started_at = time.time() while True: @@ -270,9 +300,7 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): pageserver_conn_options = {} if auth_enabled: pageserver_conn_options['password'] = env.auth_keys.generate_tenant_token(tenant_id) - with closing(env.pageserver.connect(**pageserver_conn_options)) as psconn: - with psconn.cursor() as pscur: - pscur.execute(f"checkpoint {tenant_id} {timeline_id}") + wait_lsn_force_checkpoint(tenant_id, timeline_id, pg, env.pageserver, pageserver_conn_options) # We will wait for first segment removal. Make sure they exist for starter. first_segments = [