NOTE(review): line structure restored from a whitespace-collapsed copy of this patch.
The neon_fixtures.py hunk "-908,14 +909,89" is incomplete in that copy: the text between
its header and the first docstring line (presumably the AbstractNeonCli class header and
the start of raw_cli -- TODO confirm against the original commit) is missing, so that
hunk will not apply as-is. Whitespace inside the `msg` f-string was also lost and is
reconstructed by convention -- verify before relying on the exact message layout.

diff --git a/test_runner/batch_others/test_crafted_wal_end.py b/test_runner/batch_others/test_crafted_wal_end.py
new file mode 100644
index 0000000000..1ddaadfae9
--- /dev/null
+++ b/test_runner/batch_others/test_crafted_wal_end.py
@@ -0,0 +1,61 @@
+from fixtures.neon_fixtures import NeonEnvBuilder, WalGenerate
+from fixtures.log_helper import log
+import pytest
+
+# Restart nodes with WAL end having specially crafted shape, like last record
+# crossing segment boundary, to test decoding issues.
+
+
+@pytest.mark.parametrize('wal_type',
+                         [
+                             'simple',
+                             'last_wal_record_crossing_segment',
+                             'wal_record_crossing_segment_followed_by_small_one',
+                         ])
+def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
+    neon_env_builder.num_safekeepers = 1
+    env = neon_env_builder.init_start()
+    env.neon_cli.create_branch('test_crafted_wal_end')
+
+    pg = env.postgres.create('test_crafted_wal_end')
+    gen = WalGenerate(env)
+    pg.config(gen.postgres_config())
+    pg.start()
+    res = pg.safe_psql_many(queries=[
+        'CREATE TABLE keys(key int primary key)',
+        'INSERT INTO keys SELECT generate_series(1, 100)',
+        'SELECT SUM(key) FROM keys'
+    ])
+    assert res[-1][0] == (5050, )
+
+    gen.in_existing(wal_type, pg.connstr())
+
+    log.info("Restarting all safekeepers and pageservers")
+    env.pageserver.stop()
+    env.safekeepers[0].stop()
+    env.safekeepers[0].start()
+    env.pageserver.start()
+
+    log.info("Trying more queries")
+    res = pg.safe_psql_many(queries=[
+        'SELECT SUM(key) FROM keys',
+        'INSERT INTO keys SELECT generate_series(101, 200)',
+        'SELECT SUM(key) FROM keys',
+    ])
+    assert res[0][0] == (5050, )
+    assert res[-1][0] == (20100, )
+
+    log.info("Restarting all safekeepers and pageservers (again)")
+    env.pageserver.stop()
+    env.safekeepers[0].stop()
+    env.safekeepers[0].start()
+    env.pageserver.start()
+
+    log.info("Trying more queries (again)")
+    res = pg.safe_psql_many(queries=[
+        'SELECT SUM(key) FROM keys',
+        'INSERT INTO keys SELECT generate_series(201, 300)',
+        'SELECT SUM(key) FROM keys',
+    ])
+    assert res[0][0] == (20100, )
+    assert res[-1][0] == (45150, )
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index be0322d418..d91ea398f9 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -4,6 +4,7 @@ from dataclasses import field
 from enum import Flag, auto
 import textwrap
 from cached_property import cached_property
+import abc
 import asyncpg
 import os
 import boto3
