diff --git a/test_runner/batch_others/test_wal_acceptor_async.py b/test_runner/batch_others/test_wal_acceptor_async.py
index a5d4191375..a86b17c946 100644
--- a/test_runner/batch_others/test_wal_acceptor_async.py
+++ b/test_runner/batch_others/test_wal_acceptor_async.py
@@ -1,9 +1,11 @@
 import asyncio
 import asyncpg
 import random
+import time
 
 from fixtures.zenith_fixtures import WalAcceptor, WalAcceptorFactory, ZenithPageserver, PostgresFactory, Postgres
 from fixtures.log_helper import getLogger
+from fixtures.utils import lsn_from_hex, lsn_to_hex
 from typing import List
 
 log = getLogger('root.wal_acceptor_async')
@@ -102,6 +104,38 @@ async def run_random_worker(stats: WorkerStats, pg: Postgres, worker_id, n_accou
     await pg_conn.close()
 
 
+async def wait_for_lsn(safekeeper: WalAcceptor,
+                       tenant_id: str,
+                       timeline_id: str,
+                       wait_lsn: str,
+                       polling_interval=1,
+                       timeout=600):
+    """
+    Poll flush_lsn from the safekeeper until it is greater than or equal
+    to the provided wait_lsn. To do that, timeline_status is fetched from
+    the safekeeper every polling_interval seconds.
+    """
+
+    started_at = time.time()
+    client = safekeeper.http_client()
+
+    flush_lsn = client.timeline_status(tenant_id, timeline_id).flush_lsn
+    log.info(
+        f'Safekeeper at port {safekeeper.port.pg} has flush_lsn {flush_lsn}, waiting for lsn {wait_lsn}'
+    )
+
+    while lsn_from_hex(wait_lsn) > lsn_from_hex(flush_lsn):
+        elapsed = time.time() - started_at
+        if elapsed > timeout:
+            raise RuntimeError(
+                f"timed out waiting for safekeeper at port {safekeeper.port.pg} to reach {wait_lsn}, current lsn is {flush_lsn}"
+            )
+
+        await asyncio.sleep(polling_interval)
+        flush_lsn = client.timeline_status(tenant_id, timeline_id).flush_lsn
+        log.debug(f'safekeeper port={safekeeper.port.pg} flush_lsn={flush_lsn} wait_lsn={wait_lsn}')
+
+
 # This test will run several iterations and check progress in each of them.
 # On each iteration 1 acceptor is stopped, and 2 others should allow
 # background workers execute transactions. In the end, state should remain
@@ -114,6 +148,9 @@ async def run_restarts_under_load(pg: Postgres, acceptors: List[WalAcceptor], n_
     iterations = 6
 
     pg_conn = await pg.connect_async()
+    tenant_id = await pg_conn.fetchval("show zenith.zenith_tenant")
+    timeline_id = await pg_conn.fetchval("show zenith.zenith_timeline")
+
     bank = BankClient(pg_conn, n_accounts=n_accounts, init_amount=init_amount)
     # create tables and initial balances
     await bank.initdb()
@@ -125,14 +162,18 @@ async def run_restarts_under_load(pg: Postgres, acceptors: List[WalAcceptor], n_
         workers.append(asyncio.create_task(worker))
 
     for it in range(iterations):
-        victim = acceptors[it % len(acceptors)]
+        victim_idx = it % len(acceptors)
+        victim = acceptors[victim_idx]
         victim.stop()
 
-        # Wait till previous victim recovers so it is ready for the next
-        # iteration by making any writing xact.
-        conn = await pg.connect_async()
-        await conn.execute('UPDATE bank_accs SET amount = amount WHERE uid = 1', timeout=120)
-        await conn.close()
+        flush_lsn = await pg_conn.fetchval('SELECT pg_current_wal_flush_lsn()')
+        flush_lsn = lsn_to_hex(flush_lsn)
+        log.info(f'Postgres flush_lsn {flush_lsn}')
+
+        # Wait until the alive safekeepers catch up with postgres
+        for idx, safekeeper in enumerate(acceptors):
+            if idx != victim_idx:
+                await wait_for_lsn(safekeeper, tenant_id, timeline_id, flush_lsn)
 
         stats.reset()
         await asyncio.sleep(period_time)
diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py
index dbb1809a2b..51005e3c48 100644
--- a/test_runner/fixtures/utils.py
+++ b/test_runner/fixtures/utils.py
@@ -63,3 +63,9 @@ def global_counter() -> int:
 def lsn_to_hex(num: int) -> str:
     """ Convert lsn from int to standard hex notation. """
     return "{:X}/{:X}".format(num >> 32, num & 0xffffffff)
+
+
+def lsn_from_hex(lsn_hex: str) -> int:
+    """ Convert lsn from hex notation to int. """
+    l, r = lsn_hex.split('/')
+    return (int(l, 16) << 32) + int(r, 16)
diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py
index 868f14ab29..b343828006 100644
--- a/test_runner/fixtures/zenith_fixtures.py
+++ b/test_runner/fixtures/zenith_fixtures.py
@@ -1031,8 +1031,9 @@ def wa_factory(zenith_binpath: str,
 
 
 @dataclass
-class PageserverTimelineStatus:
+class SafekeeperTimelineStatus:
     acceptor_epoch: int
+    flush_lsn: str
 
 
 class WalAcceptorHttpClient(requests.Session):
@@ -1043,11 +1044,12 @@ class WalAcceptorHttpClient(requests.Session):
     def check_status(self):
         self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
 
-    def timeline_status(self, tenant_id: str, timeline_id: str) -> PageserverTimelineStatus:
+    def timeline_status(self, tenant_id: str, timeline_id: str) -> SafekeeperTimelineStatus:
         res = self.get(f"http://localhost:{self.port}/v1/timeline/{tenant_id}/{timeline_id}")
         res.raise_for_status()
         resj = res.json()
-        return PageserverTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'])
+        return SafekeeperTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'],
+                                        flush_lsn=resj['flush_lsn'])
 
 
 @zenfixture
diff --git a/walkeeper/src/http/routes.rs b/walkeeper/src/http/routes.rs
index 8ab405508e..f5a69d3522 100644
--- a/walkeeper/src/http/routes.rs
+++ b/walkeeper/src/http/routes.rs
@@ -49,6 +49,8 @@ struct TimelineStatus {
     commit_lsn: Lsn,
     #[serde(serialize_with = "display_serialize")]
     truncate_lsn: Lsn,
+    #[serde(serialize_with = "display_serialize")]
+    flush_lsn: Lsn,
 }
 
 /// Report info about timeline.
@@ -64,6 +66,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result
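
Note: the two LSN helpers in test_runner/fixtures/utils.py are exact
inverses: lsn_to_hex splits a 64-bit LSN into the standard "X/Y" notation
(high and low 32 bits), while lsn_from_hex reassembles it. wait_for_lsn
compares LSNs via lsn_from_hex because the hex strings do not order
correctly as plain text. A minimal, self-contained round-trip check
(helpers copied from the diff; the example values are illustrative):

    def lsn_to_hex(num: int) -> str:
        """ Convert lsn from int to standard hex notation. """
        return "{:X}/{:X}".format(num >> 32, num & 0xffffffff)

    def lsn_from_hex(lsn_hex: str) -> int:
        """ Convert lsn from hex notation to int. """
        l, r = lsn_hex.split('/')
        return (int(l, 16) << 32) + int(r, 16)

    # hex -> int -> hex and int -> hex -> int are identity maps
    assert lsn_to_hex(lsn_from_hex('16/B374D848')) == '16/B374D848'
    assert lsn_from_hex(lsn_to_hex(0x16B374D848)) == 0x16B374D848

    # Lexicographic comparison of hex LSNs is wrong ('9/0' sorts above
    # '10/0' as a string), hence the conversion before comparing.
    assert '9/0' > '10/0'
    assert lsn_from_hex('9/0') < lsn_from_hex('10/0')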
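
Note: flush_lsn in TimelineStatus (walkeeper/src/http/routes.rs) is
serialized with display_serialize, so the timeline endpoint reports it as
an "X/Y" string rather than a number; that is why the Python-side
SafekeeperTimelineStatus types flush_lsn as str and parses it with
lsn_from_hex before comparison. A sketch of how the fixture consumes such
a response; the JSON body is a hypothetical example showing only the two
fields the fixture actually reads:

    import json
    from dataclasses import dataclass

    @dataclass
    class SafekeeperTimelineStatus:
        acceptor_epoch: int
        flush_lsn: str

    # stand-in for res.json() from GET /v1/timeline/{tenant_id}/{timeline_id}
    resj = json.loads('{"acceptor_state": {"epoch": 3}, "flush_lsn": "16/B374D848"}')
    status = SafekeeperTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'],
                                      flush_lsn=resj['flush_lsn'])
    assert status.flush_lsn == '16/B374D848'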