Refactor test_race_conditions (#2162)

Do not use python multiprocessing, make the test async
This commit is contained in:
Arthur Petukhovsky
2022-07-28 14:38:37 +03:00
committed by GitHub
parent 58b04438f0
commit aeb3f0ea07
2 changed files with 65 additions and 52 deletions

View File

@@ -203,58 +203,6 @@ def test_restarts(neon_env_builder: NeonEnvBuilder):
assert cur.fetchone() == (500500, )
# shut down random subset of acceptors, sleep, wake them up, rinse, repeat
def xmas_garland(acceptors, stop):
while not bool(stop.value):
victims = []
for wa in acceptors:
if random.random() >= 0.5:
victims.append(wa)
for v in victims:
v.stop()
time.sleep(1)
for v in victims:
v.start()
time.sleep(1)
# value which gets unset on exit
@pytest.fixture
def stop_value():
stop = Value('i', 0)
yield stop
stop.value = 1
# do inserts while concurrently getting up/down subsets of acceptors
def test_race_conditions(neon_env_builder: NeonEnvBuilder, stop_value):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_safekeepers_race_conditions')
pg = env.postgres.create_start('test_safekeepers_race_conditions')
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
pg_conn = pg.connect()
cur = pg_conn.cursor()
cur.execute('CREATE TABLE t(key int primary key, value text)')
proc = Process(target=xmas_garland, args=(env.safekeepers, stop_value))
proc.start()
for i in range(1000):
cur.execute("INSERT INTO t values (%s, 'payload');", (i + 1, ))
cur.execute('SELECT sum(key) FROM t')
assert cur.fetchone() == (500500, )
stop_value.value = 1
proc.join()
# Test that safekeepers push their info to the broker and learn peer status from it
def test_broker(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3

View File

@@ -9,6 +9,7 @@ from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres, Safekeeper
from fixtures.log_helper import getLogger
from fixtures.utils import lsn_from_hex, lsn_to_hex
from typing import List, Optional
from dataclasses import dataclass
log = getLogger('root.safekeeper_async')
@@ -455,3 +456,67 @@ def test_unavailability(neon_env_builder: NeonEnvBuilder):
pg = env.postgres.create_start('test_safekeepers_unavailability')
asyncio.run(run_unavailability(env, pg))
@dataclass
class RaceConditionTest:
iteration: int
is_stopped: bool
# shut down random subset of safekeeper, sleep, wake them up, rinse, repeat
async def xmas_garland(safekeepers: List[Safekeeper], data: RaceConditionTest):
while not data.is_stopped:
data.iteration += 1
victims = []
for sk in safekeepers:
if random.random() >= 0.5:
victims.append(sk)
log.info(
f'Iteration {data.iteration}: stopping {list(map(lambda sk: sk.id, victims))} safekeepers'
)
for v in victims:
v.stop()
await asyncio.sleep(1)
for v in victims:
v.start()
log.info(f'Iteration {data.iteration} finished')
await asyncio.sleep(1)
async def run_race_conditions(env: NeonEnv, pg: Postgres):
conn = await pg.connect_async()
await conn.execute('CREATE TABLE t(key int primary key, value text)')
data = RaceConditionTest(0, False)
bg_xmas = asyncio.create_task(xmas_garland(env.safekeepers, data))
n_iterations = 5
expected_sum = 0
i = 1
while data.iteration <= n_iterations:
await asyncio.sleep(0.005)
await conn.execute(f"INSERT INTO t values ({i}, 'payload')")
expected_sum += i
i += 1
log.info(f'Executed {i-1} queries')
res = await conn.fetchval('SELECT sum(key) FROM t')
assert res == expected_sum
data.is_stopped = True
await bg_xmas
# do inserts while concurrently getting up/down subsets of acceptors
def test_race_conditions(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_safekeepers_race_conditions')
pg = env.postgres.create_start('test_safekeepers_race_conditions')
asyncio.run(run_race_conditions(env, pg))