diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index 5014a7ad4e..74b2f2657f 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -203,58 +203,6 @@ def test_restarts(neon_env_builder: NeonEnvBuilder): assert cur.fetchone() == (500500, ) -# shut down random subset of acceptors, sleep, wake them up, rinse, repeat -def xmas_garland(acceptors, stop): - while not bool(stop.value): - victims = [] - for wa in acceptors: - if random.random() >= 0.5: - victims.append(wa) - for v in victims: - v.stop() - time.sleep(1) - for v in victims: - v.start() - time.sleep(1) - - -# value which gets unset on exit -@pytest.fixture -def stop_value(): - stop = Value('i', 0) - yield stop - stop.value = 1 - - -# do inserts while concurrently getting up/down subsets of acceptors -def test_race_conditions(neon_env_builder: NeonEnvBuilder, stop_value): - - neon_env_builder.num_safekeepers = 3 - env = neon_env_builder.init_start() - - env.neon_cli.create_branch('test_safekeepers_race_conditions') - pg = env.postgres.create_start('test_safekeepers_race_conditions') - - # we rely upon autocommit after each statement - # as waiting for acceptors happens there - pg_conn = pg.connect() - cur = pg_conn.cursor() - - cur.execute('CREATE TABLE t(key int primary key, value text)') - - proc = Process(target=xmas_garland, args=(env.safekeepers, stop_value)) - proc.start() - - for i in range(1000): - cur.execute("INSERT INTO t values (%s, 'payload');", (i + 1, )) - - cur.execute('SELECT sum(key) FROM t') - assert cur.fetchone() == (500500, ) - - stop_value.value = 1 - proc.join() - - # Test that safekeepers push their info to the broker and learn peer status from it def test_broker(neon_env_builder: NeonEnvBuilder): neon_env_builder.num_safekeepers = 3 diff --git a/test_runner/batch_others/test_wal_acceptor_async.py b/test_runner/batch_others/test_wal_acceptor_async.py index bf7d8e3645..5c0cb56880 100644 --- a/test_runner/batch_others/test_wal_acceptor_async.py +++ b/test_runner/batch_others/test_wal_acceptor_async.py @@ -9,6 +9,7 @@ from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres, Safekeeper from fixtures.log_helper import getLogger from fixtures.utils import lsn_from_hex, lsn_to_hex from typing import List, Optional +from dataclasses import dataclass log = getLogger('root.safekeeper_async') @@ -455,3 +456,67 @@ def test_unavailability(neon_env_builder: NeonEnvBuilder): pg = env.postgres.create_start('test_safekeepers_unavailability') asyncio.run(run_unavailability(env, pg)) + + +@dataclass +class RaceConditionTest: + iteration: int + is_stopped: bool + + +# shut down random subset of safekeeper, sleep, wake them up, rinse, repeat +async def xmas_garland(safekeepers: List[Safekeeper], data: RaceConditionTest): + while not data.is_stopped: + data.iteration += 1 + victims = [] + for sk in safekeepers: + if random.random() >= 0.5: + victims.append(sk) + log.info( + f'Iteration {data.iteration}: stopping {list(map(lambda sk: sk.id, victims))} safekeepers' + ) + for v in victims: + v.stop() + await asyncio.sleep(1) + for v in victims: + v.start() + log.info(f'Iteration {data.iteration} finished') + await asyncio.sleep(1) + + +async def run_race_conditions(env: NeonEnv, pg: Postgres): + conn = await pg.connect_async() + await conn.execute('CREATE TABLE t(key int primary key, value text)') + + data = RaceConditionTest(0, False) + bg_xmas = asyncio.create_task(xmas_garland(env.safekeepers, data)) + + n_iterations = 5 + expected_sum = 0 + i = 1 + + while data.iteration <= n_iterations: + await asyncio.sleep(0.005) + await conn.execute(f"INSERT INTO t values ({i}, 'payload')") + expected_sum += i + i += 1 + + log.info(f'Executed {i-1} queries') + + res = await conn.fetchval('SELECT sum(key) FROM t') + assert res == expected_sum + + data.is_stopped = True + await bg_xmas + + +# do inserts while concurrently getting up/down subsets of acceptors +def test_race_conditions(neon_env_builder: NeonEnvBuilder): + + neon_env_builder.num_safekeepers = 3 + env = neon_env_builder.init_start() + + env.neon_cli.create_branch('test_safekeepers_race_conditions') + pg = env.postgres.create_start('test_safekeepers_race_conditions') + + asyncio.run(run_race_conditions(env, pg))