diff --git a/test_runner/regress/test_pageserver_catchup.py b/test_runner/regress/test_pageserver_catchup.py
index c16cbcb4ba..00f4634c16 100644
--- a/test_runner/regress/test_pageserver_catchup.py
+++ b/test_runner/regress/test_pageserver_catchup.py
@@ -1,4 +1,9 @@
+import time
+from functools import partial
+
 from fixtures.neon_fixtures import NeonEnvBuilder
+from fixtures.utils import wait_until
+from regress.test_wal_acceptor import assert_commit_lsn_equals_flush_lsn
 
 
 # Test safekeeper sync and pageserver catch up
@@ -9,7 +14,9 @@ def test_pageserver_catchup_while_compute_down(neon_env_builder: NeonEnvBuilder)
     neon_env_builder.num_safekeepers = 3
     env = neon_env_builder.init_start()
 
-    env.neon_cli.create_branch("test_pageserver_catchup_while_compute_down")
+    tenant_id = env.initial_tenant
+
+    timeline_id = env.neon_cli.create_branch("test_pageserver_catchup_while_compute_down")
     # Make shared_buffers large to ensure we won't query pageserver while it is down.
     endpoint = env.endpoints.create_start(
         "test_pageserver_catchup_while_compute_down", config_lines=["shared_buffers=512MB"]
@@ -45,12 +52,23 @@ def test_pageserver_catchup_while_compute_down(neon_env_builder: NeonEnvBuilder)
             FROM generate_series(1, 10000) g
         """
     )
+    endpoint.stop()
+
+    # wait until safekeepers catch up. This forces/tests fast path which avoids
+    # sync-safekeepers on next compute start.
+    for sk in env.safekeepers:
+        wait_until(10, 0.5, partial(assert_commit_lsn_equals_flush_lsn, sk, tenant_id, timeline_id))
 
     # stop safekeepers gracefully
     env.safekeepers[0].stop()
     env.safekeepers[1].stop()
     env.safekeepers[2].stop()
 
+    # Wait until in flight messages to broker arrive so pageserver won't know
+    # where to connect if timeline is not activated on safekeeper after restart
+    # -- we had such a bug once.
+    time.sleep(1)
+
     # start everything again
     # safekeepers must synchronize and pageserver must catch up
     env.pageserver.start()
@@ -59,7 +77,7 @@ def test_pageserver_catchup_while_compute_down(neon_env_builder: NeonEnvBuilder)
     env.safekeepers[2].start()
 
     # restart compute node
-    endpoint.stop_and_destroy().create_start("test_pageserver_catchup_while_compute_down")
+    endpoint = env.endpoints.create_start("test_pageserver_catchup_while_compute_down")
 
     # Ensure that basebackup went correct and pageserver returned all data
     pg_conn = endpoint.connect()
diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py
index 8ca93845b2..4648fd6783 100644
--- a/test_runner/regress/test_wal_acceptor.py
+++ b/test_runner/regress/test_wal_acceptor.py
@@ -424,6 +424,15 @@ def is_flush_lsn_caught_up(sk: Safekeeper, tenant_id: TenantId, timeline_id: Tim
     return tli_status.flush_lsn >= lsn
 
 
+def assert_commit_lsn_equals_flush_lsn(
+    sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId
+):
+    http_cli = sk.http_client()
+    tli_status = http_cli.timeline_status(tenant_id, timeline_id)
+    log.info(f"sk status is {tli_status}")
+    assert tli_status.flush_lsn == tli_status.commit_lsn
+
+
 def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, target_size_mb):
     http_cli = sk.http_client()
     tli_status = http_cli.timeline_status(tenant_id, timeline_id)