diff --git a/test_runner/batch_others/test_restart_compute.py b/test_runner/batch_others/test_restart_compute.py
deleted file mode 100644
index af1956e196..0000000000
--- a/test_runner/batch_others/test_restart_compute.py
+++ /dev/null
@@ -1,74 +0,0 @@
-import pytest
-
-from contextlib import closing
-from fixtures.neon_fixtures import NeonEnvBuilder
-from fixtures.log_helper import log
-
-
-#
-# Test restarting and recreating a postgres instance
-#
-@pytest.mark.parametrize('with_safekeepers', [False, True])
-def test_restart_compute(neon_env_builder: NeonEnvBuilder, with_safekeepers: bool):
-    neon_env_builder.auth_enabled = True
-    if with_safekeepers:
-        neon_env_builder.num_safekeepers = 3
-    env = neon_env_builder.init_start()
-
-    env.neon_cli.create_branch('test_restart_compute')
-    pg = env.postgres.create_start('test_restart_compute')
-    log.info("postgres is running on 'test_restart_compute' branch")
-
-    with closing(pg.connect()) as conn:
-        with conn.cursor() as cur:
-            cur.execute('CREATE TABLE t(key int primary key, value text)')
-            cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
-            cur.execute('SELECT sum(key) FROM t')
-            r = cur.fetchone()
-            assert r == (5000050000, )
-            log.info(f"res = {r}")
-
-    # Remove data directory and restart
-    pg.stop_and_destroy().create_start('test_restart_compute')
-
-    with closing(pg.connect()) as conn:
-        with conn.cursor() as cur:
-            # We can still see the row
-            cur.execute('SELECT sum(key) FROM t')
-            r = cur.fetchone()
-            assert r == (5000050000, )
-            log.info(f"res = {r}")
-
-            # Insert another row
-            cur.execute("INSERT INTO t VALUES (100001, 'payload2')")
-            cur.execute('SELECT count(*) FROM t')
-
-            r = cur.fetchone()
-            assert r == (100001, )
-            log.info(f"res = {r}")
-
-    # Again remove data directory and restart
-    pg.stop_and_destroy().create_start('test_restart_compute')
-
-    # That select causes lots of FPI's and increases probability of wakeepers
-    # lagging behind after query completion
-    with closing(pg.connect()) as conn:
-        with conn.cursor() as cur:
-            # We can still see the rows
-            cur.execute('SELECT count(*) FROM t')
-
-            r = cur.fetchone()
-            assert r == (100001, )
-            log.info(f"res = {r}")
-
-    # And again remove data directory and restart
-    pg.stop_and_destroy().create_start('test_restart_compute')
-
-    with closing(pg.connect()) as conn:
-        with conn.cursor() as cur:
-            # We can still see the rows
-            cur.execute('SELECT count(*) FROM t')
-
-            r = cur.fetchone()
-            assert r == (100001, )
-            log.info(f"res = {r}")
diff --git a/test_runner/batch_others/test_wal_acceptor_async.py b/test_runner/batch_others/test_wal_acceptor_async.py
index c0967ef6c0..4664c332fc 100644
--- a/test_runner/batch_others/test_wal_acceptor_async.py
+++ b/test_runner/batch_others/test_wal_acceptor_async.py
@@ -1,5 +1,6 @@
 import asyncio
 import uuid
+
 import asyncpg
 import random
 import time
@@ -7,7 +8,7 @@ import time
 from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres, Safekeeper
 from fixtures.log_helper import getLogger
 from fixtures.utils import lsn_from_hex, lsn_to_hex
-from typing import List
+from typing import List, Optional
 
 log = getLogger('root.safekeeper_async')
 
@@ -234,3 +235,156 @@ def test_restarts_frequent_checkpoints(neon_env_builder: NeonEnvBuilder):
     # we try to simulate large (flush_lsn - truncate_lsn) lag, to test that WAL segments
     # are not removed before broadcasted to all safekeepers, with the help of replication slot
     asyncio.run(run_restarts_under_load(env, pg, env.safekeepers, period_time=15, iterations=5))
