From f7f377cfc1126331bd93b8e2241ba552be16e86f Mon Sep 17 00:00:00 2001
From: Arthur Petukhovsky
Date: Thu, 7 Oct 2021 16:15:44 +0300
Subject: [PATCH] Add safekeeper test with large wal records

---
 .gitmodules                                   |  2 +-
 .../batch_others/test_wal_acceptor_async.py   | 33 +++++++++++++++----
 vendor/postgres                               |  2 +-
 3 files changed, 28 insertions(+), 9 deletions(-)

diff --git a/.gitmodules b/.gitmodules
index 8975c6e2fa..d3806819da 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,4 +1,4 @@
 [submodule "vendor/postgres"]
 	path = vendor/postgres
 	url = https://github.com/zenithdb/postgres
-	branch = main
+	branch = walproposer_more_logs
diff --git a/test_runner/batch_others/test_wal_acceptor_async.py b/test_runner/batch_others/test_wal_acceptor_async.py
index b1647a8544..984cf93b37 100644
--- a/test_runner/batch_others/test_wal_acceptor_async.py
+++ b/test_runner/batch_others/test_wal_acceptor_async.py
@@ -33,11 +33,16 @@ class BankClient(object):
         row = await self.conn.fetchrow('SELECT sum(amount) AS sum FROM bank_accs')
         assert row['sum'] == self.n_accounts * self.init_amount
 
-async def bank_transfer(conn: asyncpg.Connection, from_uid, to_uid, amount):
+async def bank_transfer(conn: asyncpg.Connection, from_uid, to_uid, amount, large_wal=False):
     # avoid deadlocks by sorting uids
     if from_uid > to_uid:
         from_uid, to_uid, amount = to_uid, from_uid, -amount
 
+    if large_wal:
+        # record with size about 128kb
+        await conn.execute("SELECT pg_logical_emit_message(false, 'hello', REPEAT('abacaba', 19000))")
+        return
+
     async with conn.transaction():
         await conn.execute(
             'UPDATE bank_accs SET amount = amount + ($1) WHERE uid = $2',
@@ -72,7 +77,7 @@ class WorkerStats(object):
         print('All workers made {} transactions'.format(progress))
 
 
-async def run_random_worker(stats: WorkerStats, pg: Postgres, worker_id, n_accounts, max_transfer):
+async def run_random_worker(stats: WorkerStats, pg: Postgres, worker_id, n_accounts, max_transfer, large_wal=False):
     pg_conn = await pg.connect_async()
     debug_print('Started worker {}'.format(worker_id))
 
@@ -81,7 +86,7 @@ async def run_random_worker(stats: WorkerStats, pg: Postgres, worker_id, n_accou
         to_uid = (from_uid + random.randint(1, n_accounts - 1)) % n_accounts
         amount = random.randint(1, max_transfer)
 
-        await bank_transfer(pg_conn, from_uid, to_uid, amount)
+        await bank_transfer(pg_conn, from_uid, to_uid, amount, large_wal=large_wal)
         stats.inc_progress(worker_id)
 
         debug_print('Executed transfer({}) {} => {}'.format(amount, from_uid, to_uid))
@@ -95,12 +100,10 @@ async def run_random_worker(stats: WorkerStats, pg: Postgres, worker_id, n_accou
 # On each iteration 1 acceptor is stopped, and 2 others should allow
 # background workers to execute transactions. In the end, state should remain
 # consistent.
-async def run_restarts_under_load(pg: Postgres, acceptors: List[WalAcceptor], n_workers=10):
+async def run_restarts_under_load(pg: Postgres, acceptors: List[WalAcceptor], n_workers=10, period_time=10, iterations=6, large_wal=False):
     n_accounts = 100
     init_amount = 100000
     max_transfer = 100
-    period_time = 10
-    iterations = 6
 
     pg_conn = await pg.connect_async()
     bank = BankClient(pg_conn, n_accounts=n_accounts, init_amount=init_amount)
@@ -110,7 +113,7 @@ async def run_restarts_under_load(pg: Postgres, acceptors: List[WalAcceptor], n_
     stats = WorkerStats(n_workers)
     workers = []
     for worker_id in range(n_workers):
-        worker = run_random_worker(stats, pg, worker_id, bank.n_accounts, max_transfer)
+        worker = run_random_worker(stats, pg, worker_id, bank.n_accounts, max_transfer, large_wal=large_wal)
         workers.append(asyncio.create_task(worker))
 
@@ -157,3 +160,19 @@ def test_restarts_under_load(zenith_cli, pageserver: ZenithPageserver, postgres:
     # TODO: Remove when https://github.com/zenithdb/zenith/issues/644 is fixed
     pg.stop()
+
+# test condition when wal records are too large to be contained in single wal data message
+# current MAX_SEND_SIZE is 8192 * 16 = 128kb
+def test_restarts_under_load_large(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory,
+                                   wa_factory: WalAcceptorFactory):
+
+    wa_factory.start_n_new(3)
+
+    zenith_cli.run(["branch", "test_restarts_under_load_large", "empty"])
+    pg = postgres.create_start('test_restarts_under_load_large',
+                               wal_acceptors=wa_factory.get_connstrs())
+
+    asyncio.run(run_restarts_under_load(pg, wa_factory.instances, n_workers=2, period_time=7, iterations=10, large_wal=True))
+
+    # TODO: Remove when https://github.com/zenithdb/zenith/issues/644 is fixed
+    pg.stop()
diff --git a/vendor/postgres b/vendor/postgres
index 5387eb4a3b..d9f7f10320 160000
--- a/vendor/postgres
+++ b/vendor/postgres
@@ -1 +1 @@
-Subproject commit 5387eb4a3b892d3ff18a3a93f4bd996d43ea3b33
+Subproject commit d9f7f1032011defc9f816b4bf2247e4f234a2cec
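
For reference, below is a minimal standalone sketch (not part of the patch) of the large-WAL technique the new test relies on: a single pg_logical_emit_message() call whose payload already exceeds MAX_SEND_SIZE (8192 * 16 bytes, the value quoted in the test comment), so the resulting WAL record cannot fit into one wal data message. The DSN is a placeholder assumption; the script only shows that the emitted payload is larger than that limit and that the WAL position advances.

    # sketch_large_wal_record.py -- illustrative only, assumes a reachable local Postgres
    import asyncio
    import asyncpg

    MAX_SEND_SIZE = 8192 * 16                      # limit cited in the test comment
    DSN = 'postgresql://localhost/postgres'        # placeholder, adjust for your setup

    async def main():
        conn = await asyncpg.connect(DSN)
        try:
            payload = 'abacaba' * 19000            # ~130 KB, same payload bank_transfer(large_wal=True) emits
            assert len(payload) > MAX_SEND_SIZE    # record cannot fit into a single wal data message
            before = await conn.fetchval('SELECT pg_current_wal_lsn()::text')
            # non-transactional logical message: pure WAL traffic, no table changes
            await conn.execute(
                "SELECT pg_logical_emit_message(false, 'hello', $1)", payload)
            after = await conn.fetchval('SELECT pg_current_wal_lsn()::text')
            print(f'emitted ~{len(payload)} byte message, WAL moved {before} -> {after}')
        finally:
            await conn.close()

    asyncio.run(main())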