mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 13:32:57 +00:00
try to be more intelligent in WalAcceptor.start, added a bunch of typing sugar to wal acceptor fixtures
This commit is contained in:
@@ -4,7 +4,7 @@ import time
|
||||
|
||||
from contextlib import closing
|
||||
from multiprocessing import Process, Value
|
||||
from fixtures.zenith_fixtures import ZenithPageserver, PostgresFactory
|
||||
from fixtures.zenith_fixtures import WalAcceptorFactory, ZenithPageserver, PostgresFactory
|
||||
|
||||
pytest_plugins = ("fixtures.zenith_fixtures")
|
||||
|
||||
@@ -61,7 +61,7 @@ def test_many_timelines(zenith_cli, pageserver: ZenithPageserver, postgres: Post
|
||||
# Check that dead minority doesn't prevent the commits: execute insert n_inserts
|
||||
# times, with fault_probability chance of getting a wal acceptor down or up
|
||||
# along the way. 2 of 3 are always alive, so the work keeps going.
|
||||
def test_restarts(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, wa_factory):
|
||||
def test_restarts(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, wa_factory: WalAcceptorFactory):
|
||||
fault_probability = 0.01
|
||||
n_inserts = 1000
|
||||
n_acceptors = 3
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import getpass
|
||||
import os
|
||||
import pathlib
|
||||
import uuid
|
||||
@@ -7,9 +6,11 @@ import pytest
|
||||
import shutil
|
||||
import signal
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
from contextlib import closing
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass
|
||||
|
||||
# Type-related stuff
|
||||
from psycopg2.extensions import connection as PgConnection
|
||||
@@ -511,26 +512,27 @@ def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin:
|
||||
return PgBin(test_output_dir, pg_distrib_dir)
|
||||
|
||||
|
||||
def read_pid(path):
|
||||
def read_pid(path: Path):
|
||||
""" Read content of file into number """
|
||||
return int(Path(path).read_text())
|
||||
return int(path.read_text())
|
||||
|
||||
|
||||
@dataclass
|
||||
class WalAcceptor:
|
||||
""" An object representing a running wal acceptor daemon. """
|
||||
def __init__(self, wa_binpath, data_dir, port, num, auth_token: Optional[str] = None):
|
||||
self.wa_binpath = wa_binpath
|
||||
self.data_dir = data_dir
|
||||
self.port = port
|
||||
self.num = num # identifier for logging
|
||||
self.auth_token = auth_token
|
||||
bin_path: Path
|
||||
data_dir: Path
|
||||
port: int
|
||||
num: int # identifier for logging
|
||||
auth_token: Optional[str] = None
|
||||
|
||||
def start(self) -> 'WalAcceptor':
|
||||
# create data directory if not exists
|
||||
Path(self.data_dir).mkdir(parents=True, exist_ok=True)
|
||||
self.data_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.pidfile.unlink(missing_ok=True)
|
||||
|
||||
cmd = [self.wa_binpath]
|
||||
cmd.extend(["-D", self.data_dir])
|
||||
cmd = [str(self.bin_path)]
|
||||
cmd.extend(["-D", str(self.data_dir)])
|
||||
cmd.extend(["-l", "localhost:{}".format(self.port)])
|
||||
cmd.append("--daemonize")
|
||||
cmd.append("--no-sync")
|
||||
@@ -541,38 +543,51 @@ class WalAcceptor:
|
||||
env = {'PAGESERVER_AUTH_TOKEN': self.auth_token} if self.auth_token else None
|
||||
subprocess.run(cmd, check=True, env=env)
|
||||
|
||||
return self
|
||||
# wait for wal acceptor start by checkking that pid is readable
|
||||
for _ in range(3):
|
||||
pid = self.get_pid()
|
||||
if pid is not None:
|
||||
return self
|
||||
time.sleep(0.5)
|
||||
|
||||
raise RuntimeError("cannot get wal acceptor pid")
|
||||
|
||||
@property
|
||||
def pidfile(self) -> Path:
|
||||
return self.data_dir / "wal_acceptor.pid"
|
||||
|
||||
def get_pid(self) -> Optional[int]:
|
||||
if not self.pidfile.exists():
|
||||
return None
|
||||
|
||||
try:
|
||||
pid = read_pid(self.pidfile)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
return pid
|
||||
|
||||
def stop(self) -> 'WalAcceptor':
|
||||
print('Stopping wal acceptor {}'.format(self.num))
|
||||
pidfile_path = os.path.join(self.data_dir, "wal_acceptor.pid")
|
||||
try:
|
||||
pid = read_pid(pidfile_path)
|
||||
try:
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
except Exception:
|
||||
pass # pidfile might be obsolete
|
||||
# TODO: cleanup pid file on exit in wal acceptor
|
||||
return self
|
||||
# for _ in range(5):
|
||||
# print('waiting wal acceptor {} (pid {}) to stop...', self.num, pid)
|
||||
# try:
|
||||
# read_pid(pidfile_path)
|
||||
# except FileNotFoundError:
|
||||
# return # done
|
||||
# time.sleep(1)
|
||||
# raise Exception('Failed to wait for wal acceptor {} shutdown'.format(self.num))
|
||||
except FileNotFoundError:
|
||||
pid = self.get_pid()
|
||||
if pid is None:
|
||||
print("Wal acceptor {} is not running".format(self.num))
|
||||
return self
|
||||
|
||||
try:
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
except Exception:
|
||||
# TODO: cleanup pid file on exit in wal acceptor
|
||||
pass # pidfile might be obsolete
|
||||
return self
|
||||
|
||||
|
||||
class WalAcceptorFactory:
|
||||
""" An object representing multiple running wal acceptors. """
|
||||
def __init__(self, zenith_binpath, data_dir):
|
||||
self.wa_binpath = os.path.join(zenith_binpath, 'wal_acceptor')
|
||||
def __init__(self, zenith_binpath: Path, data_dir: Path):
|
||||
self.wa_binpath = zenith_binpath / 'wal_acceptor'
|
||||
self.data_dir = data_dir
|
||||
self.instances = []
|
||||
self.instances: List[WalAcceptor] = []
|
||||
self.initial_port = 54321
|
||||
|
||||
def start_new(self, auth_token: Optional[str] = None) -> WalAcceptor:
|
||||
@@ -583,7 +598,7 @@ class WalAcceptorFactory:
|
||||
wa_num = len(self.instances)
|
||||
wa = WalAcceptor(
|
||||
self.wa_binpath,
|
||||
os.path.join(self.data_dir, "wal_acceptor_{}".format(wa_num)),
|
||||
self.data_dir / "wal_acceptor_{}".format(wa_num),
|
||||
self.initial_port + wa_num,
|
||||
wa_num,
|
||||
auth_token,
|
||||
@@ -613,7 +628,7 @@ class WalAcceptorFactory:
|
||||
@zenfixture
|
||||
def wa_factory(zenith_binpath: str, repo_dir: str) -> Iterator[WalAcceptorFactory]:
|
||||
""" Gives WalAcceptorFactory providing wal acceptors. """
|
||||
wafactory = WalAcceptorFactory(zenith_binpath, os.path.join(repo_dir, "wal_acceptors"))
|
||||
wafactory = WalAcceptorFactory(Path(zenith_binpath), Path(repo_dir) / "wal_acceptors")
|
||||
yield wafactory
|
||||
# After the yield comes any cleanup code we need.
|
||||
print('Starting wal acceptors cleanup')
|
||||
|
||||
Reference in New Issue
Block a user