+
+
+def postgres_create_start(env: NeonEnv, branch: str, pgdir_name: Optional[str]):
+    pg = Postgres(
+        env,
+        tenant_id=env.initial_tenant,
+        port=env.port_distributor.get_port(),
+        # In these tests compute has high probability of terminating on its own
+        # before our stop() due to lost consensus leadership.
+        check_stop_result=False)
+
+    # embed current time in node name
+    node_name = pgdir_name or f'pg_node_{time.time()}'
+    return pg.create_start(branch_name=branch,
+                           node_name=node_name,
+                           config_lines=['log_statement=all'])
+
+
+async def exec_compute_query(env: NeonEnv,
+                             branch: str,
+                             query: str,
+                             pgdir_name: Optional[str] = None):
+    with postgres_create_start(env, branch=branch, pgdir_name=pgdir_name) as pg:
+        before_conn = time.time()
+        conn = await pg.connect_async()
+        res = await conn.fetch(query)
+        await conn.close()
+        after_conn = time.time()
+        log.info(f'{query} took {after_conn - before_conn}s')
+        return res
+
+
+async def run_compute_restarts(env: NeonEnv,
+                               queries=16,
+                               batch_insert=10000,
+                               branch='test_compute_restarts'):
+    cnt = 0
+    sum = 0
+
+    await exec_compute_query(env, branch, 'CREATE TABLE t (i int)')
+
+    for i in range(queries):
+        if i % 4 == 0:
+            await exec_compute_query(
+                env, branch, f'INSERT INTO t SELECT 1 FROM generate_series(1, {batch_insert})')
+            sum += batch_insert
+            cnt += batch_insert
+        elif (i % 4 == 1) or (i % 4 == 3):
+            # Note that select causes lots of FPI's and increases probability of safekeepers
+            # standing at different LSNs after compute termination.
+            actual_sum = (await exec_compute_query(env, branch, 'SELECT SUM(i) FROM t'))[0][0]
+            assert actual_sum == sum, f'Expected sum={sum}, actual={actual_sum}'
+        elif i % 4 == 2:
+            await exec_compute_query(env, branch, 'UPDATE t SET i = i + 1')
+            sum += cnt
+
+
+# Add a test which creates compute for every query, and then destroys it right after.
+def test_compute_restarts(neon_env_builder: NeonEnvBuilder):
+    neon_env_builder.num_safekeepers = 3
+    env = neon_env_builder.init_start()
+
+    env.neon_cli.create_branch('test_compute_restarts')
+    asyncio.run(run_compute_restarts(env))
+
+
+class BackgroundCompute(object):
+    def __init__(self, index: int, env: NeonEnv, branch: str):
+        self.index = index
+        self.env = env
+        self.branch = branch
+        self.running = False
+        self.stopped = False
+        self.total_tries = 0
+        self.successful_queries: List[int] = []
+
+    async def run(self):
+        if self.running:
+            raise Exception('BackgroundCompute is already running')
+
+        self.running = True
+        i = 0
+        while not self.stopped:
+            try:
+                verify_key = (self.index << 16) + i
+                i += 1
+                self.total_tries += 1
+                res = await exec_compute_query(
+                    self.env,
+                    self.branch,
+                    f'INSERT INTO query_log(index, verify_key) VALUES ({self.index}, {verify_key}) RETURNING verify_key',
+                    pgdir_name=f'bgcompute{self.index}_key{verify_key}',
+                )
+                log.info(f'result: {res}')
+                if len(res) != 1:
+                    raise Exception('No result returned')
+                if res[0][0] != verify_key:
+                    raise Exception('Wrong result returned')
+                self.successful_queries.append(verify_key)
+            except Exception as e:
+                log.info(f'BackgroundCompute {self.index} query failed: {e}')
+
+            # With less sleep, there is a very big chance of not committing
+            # anything or only 1 xact during test run.
+            await asyncio.sleep(2 * random.random())
+        self.running = False
+
+
+async def run_concurrent_computes(env: NeonEnv,
+                                  num_computes=10,
+                                  run_seconds=20,
+                                  branch='test_concurrent_computes'):
+    await exec_compute_query(
+        env,
+        branch,
+        'CREATE TABLE query_log (t timestamp default now(), index int, verify_key int)')
+
+    computes = [BackgroundCompute(i, env, branch) for i in range(num_computes)]
+    background_tasks = [asyncio.create_task(compute.run()) for compute in computes]
+
+    await asyncio.sleep(run_seconds)
+    for compute in computes[1:]:
+        compute.stopped = True
+    log.info("stopped all tasks but one")
+
+    # work for some time with only one compute -- it should be able to make some xacts
+    await asyncio.sleep(8)
+    computes[0].stopped = True
+
+    await asyncio.gather(*background_tasks)
+
+    result = await exec_compute_query(env, branch, 'SELECT * FROM query_log')
+    # we should have inserted something while single compute was running
+    assert len(result) >= 4
+    log.info(f'Executed {len(result)} queries')
+    for row in result:
+        log.info(f'{row[0]} {row[1]} {row[2]}')
+
+    # ensure everything reported as committed wasn't lost
+    for compute in computes:
+        for verify_key in compute.successful_queries:
+            assert verify_key in [row[2] for row in result]
+
+
+# Run multiple computes concurrently, creating-destroying them after single
+# query. Ensure we don't lose any xacts reported as committed and be able to
+# progress once only one compute remains.
+def test_concurrent_computes(neon_env_builder: NeonEnvBuilder):
+    neon_env_builder.num_safekeepers = 3
+    env = neon_env_builder.init_start()
+
+    env.neon_cli.create_branch('test_concurrent_computes')
+    asyncio.run(run_concurrent_computes(env))
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 9eb02b50d0..3d4daf5f29 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -1160,6 +1160,7 @@ class NeonCli:
         node_name: str,
         tenant_id: Optional[uuid.UUID] = None,
         destroy=False,