@@ -908,14 +909,89 @@ TIMELINE_DATA_EXTRACTOR = re.compile(r"\s(?P[^\s]+)\s\[(?P 'subprocess.CompletedProcess[str]':
+        """
+        Run the command with the specified arguments.
+
+        Arguments must be in list form, e.g. ['pg', 'create']
+
+        Return both stdout and stderr, which can be accessed as
+
+        >>> result = env.neon_cli.raw_cli(...)
+        >>> assert result.stderr == ""
+        >>> log.info(result.stdout)
+
+        If `check_return_code`, on non-zero exit code logs failure and raises.
+        """
+
+        assert isinstance(arguments, list)
+        assert isinstance(self.COMMAND, str)
+
+        bin_neon = os.path.join(str(neon_binpath), self.COMMAND)
+
+        args = [bin_neon] + arguments
+        log.info('Running command "{}"'.format(' '.join(args)))
+        log.info(f'Running in "{self.env.repo_dir}"')
+
+        env_vars = os.environ.copy()
+        env_vars['NEON_REPO_DIR'] = str(self.env.repo_dir)
+        env_vars['POSTGRES_DISTRIB_DIR'] = str(pg_distrib_dir)
+        if self.env.rust_log_override is not None:
+            env_vars['RUST_LOG'] = self.env.rust_log_override
+        for (extra_env_key, extra_env_value) in (extra_env_vars or {}).items():
+            env_vars[extra_env_key] = extra_env_value
+
+        # Pass coverage settings
+        var = 'LLVM_PROFILE_FILE'
+        val = os.environ.get(var)
+        if val:
+            env_vars[var] = val
+
+        # Intercept CalledProcessError and print more info
+        res = subprocess.run(args,
+                             env=env_vars,
+                             check=False,
+                             universal_newlines=True,
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE)
+        if not res.returncode:
+            log.info(f"Run success: {res.stdout}")
+        elif check_return_code:
+            # this way command output will be recorded and shown in CI in failure message
+            msg = f"""\
+            Run {res.args} failed:
+              stdout: {res.stdout}
+              stderr: {res.stderr}
+            """
+            log.info(msg)
+            raise Exception(msg) from subprocess.CalledProcessError(res.returncode,
+                                                                    res.args,
+                                                                    res.stdout,
+                                                                    res.stderr)
+        return res
+
+
+class NeonCli(AbstractNeonCli):
     """
     A typed wrapper around the `neon` CLI tool.
     Supports main commands via typed methods and a way to run arbitrary command directly via CLI.
     """
-    def __init__(self, env: NeonEnv):
-        self.env = env
-        pass
+
+    COMMAND = 'neon_local'
 
     def create_tenant(self,
                       tenant_id: Optional[uuid.UUID] = None,
@@ -1186,69 +1262,28 @@ class NeonCli:
 
         return self.raw_cli(args, check_return_code=check_return_code)
 
-    def raw_cli(self,
-                arguments: List[str],
-                extra_env_vars: Optional[Dict[str, str]] = None,
-                check_return_code=True) -> 'subprocess.CompletedProcess[str]':
-        """
-        Run "neon" with the specified arguments.
 
-        Arguments must be in list form, e.g. ['pg', 'create']
+class WalGenerate(AbstractNeonCli):
+    """
+    A typed wrapper around the `wal_generate` CLI tool.
+    Supports main commands via typed methods and a way to run arbitrary command directly via CLI.
+    """
 
-        Return both stdout and stderr, which can be accessed as
+    COMMAND = 'wal_generate'
 
-        >>> result = env.neon_cli.raw_cli(...)
-        >>> assert result.stderr == ""
-        >>> log.info(result.stdout)
+    def postgres_config(self) -> List[str]:
+        """Return the stdout lines of `wal_generate print-postgres-config`."""
+        res = self.raw_cli(["print-postgres-config"])
+        res.check_returncode()
+        return res.stdout.splitlines()
 
-        If `check_return_code`, on non-zero exit code logs failure and raises.
-        """
-
-        assert type(arguments) == list
-
-        bin_neon = os.path.join(str(neon_binpath), 'neon_local')
-
-        args = [bin_neon] + arguments
-        log.info('Running command "{}"'.format(' '.join(args)))
-        log.info(f'Running in "{self.env.repo_dir}"')
-
-        env_vars = os.environ.copy()
-        env_vars['NEON_REPO_DIR'] = str(self.env.repo_dir)
-        env_vars['POSTGRES_DISTRIB_DIR'] = str(pg_distrib_dir)
-        if self.env.rust_log_override is not None:
-            env_vars['RUST_LOG'] = self.env.rust_log_override
-        for (extra_env_key, extra_env_value) in (extra_env_vars or {}).items():
-            env_vars[extra_env_key] = extra_env_value
-
-        # Pass coverage settings
-        var = 'LLVM_PROFILE_FILE'
-        val = os.environ.get(var)
-        if val:
-            env_vars[var] = val
-
-        # Intercept CalledProcessError and print more info
-        res = subprocess.run(args,
-                             env=env_vars,
-                             check=False,
-                             universal_newlines=True,
-                             stdout=subprocess.PIPE,
-                             stderr=subprocess.PIPE)
-        if not res.returncode:
-            log.info(f"Run success: {res.stdout}")
-        elif check_return_code:
-            # this way command output will be in recorded and shown in CI in failure message
-            msg = f"""\
-            Run {res.args} failed:
-              stdout: {res.stdout}
-              stderr: {res.stderr}
-            """
-            log.info(msg)
-            raise Exception(msg) from subprocess.CalledProcessError(res.returncode,
-                                                                    res.args,
-                                                                    res.stdout,
-                                                                    res.stderr)
-
-        return res
+    def in_existing(self, type: str, connection: str) -> int:
+        """Run `wal_generate in-existing` and return the `end_of_wal` value it prints, via lsn_from_hex."""
+        res = self.raw_cli(["in-existing", type, connection])
+        res.check_returncode()
+        m = re.fullmatch(r'end_of_wal = (.*)\n', res.stdout)
+        assert m, f'unexpected wal_generate output: {res.stdout!r}'
+        return lsn_from_hex(m.group(1))
 
 
 class NeonPageserver(PgProtocol):