mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-09 06:22:57 +00:00
test_runner: add test_crafted_wal_end.py
For some reason both non-`simple` tests spend about 10 seconds in the post-restart `INSERT INTO` query on my machine, see #2023
This commit is contained in:
committed by
Egor Suvorov
parent
c08fa9d562
commit
bcdee3d3b5
61
test_runner/batch_others/test_crafted_wal_end.py
Normal file
61
test_runner/batch_others/test_crafted_wal_end.py
Normal file
@@ -0,0 +1,61 @@
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, WalGenerate
|
||||
from fixtures.log_helper import log
|
||||
import pytest
|
||||
|
||||
# Restart nodes with WAL end having specially crafted shape, like last record
|
||||
# crossing segment boundary, to test decoding issues.
|
||||
|
||||
|
||||
@pytest.mark.parametrize('wal_type',
|
||||
[
|
||||
'simple',
|
||||
'last_wal_record_crossing_segment',
|
||||
'wal_record_crossing_segment_followed_by_small_one',
|
||||
])
|
||||
def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
|
||||
neon_env_builder.num_safekeepers = 1
|
||||
env = neon_env_builder.init_start()
|
||||
env.neon_cli.create_branch('test_crafted_wal_end')
|
||||
|
||||
pg = env.postgres.create('test_crafted_wal_end')
|
||||
gen = WalGenerate(env)
|
||||
pg.config(gen.postgres_config())
|
||||
pg.start()
|
||||
res = pg.safe_psql_many(queries=[
|
||||
'CREATE TABLE keys(key int primary key)',
|
||||
'INSERT INTO keys SELECT generate_series(1, 100)',
|
||||
'SELECT SUM(key) FROM keys'
|
||||
])
|
||||
assert res[-1][0] == (5050, )
|
||||
|
||||
gen.in_existing(wal_type, pg.connstr())
|
||||
|
||||
log.info("Restarting all safekeepers and pageservers")
|
||||
env.pageserver.stop()
|
||||
env.safekeepers[0].stop()
|
||||
env.safekeepers[0].start()
|
||||
env.pageserver.start()
|
||||
|
||||
log.info("Trying more queries")
|
||||
res = pg.safe_psql_many(queries=[
|
||||
'SELECT SUM(key) FROM keys',
|
||||
'INSERT INTO keys SELECT generate_series(101, 200)',
|
||||
'SELECT SUM(key) FROM keys',
|
||||
])
|
||||
assert res[0][0] == (5050, )
|
||||
assert res[-1][0] == (20100, )
|
||||
|
||||
log.info("Restarting all safekeepers and pageservers (again)")
|
||||
env.pageserver.stop()
|
||||
env.safekeepers[0].stop()
|
||||
env.safekeepers[0].start()
|
||||
env.pageserver.start()
|
||||
|
||||
log.info("Trying more queries (again)")
|
||||
res = pg.safe_psql_many(queries=[
|
||||
'SELECT SUM(key) FROM keys',
|
||||
'INSERT INTO keys SELECT generate_series(201, 300)',
|
||||
'SELECT SUM(key) FROM keys',
|
||||
])
|
||||
assert res[0][0] == (20100, )
|
||||
assert res[-1][0] == (45150, )
|
||||
@@ -4,6 +4,7 @@ from dataclasses import field
|
||||
from enum import Flag, auto
|
||||
import textwrap
|
||||
from cached_property import cached_property
|
||||
import abc
|
||||
import asyncpg
|
||||
import os
|
||||
import boto3
|
||||
@@ -908,14 +909,89 @@ TIMELINE_DATA_EXTRACTOR = re.compile(r"\s(?P<branch_name>[^\s]+)\s\[(?P<timeline
|
||||
re.MULTILINE)
|
||||
|
||||
|
||||
class NeonCli:
|
||||
class AbstractNeonCli(abc.ABC):
|
||||
"""
|
||||
A typed wrapper around an arbitrary Neon CLI tool.
|
||||
Supports a way to run arbitrary command directly via CLI.
|
||||
Do not use directly, use specific subclasses instead.
|
||||
"""
|
||||
def __init__(self, env: NeonEnv):
|
||||
self.env = env
|
||||
|
||||
COMMAND: str = cast(str, None) # To be overwritten by the derived class.
|
||||
|
||||
def raw_cli(self,
|
||||
arguments: List[str],
|
||||
extra_env_vars: Optional[Dict[str, str]] = None,
|
||||
check_return_code=True) -> 'subprocess.CompletedProcess[str]':
|
||||
"""
|
||||
Run the command with the specified arguments.
|
||||
|
||||
Arguments must be in list form, e.g. ['pg', 'create']
|
||||
|
||||
Return both stdout and stderr, which can be accessed as
|
||||
|
||||
>>> result = env.neon_cli.raw_cli(...)
|
||||
>>> assert result.stderr == ""
|
||||
>>> log.info(result.stdout)
|
||||
|
||||
If `check_return_code`, on non-zero exit code logs failure and raises.
|
||||
"""
|
||||
|
||||
assert type(arguments) == list
|
||||
assert type(self.COMMAND) == str
|
||||
|
||||
bin_neon = os.path.join(str(neon_binpath), self.COMMAND)
|
||||
|
||||
args = [bin_neon] + arguments
|
||||
log.info('Running command "{}"'.format(' '.join(args)))
|
||||
log.info(f'Running in "{self.env.repo_dir}"')
|
||||
|
||||
env_vars = os.environ.copy()
|
||||
env_vars['NEON_REPO_DIR'] = str(self.env.repo_dir)
|
||||
env_vars['POSTGRES_DISTRIB_DIR'] = str(pg_distrib_dir)
|
||||
if self.env.rust_log_override is not None:
|
||||
env_vars['RUST_LOG'] = self.env.rust_log_override
|
||||
for (extra_env_key, extra_env_value) in (extra_env_vars or {}).items():
|
||||
env_vars[extra_env_key] = extra_env_value
|
||||
|
||||
# Pass coverage settings
|
||||
var = 'LLVM_PROFILE_FILE'
|
||||
val = os.environ.get(var)
|
||||
if val:
|
||||
env_vars[var] = val
|
||||
|
||||
# Intercept CalledProcessError and print more info
|
||||
res = subprocess.run(args,
|
||||
env=env_vars,
|
||||
check=False,
|
||||
universal_newlines=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
if not res.returncode:
|
||||
log.info(f"Run success: {res.stdout}")
|
||||
elif check_return_code:
|
||||
# this way command output will be in recorded and shown in CI in failure message
|
||||
msg = f"""\
|
||||
Run {res.args} failed:
|
||||
stdout: {res.stdout}
|
||||
stderr: {res.stderr}
|
||||
"""
|
||||
log.info(msg)
|
||||
raise Exception(msg) from subprocess.CalledProcessError(res.returncode,
|
||||
res.args,
|
||||
res.stdout,
|
||||
res.stderr)
|
||||
return res
|
||||
|
||||
|
||||
class NeonCli(AbstractNeonCli):
|
||||
"""
|
||||
A typed wrapper around the `neon` CLI tool.
|
||||
Supports main commands via typed methods and a way to run arbitrary command directly via CLI.
|
||||
"""
|
||||
def __init__(self, env: NeonEnv):
|
||||
self.env = env
|
||||
pass
|
||||
|
||||
COMMAND = 'neon_local'
|
||||
|
||||
def create_tenant(self,
|
||||
tenant_id: Optional[uuid.UUID] = None,
|
||||
@@ -1186,69 +1262,26 @@ class NeonCli:
|
||||
|
||||
return self.raw_cli(args, check_return_code=check_return_code)
|
||||
|
||||
def raw_cli(self,
|
||||
arguments: List[str],
|
||||
extra_env_vars: Optional[Dict[str, str]] = None,
|
||||
check_return_code=True) -> 'subprocess.CompletedProcess[str]':
|
||||
"""
|
||||
Run "neon" with the specified arguments.
|
||||
|
||||
Arguments must be in list form, e.g. ['pg', 'create']
|
||||
class WalGenerate(AbstractNeonCli):
|
||||
"""
|
||||
A typed wrapper around the `wal_generate` CLI tool.
|
||||
Supports main commands via typed methods and a way to run arbitrary command directly via CLI.
|
||||
"""
|
||||
|
||||
Return both stdout and stderr, which can be accessed as
|
||||
COMMAND = 'wal_generate'
|
||||
|
||||
>>> result = env.neon_cli.raw_cli(...)
|
||||
>>> assert result.stderr == ""
|
||||
>>> log.info(result.stdout)
|
||||
def postgres_config(self) -> List[str]:
|
||||
res = self.raw_cli(["print-postgres-config"])
|
||||
res.check_returncode()
|
||||
return res.stdout.split('\n')
|
||||
|
||||
If `check_return_code`, on non-zero exit code logs failure and raises.
|
||||
"""
|
||||
|
||||
assert type(arguments) == list
|
||||
|
||||
bin_neon = os.path.join(str(neon_binpath), 'neon_local')
|
||||
|
||||
args = [bin_neon] + arguments
|
||||
log.info('Running command "{}"'.format(' '.join(args)))
|
||||
log.info(f'Running in "{self.env.repo_dir}"')
|
||||
|
||||
env_vars = os.environ.copy()
|
||||
env_vars['NEON_REPO_DIR'] = str(self.env.repo_dir)
|
||||
env_vars['POSTGRES_DISTRIB_DIR'] = str(pg_distrib_dir)
|
||||
if self.env.rust_log_override is not None:
|
||||
env_vars['RUST_LOG'] = self.env.rust_log_override
|
||||
for (extra_env_key, extra_env_value) in (extra_env_vars or {}).items():
|
||||
env_vars[extra_env_key] = extra_env_value
|
||||
|
||||
# Pass coverage settings
|
||||
var = 'LLVM_PROFILE_FILE'
|
||||
val = os.environ.get(var)
|
||||
if val:
|
||||
env_vars[var] = val
|
||||
|
||||
# Intercept CalledProcessError and print more info
|
||||
res = subprocess.run(args,
|
||||
env=env_vars,
|
||||
check=False,
|
||||
universal_newlines=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
if not res.returncode:
|
||||
log.info(f"Run success: {res.stdout}")
|
||||
elif check_return_code:
|
||||
# this way command output will be in recorded and shown in CI in failure message
|
||||
msg = f"""\
|
||||
Run {res.args} failed:
|
||||
stdout: {res.stdout}
|
||||
stderr: {res.stderr}
|
||||
"""
|
||||
log.info(msg)
|
||||
raise Exception(msg) from subprocess.CalledProcessError(res.returncode,
|
||||
res.args,
|
||||
res.stdout,
|
||||
res.stderr)
|
||||
|
||||
return res
|
||||
def in_existing(self, type: str, connection: str) -> int:
|
||||
res = self.raw_cli(["in-existing", type, connection])
|
||||
res.check_returncode()
|
||||
m = re.fullmatch(r'end_of_wal = (.*)\n', res.stdout)
|
||||
assert m
|
||||
return lsn_from_hex(m.group(1))
|
||||
|
||||
|
||||
class NeonPageserver(PgProtocol):
|
||||
|
||||
Reference in New Issue
Block a user