Add test for safekeeper setup without pageserver (#1000)

This commit is contained in:
Arthur Petukhovsky
2021-12-29 12:58:27 +03:00
committed by GitHub
parent a379b45257
commit 70778058d9

View File

@@ -2,6 +2,7 @@ import pytest
import random
import time
import os
import signal
import subprocess
import sys
import threading
@@ -10,10 +11,11 @@ import uuid
from contextlib import closing
from dataclasses import dataclass, field
from multiprocessing import Process, Value
from fixtures.zenith_fixtures import PgBin, ZenithEnv, ZenithEnvBuilder
from pathlib import Path
from fixtures.zenith_fixtures import PgBin, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol
from fixtures.utils import lsn_to_hex, mkdir_if_needed
from fixtures.log_helper import log
from typing import List, Optional
from typing import List, Optional, Any
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -314,14 +316,23 @@ def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value):
proc.join()
class ProposerPostgres:
"""Object for running safekeepers sync with walproposer"""
def __init__(self, env: ZenithEnv, pgdata_dir: str, pg_bin, timeline_id: str, tenant_id: str):
self.env = env
class ProposerPostgres(PgProtocol):
"""Object for running postgres without ZenithEnv"""
def __init__(self,
pgdata_dir: str,
pg_bin,
timeline_id: str,
tenant_id: str,
listen_addr: str,
port: int):
super().__init__(host=listen_addr, port=port)
self.pgdata_dir: str = pgdata_dir
self.pg_bin: PgBin = pg_bin
self.timeline_id: str = timeline_id
self.tenant_id: str = tenant_id
self.listen_addr: str = listen_addr
self.port: int = port
def pg_data_dir_path(self) -> str:
""" Path to data directory """
@@ -336,13 +347,18 @@ class ProposerPostgres:
mkdir_if_needed(self.pg_data_dir_path())
with open(self.config_file_path(), "w") as f:
f.writelines([
cfg = [
"synchronous_standby_names = 'walproposer'\n",
"shared_preload_libraries = 'zenith'\n",
f"zenith.zenith_timeline = '{self.timeline_id}'\n",
f"zenith.zenith_tenant = '{self.tenant_id}'\n",
f"zenith.page_server_connstring = ''\n",
f"wal_acceptors = '{wal_acceptors}'\n",
])
f"listen_addresses = '{self.listen_addr}'\n",
f"port = '{self.port}'\n",
]
f.writelines(cfg)
def sync_safekeepers(self) -> str:
"""
@@ -362,9 +378,30 @@ class ProposerPostgres:
stdout = stdout_f.read()
return stdout.strip("\n ")
def initdb(self):
""" Run initdb """
args = ["initdb", "-U", "zenith_admin", "-D", self.pg_data_dir_path()]
self.pg_bin.run(args)
def start(self):
""" Start postgres with pg_ctl """
log_path = os.path.join(self.pg_data_dir_path(), "pg.log")
args = ["pg_ctl", "-D", self.pg_data_dir_path(), "-l", log_path, "-w", "start"]
self.pg_bin.run(args)
def stop(self):
""" Stop postgres with pg_ctl """
args = ["pg_ctl", "-D", self.pg_data_dir_path(), "-m", "immediate", "-w", "stop"]
self.pg_bin.run(args)
# insert wal in all safekeepers and run sync on proposer
def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder, pg_bin: PgBin):
def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder,
pg_bin: PgBin,
port_distributor: PortDistributor):
# We don't really need the full environment for this test, just the
# safekeepers would be enough.
@@ -376,7 +413,12 @@ def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder, pg_bin: PgBin):
# write config for proposer
pgdata_dir = os.path.join(env.repo_dir, "proposer_pgdata")
pg = ProposerPostgres(env, pgdata_dir, pg_bin, timeline_id, tenant_id)
pg = ProposerPostgres(pgdata_dir,
pg_bin,
timeline_id,
tenant_id,
'127.0.0.1',
port_distributor.get_port())
pg.create_dir_config(env.get_safekeeper_connstrs())
# valid lsn, which is not in the segment start, nor in zero segment
@@ -438,3 +480,126 @@ def test_timeline_status(zenith_env_builder: ZenithEnvBuilder):
epoch_after_reboot = wa_http_cli.timeline_status(tenant_id, timeline_id).acceptor_epoch
assert epoch_after_reboot > epoch
class SafekeeperEnv:
def __init__(self,
repo_dir: Path,
port_distributor: PortDistributor,
pg_bin: PgBin,
num_safekeepers: int = 1):
self.repo_dir = repo_dir
self.port_distributor = port_distributor
self.pg_bin = pg_bin
self.num_safekeepers = num_safekeepers
self.bin_safekeeper = os.path.join(str(zenith_binpath), 'safekeeper')
self.safekeepers: Optional[List[subprocess.CompletedProcess[Any]]] = None
self.postgres: Optional[ProposerPostgres] = None
self.tenant_id: Optional[str] = None
self.timeline_id: Optional[str] = None
def init(self) -> "SafekeeperEnv":
assert self.postgres is None, "postgres is already initialized"
assert self.safekeepers is None, "safekeepers are already initialized"
self.timeline_id = uuid.uuid4().hex
self.tenant_id = uuid.uuid4().hex
mkdir_if_needed(str(self.repo_dir))
# Create config and a Safekeeper object for each safekeeper
self.safekeepers = []
for i in range(1, self.num_safekeepers + 1):
self.safekeepers.append(self.start_safekeeper(i))
# Create and start postgres
self.postgres = self.create_postgres()
self.postgres.start()
return self
def start_safekeeper(self, i):
port = SafekeeperPort(
pg=self.port_distributor.get_port(),
http=self.port_distributor.get_port(),
)
if self.num_safekeepers == 1:
name = "single"
else:
name = f"sk{i}"
safekeeper_dir = os.path.join(self.repo_dir, name)
mkdir_if_needed(safekeeper_dir)
args = [
self.bin_safekeeper,
"-l",
f"127.0.0.1:{port.pg}",
"--listen-http",
f"127.0.0.1:{port.http}",
"-D",
safekeeper_dir,
"--daemonize"
]
log.info(f'Running command "{" ".join(args)}"')
return subprocess.run(args, check=True)
def get_safekeeper_connstrs(self):
return ','.join([sk_proc.args[2] for sk_proc in self.safekeepers])
def create_postgres(self):
pgdata_dir = os.path.join(self.repo_dir, "proposer_pgdata")
pg = ProposerPostgres(pgdata_dir,
self.pg_bin,
self.timeline_id,
self.tenant_id,
"127.0.0.1",
self.port_distributor.get_port())
pg.initdb()
pg.create_dir_config(self.get_safekeeper_connstrs())
return pg
def kill_safekeeper(self, sk_dir):
"""Read pid file and kill process"""
pid_file = os.path.join(sk_dir, "safekeeper.pid")
with open(pid_file, "r") as f:
pid = int(f.read())
log.info(f"Killing safekeeper with pid {pid}")
os.kill(pid, signal.SIGKILL)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
log.info('Cleaning up all safekeeper and compute nodes')
# Stop all the nodes
if self.postgres is not None:
self.postgres.stop()
if self.safekeepers is not None:
for sk_proc in self.safekeepers:
self.kill_safekeeper(sk_proc.args[6])
def test_safekeeper_without_pageserver(test_output_dir: str,
port_distributor: PortDistributor,
pg_bin: PgBin):
# Create the environment in the test-specific output dir
repo_dir = Path(os.path.join(test_output_dir, "repo"))
env = SafekeeperEnv(
repo_dir,
port_distributor,
pg_bin,
num_safekeepers=1,
)
with env:
env.init()
assert env.postgres is not None
env.postgres.safe_psql("create table t(i int)")
env.postgres.safe_psql("insert into t select generate_series(1, 100)")
res = env.postgres.safe_psql("select sum(i) from t")[0][0]
assert res == 5050