mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 20:12:54 +00:00
Add test for acceptor restarts under load (#591)
In this test safekeepers are restarted one by one, while bank transactions are executed and validated in the background. Bank transactions consist of balance transfers and log writes. In the end balance sum should remain the same and there should be progress from every client, when 2 of 3 safekeeper nodes are up.
This commit is contained in:
committed by
GitHub
parent
16d3dc821a
commit
8ebf2fe550
@@ -10,6 +10,7 @@ typing-extensions = "*"
|
||||
pyjwt = {extras = ["crypto"], version = "*"}
|
||||
requests = "*"
|
||||
pytest-xdist = "*"
|
||||
asyncpg = "*"
|
||||
|
||||
[dev-packages]
|
||||
yapf = "*"
|
||||
|
||||
35
test_runner/Pipfile.lock
generated
35
test_runner/Pipfile.lock
generated
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"_meta": {
|
||||
"hash": {
|
||||
"sha256": "480afaf71a214984dac55d128a4f67ec2d9749136e570c64df562c79900a9d83"
|
||||
"sha256": "3cdc048691824d0b93912b6b78a0aa01dc98f278212c1badb0cc2edbd2103c3a"
|
||||
},
|
||||
"pipfile-spec": 6,
|
||||
"requires": {
|
||||
@@ -16,6 +16,25 @@
|
||||
]
|
||||
},
|
||||
"default": {
|
||||
"asyncpg": {
|
||||
"hashes": [
|
||||
"sha256:129d501f3d30616afd51eb8d3142ef51ba05374256bd5834cec3ef4956a9b317",
|
||||
"sha256:29ef6ae0a617fc13cc2ac5dc8e9b367bb83cba220614b437af9b67766f4b6b20",
|
||||
"sha256:41704c561d354bef01353835a7846e5606faabbeb846214dfcf666cf53319f18",
|
||||
"sha256:556b0e92e2b75dc028b3c4bc9bd5162ddf0053b856437cf1f04c97f9c6837d03",
|
||||
"sha256:8ff5073d4b654e34bd5eaadc01dc4d68b8a9609084d835acd364cd934190a08d",
|
||||
"sha256:a458fc69051fbb67d995fdda46d75a012b5d6200f91e17d23d4751482640ed4c",
|
||||
"sha256:a7095890c96ba36f9f668eb552bb020dddb44f8e73e932f8573efc613ee83843",
|
||||
"sha256:a738f4807c853623d3f93f0fea11f61be6b0e5ca16ea8aeb42c2c7ee742aa853",
|
||||
"sha256:c4fc0205fe4ddd5aeb3dfdc0f7bafd43411181e1f5650189608e5971cceacff1",
|
||||
"sha256:dd2fa063c3344823487d9ddccb40802f02622ddf8bf8a6cc53885ee7a2c1c0c6",
|
||||
"sha256:ddffcb85227bf39cd1bedd4603e0082b243cf3b14ced64dce506a15b05232b83",
|
||||
"sha256:e36c6806883786b19551bb70a4882561f31135dc8105a59662e0376cf5b2cbc5",
|
||||
"sha256:eed43abc6ccf1dc02e0d0efc06ce46a411362f3358847c6b0ec9a43426f91ece"
|
||||
],
|
||||
"index": "pypi",
|
||||
"version": "==0.24.0"
|
||||
},
|
||||
"attrs": {
|
||||
"hashes": [
|
||||
"sha256:149e90d6d8ac20db7a955ad60cf0e6881a3f20d37096140088356da6c716b0b1",
|
||||
@@ -83,11 +102,11 @@
|
||||
},
|
||||
"charset-normalizer": {
|
||||
"hashes": [
|
||||
"sha256:0c8911edd15d19223366a194a513099a302055a962bca2cec0f54b8b63175d8b",
|
||||
"sha256:f23667ebe1084be45f6ae0538e4a5a865206544097e4e8bbcacf42cd02a348f3"
|
||||
"sha256:5d209c0a931f215cee683b6445e2d77677e7e75e159f78def0db09d68fafcaa6",
|
||||
"sha256:5ec46d183433dcbd0ab716f2d7f29d8dee50505b3fdb40c6b985c7c4f5a3591f"
|
||||
],
|
||||
"markers": "python_version >= '3'",
|
||||
"version": "==2.0.4"
|
||||
"version": "==2.0.6"
|
||||
},
|
||||
"cryptography": {
|
||||
"hashes": [
|
||||
@@ -96,8 +115,10 @@
|
||||
"sha256:21ca464b3a4b8d8e86ba0ee5045e103a1fcfac3b39319727bc0fc58c09c6aff7",
|
||||
"sha256:34dae04a0dce5730d8eb7894eab617d8a70d0c97da76b905de9efb7128ad7085",
|
||||
"sha256:3520667fda779eb788ea00080124875be18f2d8f0848ec00733c0ec3bb8219fc",
|
||||
"sha256:3c4129fc3fdc0fa8e40861b5ac0c673315b3c902bbdc05fc176764815b43dd1d",
|
||||
"sha256:3fa3a7ccf96e826affdf1a0a9432be74dc73423125c8f96a909e3835a5ef194a",
|
||||
"sha256:5b0fbfae7ff7febdb74b574055c7466da334a5371f253732d7e2e7525d570498",
|
||||
"sha256:695104a9223a7239d155d7627ad912953b540929ef97ae0c34c7b8bf30857e89",
|
||||
"sha256:8695456444f277af73a4877db9fc979849cd3ee74c198d04fc0776ebc3db52b9",
|
||||
"sha256:94cc5ed4ceaefcbe5bf38c8fba6a21fc1d365bb8fb826ea1688e3370b2e24a1c",
|
||||
"sha256:94fff993ee9bc1b2440d3b7243d488c6a3d9724cc2b09cdb297f6a886d040ef7",
|
||||
@@ -218,11 +239,11 @@
|
||||
},
|
||||
"pytest-xdist": {
|
||||
"hashes": [
|
||||
"sha256:e8ecde2f85d88fbcadb7d28cb33da0fa29bca5cf7d5967fa89fc0e97e5299ea5",
|
||||
"sha256:ed3d7da961070fce2a01818b51f6888327fb88df4379edeb6b9d990e789d9c8d"
|
||||
"sha256:7b61ebb46997a0820a263553179d6d1e25a8c50d8a8620cd1aa1e20e3be99168",
|
||||
"sha256:89b330316f7fc475f999c81b577c2b926c9569f3d397ae432c0c2e2496d61ff9"
|
||||
],
|
||||
"index": "pypi",
|
||||
"version": "==2.3.0"
|
||||
"version": "==2.4.0"
|
||||
},
|
||||
"requests": {
|
||||
"hashes": [
|
||||
|
||||
154
test_runner/batch_others/test_wal_acceptor_async.py
Normal file
154
test_runner/batch_others/test_wal_acceptor_async.py
Normal file
@@ -0,0 +1,154 @@
|
||||
import asyncio
|
||||
import asyncpg
|
||||
import random
|
||||
|
||||
from fixtures.zenith_fixtures import WalAcceptor, WalAcceptorFactory, ZenithPageserver, PostgresFactory, Postgres
|
||||
from typing import List
|
||||
from fixtures.utils import debug_print
|
||||
|
||||
pytest_plugins = ("fixtures.zenith_fixtures")
|
||||
|
||||
|
||||
class BankClient(object):
|
||||
def __init__(self, conn: asyncpg.Connection, n_accounts, init_amount):
|
||||
self.conn: asyncpg.Connection = conn
|
||||
self.n_accounts = n_accounts
|
||||
self.init_amount = init_amount
|
||||
|
||||
async def initdb(self):
|
||||
await self.conn.execute('DROP TABLE IF EXISTS bank_accs')
|
||||
await self.conn.execute('CREATE TABLE bank_accs(uid int primary key, amount int)')
|
||||
await self.conn.execute('''
|
||||
INSERT INTO bank_accs
|
||||
SELECT *, $1 FROM generate_series(0, $2)
|
||||
''', self.init_amount, self.n_accounts - 1)
|
||||
await self.conn.execute('DROP TABLE IF EXISTS bank_log')
|
||||
await self.conn.execute('CREATE TABLE bank_log(from_uid int, to_uid int, amount int)')
|
||||
|
||||
# TODO: Remove when https://github.com/zenithdb/zenith/issues/644 is fixed
|
||||
await self.conn.execute('ALTER TABLE bank_accs SET (autovacuum_enabled = false)')
|
||||
await self.conn.execute('ALTER TABLE bank_log SET (autovacuum_enabled = false)')
|
||||
|
||||
async def check_invariant(self):
|
||||
row = await self.conn.fetchrow('SELECT sum(amount) AS sum FROM bank_accs')
|
||||
assert row['sum'] == self.n_accounts * self.init_amount
|
||||
|
||||
async def bank_transfer(conn: asyncpg.Connection, from_uid, to_uid, amount):
|
||||
# avoid deadlocks by sorting uids
|
||||
if from_uid > to_uid:
|
||||
from_uid, to_uid, amount = to_uid, from_uid, -amount
|
||||
|
||||
async with conn.transaction():
|
||||
await conn.execute(
|
||||
'UPDATE bank_accs SET amount = amount + ($1) WHERE uid = $2',
|
||||
amount, to_uid,
|
||||
)
|
||||
await conn.execute(
|
||||
'UPDATE bank_accs SET amount = amount - ($1) WHERE uid = $2',
|
||||
amount, from_uid,
|
||||
)
|
||||
await conn.execute('INSERT INTO bank_log VALUES ($1, $2, $3)',
|
||||
from_uid, to_uid, amount,
|
||||
)
|
||||
|
||||
class WorkerStats(object):
|
||||
def __init__(self, n_workers):
|
||||
self.counters = [0] * n_workers
|
||||
self.running = True
|
||||
|
||||
def reset(self):
|
||||
self.counters = [0] * len(self.counters)
|
||||
|
||||
def inc_progress(self, worker_id):
|
||||
self.counters[worker_id] += 1
|
||||
|
||||
def check_progress(self):
|
||||
debug_print("Workers progress: {}".format(self.counters))
|
||||
|
||||
# every worker should finish at least one tx
|
||||
assert all(cnt > 0 for cnt in self.counters)
|
||||
|
||||
progress = sum(self.counters)
|
||||
print('All workers made {} transactions'.format(progress))
|
||||
|
||||
|
||||
async def run_random_worker(stats: WorkerStats, pg: Postgres, worker_id, n_accounts, max_transfer):
|
||||
pg_conn = await pg.connect_async()
|
||||
debug_print('Started worker {}'.format(worker_id))
|
||||
|
||||
while stats.running:
|
||||
from_uid = random.randint(0, n_accounts - 1)
|
||||
to_uid = (from_uid + random.randint(1, n_accounts - 1)) % n_accounts
|
||||
amount = random.randint(1, max_transfer)
|
||||
|
||||
await bank_transfer(pg_conn, from_uid, to_uid, amount)
|
||||
stats.inc_progress(worker_id)
|
||||
|
||||
debug_print('Executed transfer({}) {} => {}'.format(amount, from_uid, to_uid))
|
||||
|
||||
debug_print('Finished worker {}'.format(worker_id))
|
||||
|
||||
await pg_conn.close()
|
||||
|
||||
|
||||
# This test will run several iterations and check progress in each of them.
|
||||
# On each iteration 1 acceptor is stopped, and 2 others should allow
|
||||
# background workers execute transactions. In the end, state should remain
|
||||
# consistent.
|
||||
async def run_restarts_under_load(pg: Postgres, acceptors: List[WalAcceptor], n_workers=10):
|
||||
n_accounts = 100
|
||||
init_amount = 100000
|
||||
max_transfer = 100
|
||||
period_time = 10
|
||||
iterations = 6
|
||||
|
||||
pg_conn = await pg.connect_async()
|
||||
bank = BankClient(pg_conn, n_accounts=n_accounts, init_amount=init_amount)
|
||||
# create tables and initial balances
|
||||
await bank.initdb()
|
||||
|
||||
stats = WorkerStats(n_workers)
|
||||
workers = []
|
||||
for worker_id in range(n_workers):
|
||||
worker = run_random_worker(stats, pg, worker_id, bank.n_accounts, max_transfer)
|
||||
workers.append(asyncio.create_task(worker))
|
||||
|
||||
|
||||
for it in range(iterations):
|
||||
victim = acceptors[it % len(acceptors)]
|
||||
victim.stop()
|
||||
|
||||
# wait for transactions that could have started and finished before
|
||||
# victim acceptor was stopped
|
||||
await asyncio.sleep(1)
|
||||
|
||||
stats.reset()
|
||||
await asyncio.sleep(period_time)
|
||||
# assert that at least one transaction has completed in every worker
|
||||
stats.check_progress()
|
||||
|
||||
victim.start()
|
||||
|
||||
print('Iterations are finished, exiting coroutines...')
|
||||
stats.running = False
|
||||
# await all workers
|
||||
await asyncio.gather(*workers)
|
||||
# assert balances sum hasn't changed
|
||||
await bank.check_invariant()
|
||||
await pg_conn.close()
|
||||
|
||||
|
||||
# restart acceptors one by one, while executing and validating bank transactions
|
||||
def test_restarts_under_load(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory,
|
||||
wa_factory: WalAcceptorFactory):
|
||||
|
||||
wa_factory.start_n_new(3)
|
||||
|
||||
zenith_cli.run(["branch", "test_wal_acceptors_restarts_under_load", "empty"])
|
||||
pg = postgres.create_start('test_wal_acceptors_restarts_under_load',
|
||||
wal_acceptors=wa_factory.get_connstrs())
|
||||
|
||||
asyncio.run(run_restarts_under_load(pg, wa_factory.instances))
|
||||
|
||||
# TODO: Remove when https://github.com/zenithdb/zenith/issues/644 is fixed
|
||||
pg.stop()
|
||||
@@ -54,3 +54,11 @@ def global_counter() -> int:
|
||||
global _global_counter
|
||||
_global_counter += 1
|
||||
return _global_counter
|
||||
|
||||
def debug_print(*args, **kwargs) -> None:
|
||||
""" Print to the console if TEST_DEBUG_PRINT is set in env.
|
||||
|
||||
All parameters are passed to print().
|
||||
"""
|
||||
if os.environ.get('TEST_DEBUG_PRINT') is not None:
|
||||
print(*args, **kwargs)
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from dataclasses import dataclass
|
||||
from functools import cached_property
|
||||
import asyncpg
|
||||
import os
|
||||
import pathlib
|
||||
import uuid
|
||||
@@ -129,6 +130,21 @@ class PgProtocol:
|
||||
conn.autocommit = autocommit
|
||||
return conn
|
||||
|
||||
async def connect_async(self, *, dbname: str = 'postgres', username: Optional[str] = None, password: Optional[str] = None) -> asyncpg.Connection:
|
||||
"""
|
||||
Connect to the node from async python.
|
||||
Returns asyncpg's connection object.
|
||||
"""
|
||||
|
||||
conn = await asyncpg.connect(
|
||||
host=self.host,
|
||||
port=self.port,
|
||||
database=dbname,
|
||||
user=username or self.username,
|
||||
password=password,
|
||||
)
|
||||
return conn
|
||||
|
||||
def safe_psql(self, query: str, **kwargs: Any) -> List[Any]:
|
||||
"""
|
||||
Execute query against the node and return all rows.
|
||||
|
||||
Reference in New Issue
Block a user