Add safekeeper test with large wal records
.gitmodules (vendored, 2 lines changed)
@@ -1,4 +1,4 @@
 [submodule "vendor/postgres"]
 	path = vendor/postgres
 	url = https://github.com/zenithdb/postgres
-	branch = main
+	branch = walproposer_more_logs
@@ -33,11 +33,16 @@ class BankClient(object):
         row = await self.conn.fetchrow('SELECT sum(amount) AS sum FROM bank_accs')
         assert row['sum'] == self.n_accounts * self.init_amount

-async def bank_transfer(conn: asyncpg.Connection, from_uid, to_uid, amount):
+async def bank_transfer(conn: asyncpg.Connection, from_uid, to_uid, amount, large_wal=False):
     # avoid deadlocks by sorting uids
     if from_uid > to_uid:
         from_uid, to_uid, amount = to_uid, from_uid, -amount

+    if large_wal:
+        # record with size about 128kb
+        await conn.execute("SELECT pg_logical_emit_message(false, 'hello', REPEAT('abacaba', 19000))")
+        return
+
     async with conn.transaction():
         await conn.execute(
             'UPDATE bank_accs SET amount = amount + ($1) WHERE uid = $2',
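A note on the large_wal branch just added: pg_logical_emit_message is a stock PostgreSQL function (available since 9.6); with transactional = false the message is written to WAL immediately as a single logical-message record, so no surrounding transaction is needed. The payload size works out to roughly the "about 128kb" the comment mentions; a standalone arithmetic sketch:

    # Size of the payload emitted by the large_wal branch above.
    payload_bytes = len('abacaba') * 19000    # 7 * 19000 = 133,000 bytes
    print(payload_bytes / 1024)               # ~129.9 KiB, i.e. "about 128kb"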
@@ -72,7 +77,7 @@ class WorkerStats(object):
         print('All workers made {} transactions'.format(progress))


-async def run_random_worker(stats: WorkerStats, pg: Postgres, worker_id, n_accounts, max_transfer):
+async def run_random_worker(stats: WorkerStats, pg: Postgres, worker_id, n_accounts, max_transfer, large_wal=False):
     pg_conn = await pg.connect_async()
     debug_print('Started worker {}'.format(worker_id))

@@ -81,7 +86,7 @@ async def run_random_worker(stats: WorkerStats, pg: Postgres, worker_id, n_accou
         to_uid = (from_uid + random.randint(1, n_accounts - 1)) % n_accounts
         amount = random.randint(1, max_transfer)

-        await bank_transfer(pg_conn, from_uid, to_uid, amount)
+        await bank_transfer(pg_conn, from_uid, to_uid, amount, large_wal=large_wal)
         stats.inc_progress(worker_id)

         debug_print('Executed transfer({}) {} => {}'.format(amount, from_uid, to_uid))
@@ -95,12 +100,10 @@ async def run_random_worker(stats: WorkerStats, pg: Postgres, worker_id, n_accou
 # On each iteration 1 acceptor is stopped, and 2 others should allow
 # background workers execute transactions. In the end, state should remain
 # consistent.
-async def run_restarts_under_load(pg: Postgres, acceptors: List[WalAcceptor], n_workers=10):
+async def run_restarts_under_load(pg: Postgres, acceptors: List[WalAcceptor], n_workers=10, period_time=10, iterations=6, large_wal=False):
     n_accounts = 100
     init_amount = 100000
     max_transfer = 100
-    period_time = 10
-    iterations = 6

     pg_conn = await pg.connect_async()
     bank = BankClient(pg_conn, n_accounts=n_accounts, init_amount=init_amount)
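The comment in the hunk above describes the restart protocol of run_restarts_under_load; its loop body is untouched by this commit and not shown here. A minimal sketch of that protocol, assuming illustrative stop()/start() helpers on the acceptor objects (names are hypothetical, not necessarily the real WalAcceptor API):

    import asyncio

    # Sketch only: stop()/start() are assumed helper names.
    async def restart_loop(acceptors, iterations, period_time):
        for i in range(iterations):
            victim = acceptors[i % len(acceptors)]
            victim.stop()                      # take down 1 of the 3 acceptors
            await asyncio.sleep(period_time)   # the other 2 keep accepting the workers' WAL
            victim.start()                     # restart it before the next iteration
        # afterwards the BankClient sum assertion (first hunk above) re-checks consistency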
@@ -110,7 +113,7 @@ async def run_restarts_under_load(pg: Postgres, acceptors: List[WalAcceptor], n_
     stats = WorkerStats(n_workers)
     workers = []
     for worker_id in range(n_workers):
-        worker = run_random_worker(stats, pg, worker_id, bank.n_accounts, max_transfer)
+        worker = run_random_worker(stats, pg, worker_id, bank.n_accounts, max_transfer, large_wal=large_wal)
         workers.append(asyncio.create_task(worker))


@@ -157,3 +160,19 @@ def test_restarts_under_load(zenith_cli, pageserver: ZenithPageserver, postgres:

     # TODO: Remove when https://github.com/zenithdb/zenith/issues/644 is fixed
     pg.stop()
+
+# test condition when wal records are too large to be contained in a single wal data message
+# current MAX_SEND_SIZE is 8192 * 16 = 128kb
+def test_restarts_under_load_large(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory,
+                                   wa_factory: WalAcceptorFactory):
+
+    wa_factory.start_n_new(3)
+
+    zenith_cli.run(["branch", "test_restarts_under_load_large", "empty"])
+    pg = postgres.create_start('test_restarts_under_load_large',
+                               wal_acceptors=wa_factory.get_connstrs())
+
+    asyncio.run(run_restarts_under_load(pg, wa_factory.instances, n_workers=2, period_time=7, iterations=10, large_wal=True))
+
+    # TODO: Remove when https://github.com/zenithdb/zenith/issues/644 is fixed
+    pg.stop()
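The threshold cited in the new test's comment works out as MAX_SEND_SIZE = 8192 * 16 = 131,072 bytes (128 KiB), while each record emitted by bank_transfer(..., large_wal=True) carries a 133,000-byte payload, so it cannot fit in a single WAL data message and must be split across more than one, which is exactly the condition this test exercises. A standalone check of that arithmetic, with both figures taken from this diff:

    # Both figures are taken from this diff.
    MAX_SEND_SIZE = 8192 * 16                 # 131,072 bytes = 128 KiB
    payload_bytes = len('abacaba') * 19000    # 133,000 bytes
    assert payload_bytes > MAX_SEND_SIZE      # record spans more than one send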
vendor/postgres (vendored, 2 lines changed)
Submodule vendor/postgres updated: 5387eb4a3b...d9f7f10320