From 70778058d95ab345b034c479f7616b0c19e3b909 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Wed, 29 Dec 2021 12:58:27 +0300 Subject: [PATCH] Add test for safekeeper setup without pageserver (#1000) --- test_runner/batch_others/test_wal_acceptor.py | 185 +++++++++++++++++- 1 file changed, 175 insertions(+), 10 deletions(-) diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index e5351912fa..3822036521 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -2,6 +2,7 @@ import pytest import random import time import os +import signal import subprocess import sys import threading @@ -10,10 +11,11 @@ import uuid from contextlib import closing from dataclasses import dataclass, field from multiprocessing import Process, Value -from fixtures.zenith_fixtures import PgBin, ZenithEnv, ZenithEnvBuilder +from pathlib import Path +from fixtures.zenith_fixtures import PgBin, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol from fixtures.utils import lsn_to_hex, mkdir_if_needed from fixtures.log_helper import log -from typing import List, Optional +from typing import List, Optional, Any pytest_plugins = ("fixtures.zenith_fixtures") @@ -314,14 +316,23 @@ def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value): proc.join() -class ProposerPostgres: - """Object for running safekeepers sync with walproposer""" - def __init__(self, env: ZenithEnv, pgdata_dir: str, pg_bin, timeline_id: str, tenant_id: str): - self.env = env +class ProposerPostgres(PgProtocol): + """Object for running postgres without ZenithEnv""" + def __init__(self, + pgdata_dir: str, + pg_bin, + timeline_id: str, + tenant_id: str, + listen_addr: str, + port: int): + super().__init__(host=listen_addr, port=port) + self.pgdata_dir: str = pgdata_dir self.pg_bin: PgBin = pg_bin self.timeline_id: str = timeline_id self.tenant_id: str = tenant_id + self.listen_addr: str = listen_addr + self.port: int = port def pg_data_dir_path(self) -> str: """ Path to data directory """ @@ -336,13 +347,18 @@ class ProposerPostgres: mkdir_if_needed(self.pg_data_dir_path()) with open(self.config_file_path(), "w") as f: - f.writelines([ + cfg = [ "synchronous_standby_names = 'walproposer'\n", + "shared_preload_libraries = 'zenith'\n", f"zenith.zenith_timeline = '{self.timeline_id}'\n", f"zenith.zenith_tenant = '{self.tenant_id}'\n", f"zenith.page_server_connstring = ''\n", f"wal_acceptors = '{wal_acceptors}'\n", - ]) + f"listen_addresses = '{self.listen_addr}'\n", + f"port = '{self.port}'\n", + ] + + f.writelines(cfg) def sync_safekeepers(self) -> str: """ @@ -362,9 +378,30 @@ class ProposerPostgres: stdout = stdout_f.read() return stdout.strip("\n ") + def initdb(self): + """ Run initdb """ + + args = ["initdb", "-U", "zenith_admin", "-D", self.pg_data_dir_path()] + self.pg_bin.run(args) + + def start(self): + """ Start postgres with pg_ctl """ + + log_path = os.path.join(self.pg_data_dir_path(), "pg.log") + args = ["pg_ctl", "-D", self.pg_data_dir_path(), "-l", log_path, "-w", "start"] + self.pg_bin.run(args) + + def stop(self): + """ Stop postgres with pg_ctl """ + + args = ["pg_ctl", "-D", self.pg_data_dir_path(), "-m", "immediate", "-w", "stop"] + self.pg_bin.run(args) + # insert wal in all safekeepers and run sync on proposer -def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder, pg_bin: PgBin): +def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder, + pg_bin: PgBin, + port_distributor: PortDistributor): # We don't really need the full environment for this test, just the # safekeepers would be enough. @@ -376,7 +413,12 @@ def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder, pg_bin: PgBin): # write config for proposer pgdata_dir = os.path.join(env.repo_dir, "proposer_pgdata") - pg = ProposerPostgres(env, pgdata_dir, pg_bin, timeline_id, tenant_id) + pg = ProposerPostgres(pgdata_dir, + pg_bin, + timeline_id, + tenant_id, + '127.0.0.1', + port_distributor.get_port()) pg.create_dir_config(env.get_safekeeper_connstrs()) # valid lsn, which is not in the segment start, nor in zero segment @@ -438,3 +480,126 @@ def test_timeline_status(zenith_env_builder: ZenithEnvBuilder): epoch_after_reboot = wa_http_cli.timeline_status(tenant_id, timeline_id).acceptor_epoch assert epoch_after_reboot > epoch + + +class SafekeeperEnv: + def __init__(self, + repo_dir: Path, + port_distributor: PortDistributor, + pg_bin: PgBin, + num_safekeepers: int = 1): + self.repo_dir = repo_dir + self.port_distributor = port_distributor + self.pg_bin = pg_bin + self.num_safekeepers = num_safekeepers + self.bin_safekeeper = os.path.join(str(zenith_binpath), 'safekeeper') + self.safekeepers: Optional[List[subprocess.CompletedProcess[Any]]] = None + self.postgres: Optional[ProposerPostgres] = None + self.tenant_id: Optional[str] = None + self.timeline_id: Optional[str] = None + + def init(self) -> "SafekeeperEnv": + assert self.postgres is None, "postgres is already initialized" + assert self.safekeepers is None, "safekeepers are already initialized" + + self.timeline_id = uuid.uuid4().hex + self.tenant_id = uuid.uuid4().hex + mkdir_if_needed(str(self.repo_dir)) + + # Create config and a Safekeeper object for each safekeeper + self.safekeepers = [] + for i in range(1, self.num_safekeepers + 1): + self.safekeepers.append(self.start_safekeeper(i)) + + # Create and start postgres + self.postgres = self.create_postgres() + self.postgres.start() + + return self + + def start_safekeeper(self, i): + port = SafekeeperPort( + pg=self.port_distributor.get_port(), + http=self.port_distributor.get_port(), + ) + + if self.num_safekeepers == 1: + name = "single" + else: + name = f"sk{i}" + + safekeeper_dir = os.path.join(self.repo_dir, name) + mkdir_if_needed(safekeeper_dir) + + args = [ + self.bin_safekeeper, + "-l", + f"127.0.0.1:{port.pg}", + "--listen-http", + f"127.0.0.1:{port.http}", + "-D", + safekeeper_dir, + "--daemonize" + ] + + log.info(f'Running command "{" ".join(args)}"') + return subprocess.run(args, check=True) + + def get_safekeeper_connstrs(self): + return ','.join([sk_proc.args[2] for sk_proc in self.safekeepers]) + + def create_postgres(self): + pgdata_dir = os.path.join(self.repo_dir, "proposer_pgdata") + pg = ProposerPostgres(pgdata_dir, + self.pg_bin, + self.timeline_id, + self.tenant_id, + "127.0.0.1", + self.port_distributor.get_port()) + pg.initdb() + pg.create_dir_config(self.get_safekeeper_connstrs()) + return pg + + def kill_safekeeper(self, sk_dir): + """Read pid file and kill process""" + pid_file = os.path.join(sk_dir, "safekeeper.pid") + with open(pid_file, "r") as f: + pid = int(f.read()) + log.info(f"Killing safekeeper with pid {pid}") + os.kill(pid, signal.SIGKILL) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + log.info('Cleaning up all safekeeper and compute nodes') + + # Stop all the nodes + if self.postgres is not None: + self.postgres.stop() + if self.safekeepers is not None: + for sk_proc in self.safekeepers: + self.kill_safekeeper(sk_proc.args[6]) + + +def test_safekeeper_without_pageserver(test_output_dir: str, + port_distributor: PortDistributor, + pg_bin: PgBin): + # Create the environment in the test-specific output dir + repo_dir = Path(os.path.join(test_output_dir, "repo")) + + env = SafekeeperEnv( + repo_dir, + port_distributor, + pg_bin, + num_safekeepers=1, + ) + + with env: + env.init() + assert env.postgres is not None + + env.postgres.safe_psql("create table t(i int)") + env.postgres.safe_psql("insert into t select generate_series(1, 100)") + res = env.postgres.safe_psql("select sum(i) from t")[0][0] + assert res == 5050