+        check_return_code=True,
     ) -> 'subprocess.CompletedProcess[str]':
         args = [
             'pg',
@@ -1172,7 +1173,7 @@ class NeonCli:
         if node_name is not None:
             args.append(node_name)
 
-        return self.raw_cli(args)
+        return self.raw_cli(args, check_return_code=check_return_code)
 
     def raw_cli(self,
                 arguments: List[str],
@@ -1188,6 +1189,8 @@ class NeonCli:
        >>> result = env.neon_cli.raw_cli(...)
        >>> assert result.stderr == ""
        >>> log.info(result.stdout)
+
+        If `check_return_code`, on non-zero exit code logs failure and raises.
        """
 
        assert type(arguments) == list
@@ -1213,27 +1216,27 @@ class NeonCli:
                env_vars[var] = val
 
        # Intercept CalledProcessError and print more info
-        try:
-            res = subprocess.run(args,
-                                 env=env_vars,
-                                 check=True,
-                                 universal_newlines=True,
-                                 stdout=subprocess.PIPE,
-                                 stderr=subprocess.PIPE)
+        res = subprocess.run(args,
+                             env=env_vars,
+                             check=False,
+                             universal_newlines=True,
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE)
+        if not res.returncode:
            log.info(f"Run success: {res.stdout}")
-        except subprocess.CalledProcessError as exc:
+        elif check_return_code:
            # this way command output will be in recorded and shown in CI in failure message
            msg = f"""\
-            Run failed: {exc}
-              stdout: {exc.stdout}
-              stderr: {exc.stderr}
+            Run {res.args} failed:
+              stdout: {res.stdout}
+              stderr: {res.stderr}
            """
            log.info(msg)
+            raise Exception(msg) from subprocess.CalledProcessError(res.returncode,
+                                                                    res.args,
+                                                                    res.stdout,
+                                                                    res.stderr)
 
-            raise Exception(msg) from exc
-
-        if check_return_code:
-            res.check_returncode()
 
        return res
 
@@ -1526,7 +1529,11 @@ def static_proxy(vanilla_pg, port_distributor) -> Iterator[NeonProxy]:
 
 class Postgres(PgProtocol):
     """ An object representing a running postgres daemon. """
-    def __init__(self, env: NeonEnv, tenant_id: uuid.UUID, port: int):
+    def __init__(self,
+                 env: NeonEnv,
+                 tenant_id: uuid.UUID,
+                 port: int,
+                 check_stop_result: bool = True):
         super().__init__(host='localhost', port=port, user='cloud_admin', dbname='postgres')
         self.env = env
         self.running = False
@@ -1534,6 +1541,7 @@ class Postgres(PgProtocol):
         self.pgdata_dir: Optional[str] = None  # Path to computenode PGDATA
         self.tenant_id = tenant_id
         self.port = port
+        self.check_stop_result = check_stop_result
         # path to conf is <repo_dir>/pgdatadirs/tenants/<tenant_id>/<node_name>/postgresql.conf
 
     def create(
@@ -1585,8 +1593,6 @@
                                                 port=self.port)
         self.running = True
 
-        log.info(f"stdout: {run_result.stdout}")
-
         return self
 
     def pg_data_dir_path(self) -> str:
@@ -1650,7 +1656,9 @@
         if self.running:
             assert self.node_name is not None
-            self.env.neon_cli.pg_stop(self.node_name, self.tenant_id)
+            self.env.neon_cli.pg_stop(self.node_name,
+                                      self.tenant_id,
+                                      check_return_code=self.check_stop_result)
             self.running = False
 
         return self
 
@@ -1662,7 +1670,10 @@
         """
 
         assert self.node_name is not None
-        self.env.neon_cli.pg_stop(self.node_name, self.tenant_id, True)
+        self.env.neon_cli.pg_stop(self.node_name,
+                                  self.tenant_id,
+                                  True,
+                                  check_return_code=self.check_stop_result)
         self.node_name = None
         self.running = False
@@ -1681,6 +1692,8 @@
         Returns self.
         """
 
+        started_at = time.time()
+
         self.create(
             branch_name=branch_name,
             node_name=node_name,
@@ -1688,6 +1701,8 @@
             lsn=lsn,
         ).start()
 
+        log.info(f"Postgres startup took {time.time() - started_at} seconds")
+
         return self
 
     def __enter__(self):