diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index 371a740f0f..fb45aa18b9 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -4,7 +4,7 @@ import time from contextlib import closing from multiprocessing import Process, Value -from fixtures.zenith_fixtures import ZenithPageserver, PostgresFactory +from fixtures.zenith_fixtures import WalAcceptorFactory, ZenithPageserver, PostgresFactory pytest_plugins = ("fixtures.zenith_fixtures") @@ -61,7 +61,7 @@ def test_many_timelines(zenith_cli, pageserver: ZenithPageserver, postgres: Post # Check that dead minority doesn't prevent the commits: execute insert n_inserts # times, with fault_probability chance of getting a wal acceptor down or up # along the way. 2 of 3 are always alive, so the work keeps going. -def test_restarts(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, wa_factory): +def test_restarts(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, wa_factory: WalAcceptorFactory): fault_probability = 0.01 n_inserts = 1000 n_acceptors = 3 diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index e76debcea1..f4813d2230 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -1,4 +1,3 @@ -import getpass import os import pathlib import uuid @@ -7,9 +6,11 @@ import pytest import shutil import signal import subprocess +import time from contextlib import closing from pathlib import Path +from dataclasses import dataclass # Type-related stuff from psycopg2.extensions import connection as PgConnection @@ -511,26 +512,27 @@ def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin: return PgBin(test_output_dir, pg_distrib_dir) -def read_pid(path): +def read_pid(path: Path): """ Read content of file into number """ - return int(Path(path).read_text()) + return 
int(path.read_text()) +@dataclass class WalAcceptor: """ An object representing a running wal acceptor daemon. """ - def __init__(self, wa_binpath, data_dir, port, num, auth_token: Optional[str] = None): - self.wa_binpath = wa_binpath - self.data_dir = data_dir - self.port = port - self.num = num # identifier for logging - self.auth_token = auth_token + bin_path: Path + data_dir: Path + port: int + num: int # identifier for logging + auth_token: Optional[str] = None def start(self) -> 'WalAcceptor': # create data directory if not exists - Path(self.data_dir).mkdir(parents=True, exist_ok=True) + self.data_dir.mkdir(parents=True, exist_ok=True) + self.pidfile.unlink(missing_ok=True) - cmd = [self.wa_binpath] - cmd.extend(["-D", self.data_dir]) + cmd = [str(self.bin_path)] + cmd.extend(["-D", str(self.data_dir)]) cmd.extend(["-l", "localhost:{}".format(self.port)]) cmd.append("--daemonize") cmd.append("--no-sync") @@ -541,38 +543,51 @@ class WalAcceptor: env = {'PAGESERVER_AUTH_TOKEN': self.auth_token} if self.auth_token else None subprocess.run(cmd, check=True, env=env) - return self + # wait for wal acceptor start by checking that pid is readable + for _ in range(3): + pid = self.get_pid() + if pid is not None: + return self + time.sleep(0.5) + + raise RuntimeError("cannot get wal acceptor pid") + + @property + def pidfile(self) -> Path: + return self.data_dir / "wal_acceptor.pid" + + def get_pid(self) -> Optional[int]: + if not self.pidfile.exists(): + return None + + try: + pid = read_pid(self.pidfile) + except ValueError: + return None + + return pid def stop(self) -> 'WalAcceptor': print('Stopping wal acceptor {}'.format(self.num)) - pidfile_path = os.path.join(self.data_dir, "wal_acceptor.pid") - try: - pid = read_pid(pidfile_path) - try: - os.kill(pid, signal.SIGTERM) - except Exception: - pass # pidfile might be obsolete - # TODO: cleanup pid file on exit in wal acceptor - return self - # for _ in range(5): - # print('waiting wal acceptor {} (pid {}) to 
stop...', self.num, pid) - # try: - # read_pid(pidfile_path) - # except FileNotFoundError: - # return # done - # time.sleep(1) - # raise Exception('Failed to wait for wal acceptor {} shutdown'.format(self.num)) - except FileNotFoundError: + pid = self.get_pid() + if pid is None: print("Wal acceptor {} is not running".format(self.num)) return self + try: + os.kill(pid, signal.SIGTERM) + except Exception: + # TODO: cleanup pid file on exit in wal acceptor + pass # pidfile might be obsolete + return self + class WalAcceptorFactory: """ An object representing multiple running wal acceptors. """ - def __init__(self, zenith_binpath, data_dir): - self.wa_binpath = os.path.join(zenith_binpath, 'wal_acceptor') + def __init__(self, zenith_binpath: Path, data_dir: Path): + self.wa_binpath = zenith_binpath / 'wal_acceptor' self.data_dir = data_dir - self.instances = [] + self.instances: List[WalAcceptor] = [] self.initial_port = 54321 def start_new(self, auth_token: Optional[str] = None) -> WalAcceptor: @@ -583,7 +598,7 @@ class WalAcceptorFactory: wa_num = len(self.instances) wa = WalAcceptor( self.wa_binpath, - os.path.join(self.data_dir, "wal_acceptor_{}".format(wa_num)), + self.data_dir / "wal_acceptor_{}".format(wa_num), self.initial_port + wa_num, wa_num, auth_token, @@ -613,7 +628,7 @@ class WalAcceptorFactory: @zenfixture def wa_factory(zenith_binpath: str, repo_dir: str) -> Iterator[WalAcceptorFactory]: """ Gives WalAcceptorFactory providing wal acceptors. """ - wafactory = WalAcceptorFactory(zenith_binpath, os.path.join(repo_dir, "wal_acceptors")) + wafactory = WalAcceptorFactory(Path(zenith_binpath), Path(repo_dir) / "wal_acceptors") yield wafactory # After the yield comes any cleanup code we need. print('Starting wal acceptors cleanup')