diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 690e5cdcc4..3d3a445b97 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -5122,12 +5122,14 @@ def wait_for_last_flush_lsn( timeline: TimelineId, pageserver_id: int | None = None, auth_token: str | None = None, + last_flush_lsn: Lsn | None = None, ) -> Lsn: """Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn.""" shards = tenant_get_shards(env, tenant, pageserver_id) - last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) + if last_flush_lsn is None: + last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) results = [] for tenant_shard_id, pageserver in shards: diff --git a/test_runner/performance/test_ingest_logical_message.py b/test_runner/performance/test_ingest_logical_message.py index d3118eb15a..b55cb68b64 100644 --- a/test_runner/performance/test_ingest_logical_message.py +++ b/test_runner/performance/test_ingest_logical_message.py @@ -76,6 +76,9 @@ def test_ingest_logical_message( log.info("Waiting for Pageserver to catch up") wait_for_last_record_lsn(client, env.initial_tenant, env.initial_timeline, end_lsn) + recover_to_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0]) + endpoint.stop() + # Now that all data is ingested, delete and recreate the tenant in the pageserver. This will # reingest all the WAL from the safekeeper without any other constraints. This gives us a # baseline of how fast the pageserver can ingest this WAL in isolation. @@ -88,7 +91,13 @@ def test_ingest_logical_message( with zenbenchmark.record_duration("pageserver_recover_ingest"): log.info("Recovering WAL into pageserver") client.timeline_create(env.pg_version, env.initial_tenant, env.initial_timeline) - wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline) + wait_for_last_flush_lsn( + env, endpoint, env.initial_tenant, env.initial_timeline, last_flush_lsn=recover_to_lsn + ) + + # Check endpoint can start, i.e. we really recovered + endpoint.start() + wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline) # Emit metrics. wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))