Change test_restart_compute to expose safekeeper problems

This commit is contained in:
Stas Kelvich
2021-08-25 00:42:08 +03:00
parent 4051c5d4ff
commit 72de70a8cc
8 changed files with 124 additions and 94 deletions

View File

@@ -362,7 +362,7 @@ impl PageServerHandler {
timeline.wait_lsn(lsn)?;
lsn
}
None => timeline.get_last_valid_lsn()
None => timeline.get_last_valid_lsn(),
};
{
let mut writer = CopyDataSink { pgb };

View File

@@ -427,7 +427,9 @@ pub fn save_decoded_record(
// Remove twophase file. see RemoveTwoPhaseFile() in postgres code
trace!(
"unlink twophaseFile for xid {} parsed_xact.xid {} here at {}",
decoded.xl_xid, parsed_xact.xid, lsn
decoded.xl_xid,
parsed_xact.xid,
lsn
);
timeline.put_unlink(
RelishTag::TwoPhase {

View File

@@ -9,7 +9,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
# Test restarting and recreating a postgres instance
#
@pytest.mark.parametrize('with_wal_acceptors', [False, True])
@pytest.mark.parametrize('with_wal_acceptors', [True, False])
def test_restart_compute(
zenith_cli,
pageserver: ZenithPageserver,
@@ -31,31 +31,56 @@ def test_restart_compute(
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# Create table, and insert a row
cur.execute('CREATE TABLE foo (t text)')
cur.execute("INSERT INTO foo VALUES ('bar')")
cur.execute('CREATE TABLE t(key int primary key, value text)')
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
cur.execute('SELECT sum(key) FROM t')
r = cur.fetchone()
assert r == (5000050000, )
print("res = ", r)
# Stop and restart the Postgres instance
# Remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute',
wal_acceptors=wal_acceptor_connstrs)
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# We can still see the row
cur.execute('SELECT count(*) FROM foo')
assert cur.fetchone() == (1, )
cur.execute('SELECT sum(key) FROM t')
r = cur.fetchone()
assert r == (5000050000, )
print("res = ", r)
# Insert another row
cur.execute("INSERT INTO foo VALUES ('bar2')")
cur.execute('SELECT count(*) FROM foo')
assert cur.fetchone() == (2, )
cur.execute("INSERT INTO t VALUES (100001, 'payload2')")
cur.execute('SELECT count(*) FROM t')
# Stop, and destroy the Postgres instance. Then recreate and restart it.
r = cur.fetchone()
assert r == (100001, )
print("res = ", r)
# Again remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute',
wal_acceptors=wal_acceptor_connstrs)
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# We can still see the rows
cur.execute('SELECT count(*) FROM foo')
assert cur.fetchone() == (2, )
cur.execute('SELECT count(*) FROM t')
r = cur.fetchone()
assert r == (100001, )
print("res = ", r)
# And again remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute',
wal_acceptors=wal_acceptor_connstrs)
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# We can still see the rows
cur.execute('SELECT count(*) FROM t')
r = cur.fetchone()
assert r == (100001, )
print("res = ", r)

View File

@@ -249,10 +249,68 @@ def pageserver(zenith_cli: ZenithCli) -> Iterator[ZenithPageserver]:
print('Starting pageserver cleanup')
ps.stop()
class PgBin:
""" A helper class for executing postgres binaries """
def __init__(self, log_dir: str, pg_distrib_dir: str):
self.log_dir = log_dir
self.pg_install_path = pg_distrib_dir
self.pg_bin_path = os.path.join(self.pg_install_path, 'bin')
self.env = os.environ.copy()
self.env['LD_LIBRARY_PATH'] = os.path.join(self.pg_install_path, 'lib')
def _fixpath(self, command: List[str]) -> None:
if '/' not in command[0]:
command[0] = os.path.join(self.pg_bin_path, command[0])
def _build_env(self, env_add: Optional[Env]) -> Env:
if env_add is None:
return self.env
env = self.env.copy()
env.update(env_add)
return env
def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None) -> None:
"""
Run one of the postgres binaries.
The command should be in list form, e.g. ['pgbench', '-p', '55432']
All the necessary environment variables will be set.
If the first argument (the command name) doesn't include a path (no '/'
characters present), then it will be edited to include the correct path.
If you want stdout/stderr captured to files, use `run_capture` instead.
"""
self._fixpath(command)
print('Running command "{}"'.format(' '.join(command)))
env = self._build_env(env)
subprocess.run(command, env=env, cwd=cwd, check=True)
def run_capture(self,
command: List[str],
env: Optional[Env] = None,
cwd: Optional[str] = None) -> None:
"""
Run one of the postgres binaries, with stderr and stdout redirected to a file.
This is just like `run`, but for chatty programs.
"""
self._fixpath(command)
print('Running command "{}"'.format(' '.join(command)))
env = self._build_env(env)
subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True)
@zenfixture
def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin:
return PgBin(test_output_dir, pg_distrib_dir)
class Postgres(PgProtocol):
""" An object representing a running postgres daemon. """
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, tenant_id: str, port: int):
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, tenant_id: str, port: int):
super().__init__(host='localhost', port=port)
self.zenith_cli = zenith_cli
@@ -260,6 +318,7 @@ class Postgres(PgProtocol):
self.repo_dir = repo_dir
self.branch: Optional[str] = None # dubious, see asserts below
self.tenant_id = tenant_id
self.pg_bin = pg_bin
# path to conf is <repo_dir>/pgdatadirs/tenants/<tenant_id>/<branch_name>/postgresql.conf
def create(
@@ -299,27 +358,32 @@ class Postgres(PgProtocol):
"""
assert self.branch is not None
print(f"Starting postgres on brach {self.branch}")
self.zenith_cli.run(['pg', 'start', self.branch, f'--tenantid={self.tenant_id}'])
self.running = True
self.pg_bin.run(['pg_controldata', self.pg_data_dir_path()])
return self
def pg_data_dir_path(self) -> str:
""" Path to data directory """
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch
return os.path.join(self.repo_dir, path)
def pg_xact_dir_path(self) -> str:
""" Path to pg_xact dir """
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'pg_xact'
return os.path.join(self.repo_dir, path)
return os.path.join(self.pg_data_dir_path(), 'pg_xact')
def pg_twophase_dir_path(self) -> str:
""" Path to pg_twophase dir """
print(self.tenant_id)
print(self.branch)
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'pg_twophase'
return os.path.join(self.repo_dir, path)
return os.path.join(self.pg_data_dir_path(), 'pg_twophase')
def config_file_path(self) -> str:
""" Path to postgresql.conf """
filename = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'postgresql.conf'
return os.path.join(self.repo_dir, filename)
return os.path.join(self.pg_data_dir_path(), 'postgresql.conf')
def adjust_for_wal_acceptors(self, wal_acceptors: str) -> 'Postgres':
"""
@@ -411,13 +475,14 @@ class Postgres(PgProtocol):
class PostgresFactory:
""" An object representing multiple running postgres daemons. """
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, initial_tenant: str, base_port: int = 55431):
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, initial_tenant: str, base_port: int = 55431):
self.zenith_cli = zenith_cli
self.repo_dir = repo_dir
self.num_instances = 0
self.instances: List[Postgres] = []
self.initial_tenant: str = initial_tenant
self.base_port = base_port
self.pg_bin = pg_bin
def create_start(
self,
@@ -430,6 +495,7 @@ class PostgresFactory:
pg = Postgres(
zenith_cli=self.zenith_cli,
repo_dir=self.repo_dir,
pg_bin=self.pg_bin,
tenant_id=tenant_id or self.initial_tenant,
port=self.base_port + self.num_instances + 1,
)
@@ -503,8 +569,8 @@ def initial_tenant(pageserver: ZenithPageserver):
@zenfixture
def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str) -> Iterator[PostgresFactory]:
pgfactory = PostgresFactory(zenith_cli, repo_dir, initial_tenant=initial_tenant)
def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str, pg_bin: PgBin) -> Iterator[PostgresFactory]:
pgfactory = PostgresFactory(zenith_cli, repo_dir, pg_bin, initial_tenant=initial_tenant)
yield pgfactory
@@ -512,67 +578,6 @@ def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str) -> Itera
print('Starting postgres cleanup')
pgfactory.stop_all()
class PgBin:
""" A helper class for executing postgres binaries """
def __init__(self, log_dir: str, pg_distrib_dir: str):
self.log_dir = log_dir
self.pg_install_path = pg_distrib_dir
self.pg_bin_path = os.path.join(self.pg_install_path, 'bin')
self.env = os.environ.copy()
self.env['LD_LIBRARY_PATH'] = os.path.join(self.pg_install_path, 'lib')
def _fixpath(self, command: List[str]) -> None:
if '/' not in command[0]:
command[0] = os.path.join(self.pg_bin_path, command[0])
def _build_env(self, env_add: Optional[Env]) -> Env:
if env_add is None:
return self.env
env = self.env.copy()
env.update(env_add)
return env
def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None) -> None:
"""
Run one of the postgres binaries.
The command should be in list form, e.g. ['pgbench', '-p', '55432']
All the necessary environment variables will be set.
If the first argument (the command name) doesn't include a path (no '/'
characters present), then it will be edited to include the correct path.
If you want stdout/stderr captured to files, use `run_capture` instead.
"""
self._fixpath(command)
print('Running command "{}"'.format(' '.join(command)))
env = self._build_env(env)
subprocess.run(command, env=env, cwd=cwd, check=True)
def run_capture(self,
command: List[str],
env: Optional[Env] = None,
cwd: Optional[str] = None) -> None:
"""
Run one of the postgres binaries, with stderr and stdout redirected to a file.
This is just like `run`, but for chatty programs.
"""
self._fixpath(command)
print('Running command "{}"'.format(' '.join(command)))
env = self._build_env(env)
subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True)
@zenfixture
def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin:
return PgBin(test_output_dir, pg_distrib_dir)
def read_pid(path: Path):
""" Read content of file into number """
return int(path.read_text())

View File

@@ -322,7 +322,7 @@ impl<'pg> ReceiveWalConn<'pg> {
}
info!(
"Start streaming from timeline {} tenant {} address {:?} flush_lsn={}",
"Start accepting WAL for timeline {} tenant {} address {:?} flush_lsn={}",
server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn
);

View File

@@ -77,9 +77,7 @@ impl ReplicationConn {
subscriber.add_hs_feedback(feedback);
}
FeMessage::Sync => {}
FeMessage::CopyFailed => {
return Err(anyhow!("Copy failed"))
}
FeMessage::CopyFailed => return Err(anyhow!("Copy failed")),
_ => {
// We only handle `CopyData`, 'Sync', 'CopyFailed' messages. Anything else is ignored.
info!("unexpected message {:?}", msg);

View File

@@ -341,7 +341,7 @@ impl PostgresBackend {
// We prefer explicit pattern matching to wildcards, because
// this helps us spot the places where new variants are missing
FeMessage::CopyData(_) | FeMessage::CopyDone | FeMessage::CopyFailed => {
FeMessage::CopyData(_) | FeMessage::CopyDone | FeMessage::CopyFailed => {
bail!("unexpected message type: {:?}", msg);
}
}