tests: fix flaky endpoint in test_ingest_logical_message (#10700)

## Problem

The endpoint kept running while the timeline was deleted, causing the pageserver
to emit "tenant not found" warnings, which the test framework forbids.

## Summary of changes

- Explicitly stop the endpoint before the end of the test, so that it isn't
trying to talk to the pageserver in the background while the test environment
is torn down (sketched below)
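
For context, a minimal sketch of the ordering this change enforces, using only the helpers visible in the diffs below (the tenant delete/recreate step is elided; `recover_to_lsn` is the variable this commit introduces):

```python
# Capture the WAL position while the endpoint is still running, then stop it
# so it cannot hit the pageserver during teardown.
recover_to_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
endpoint.stop()

# ... delete and recreate the tenant on the pageserver (elided) ...
client.timeline_create(env.pg_version, env.initial_tenant, env.initial_timeline)

# Wait using the captured LSN; the stopped endpoint can no longer be queried.
wait_for_last_flush_lsn(
    env, endpoint, env.initial_tenant, env.initial_timeline, last_flush_lsn=recover_to_lsn
)

# Restart to prove the recovered timeline is actually usable.
endpoint.start()
```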
Author: John Spray
Date: 2025-02-07 14:51:36 +00:00 (committed by GitHub)
Commit: 95220ba43e (parent: 08f92bb916)

2 changed files with 13 additions and 2 deletions


```diff
@@ -5122,12 +5122,14 @@ def wait_for_last_flush_lsn(
     timeline: TimelineId,
     pageserver_id: int | None = None,
     auth_token: str | None = None,
+    last_flush_lsn: Lsn | None = None,
 ) -> Lsn:
     """Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
     shards = tenant_get_shards(env, tenant, pageserver_id)
-    last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
+    if last_flush_lsn is None:
+        last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
     results = []
     for tenant_shard_id, pageserver in shards:
```
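
The new optional `last_flush_lsn` argument matters because the helper's default is to ask the endpoint itself for its flush LSN via `safe_psql`, which fails once the endpoint is stopped. A usage sketch of the stopped-endpoint pattern (not new API; names as in the diffs):

```python
# Capture the flush LSN up front, then pass it in so the helper skips its
# safe_psql() query against the now-stopped endpoint.
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
endpoint.stop()
wait_for_last_flush_lsn(
    env, endpoint, env.initial_tenant, env.initial_timeline, last_flush_lsn=lsn
)
```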


```diff
@@ -76,6 +76,9 @@ def test_ingest_logical_message(
     log.info("Waiting for Pageserver to catch up")
     wait_for_last_record_lsn(client, env.initial_tenant, env.initial_timeline, end_lsn)
 
+    recover_to_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
+    endpoint.stop()
+
     # Now that all data is ingested, delete and recreate the tenant in the pageserver. This will
     # reingest all the WAL from the safekeeper without any other constraints. This gives us a
     # baseline of how fast the pageserver can ingest this WAL in isolation.
@@ -88,7 +91,13 @@ def test_ingest_logical_message(
     with zenbenchmark.record_duration("pageserver_recover_ingest"):
         log.info("Recovering WAL into pageserver")
         client.timeline_create(env.pg_version, env.initial_tenant, env.initial_timeline)
-        wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
+        wait_for_last_flush_lsn(
+            env, endpoint, env.initial_tenant, env.initial_timeline, last_flush_lsn=recover_to_lsn
+        )
+
+    # Check endpoint can start, i.e. we really recovered
+    endpoint.start()
+    wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
 
     # Emit metrics.
     wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))
```
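
One note on the metrics line: subtracting two `Lsn` values in these fixtures yields a byte offset, so dividing by `1024 * 1024` reports WAL volume in MiB. A quick worked example (assuming `Lsn` parses the usual `hi/lo` hex form):

```python
# 0x2000000 - 0x1000000 = 0x1000000 bytes = 16 MiB of WAL written.
start_lsn = Lsn("0/1000000")
end_lsn = Lsn("0/2000000")
wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))
assert wal_written_mb == 16
```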