Add walproposer vs vanilla replication test

Author:    Arthur Petukhovsky
Date:      2022-02-23 18:21:39 +00:00
Committer: Arseny Sher
Parent:    af712798e7
Commit:    7c7efe5537

4 changed files with 307 additions and 13 deletions

View File: batch_others/test_wal_acceptor.py

@@ -12,10 +12,10 @@ from contextlib import closing
from dataclasses import dataclass, field
from multiprocessing import Process, Value
from pathlib import Path
from fixtures.zenith_fixtures import PgBin, Postgres, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol
from fixtures.zenith_fixtures import PgBin, Postgres, Safekeeper, SafekeeperHttpClient, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol
from fixtures.utils import etcd_path, lsn_to_hex, mkdir_if_needed, lsn_from_hex
from fixtures.log_helper import log
from typing import List, Optional, Any
from typing import Dict, List, Optional, Any
# basic test, write something in setup with wal acceptors, ensure that commits
@@ -396,20 +396,39 @@ class ProposerPostgres(PgProtocol):
""" Path to postgresql.conf """
return os.path.join(self.pgdata_dir, 'postgresql.conf')
def log_path(self) -> str:
""" Path to pg.log """
return os.path.join(self.pg_data_dir_path(), "pg.log")
def create_dir_config(self, wal_acceptors: str):
""" Create dir and config for running --sync-safekeepers """
mkdir_if_needed(self.pg_data_dir_path())
with open(self.config_file_path(), "w") as f:
cfg = [
"synchronous_standby_names = 'walproposer'\n",
"shared_preload_libraries = 'zenith'\n",
"wal_keep_size=10TB\n",
"shared_preload_libraries=zenith\n",
"zenith.page_server_connstring=''\n",
"synchronous_commit=on\n",
"max_wal_senders=10\n",
"wal_log_hints=on\n",
"max_replication_slots=10\n",
"hot_standby=on\n",
"min_wal_size=20GB\n",
"max_wal_size=40GB\n",
"checkpoint_timeout=60min\n",
"log_checkpoints=on\n",
"max_connections=100\n",
"wal_sender_timeout=0\n",
"wal_level=replica\n",
f"zenith.zenith_timeline = '{self.timeline_id.hex}'\n",
f"zenith.zenith_tenant = '{self.tenant_id.hex}'\n",
f"zenith.page_server_connstring = ''\n",
f"wal_acceptors = '{wal_acceptors}'\n",
f"listen_addresses = '{self.listen_addr}'\n",
f"port = '{self.port}'\n",
"synchronous_standby_names = 'walproposer'\n",
"fsync=off\n",
]
f.writelines(cfg)
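# Note: the expanded GUC list above mirrors common_config in the walproposer-vs-vanilla
# benchmark added by this commit, so the walproposer setup and the vanilla setup run
# with the same WAL, checkpoint and connection tuning and the comparison stays fair.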
@@ -441,8 +460,7 @@ class ProposerPostgres(PgProtocol):
def start(self):
""" Start postgres with pg_ctl """
log_path = os.path.join(self.pg_data_dir_path(), "pg.log")
args = ["pg_ctl", "-D", self.pg_data_dir_path(), "-l", log_path, "-w", "start"]
args = ["pg_ctl", "-D", self.pg_data_dir_path(), "-l", self.log_path(), "-w", "start"]
self.pg_bin.run(args)
def stop(self):
@@ -536,6 +554,16 @@ def test_timeline_status(zenith_env_builder: ZenithEnvBuilder):
assert epoch_after_reboot > epoch
@dataclass
class SafekeeperProc:
""" An object representing a running safekeeper daemon. """
proc: 'subprocess.CompletedProcess[bytes]'
port: SafekeeperPort
def http_client(self) -> SafekeeperHttpClient:
return SafekeeperHttpClient(port=self.port.http)
class SafekeeperEnv:
def __init__(self,
repo_dir: Path,
@@ -547,7 +575,7 @@ class SafekeeperEnv:
self.pg_bin = pg_bin
self.num_safekeepers = num_safekeepers
self.bin_safekeeper = os.path.join(str(zenith_binpath), 'safekeeper')
self.safekeepers: Optional[List[subprocess.CompletedProcess[Any]]] = None
self.safekeepers: Optional[List[SafekeeperProc]] = None
self.postgres: Optional[ProposerPostgres] = None
self.tenant_id: Optional[uuid.UUID] = None
self.timeline_id: Optional[uuid.UUID] = None
@@ -571,7 +599,7 @@ class SafekeeperEnv:
return self
def start_safekeeper(self, i):
def start_safekeeper(self, i) -> "SafekeeperProc":
port = SafekeeperPort(
pg=self.port_distributor.get_port(),
http=self.port_distributor.get_port(),
@@ -594,10 +622,12 @@ class SafekeeperEnv:
]
log.info(f'Running command "{" ".join(args)}"')
return subprocess.run(args, check=True)
def get_safekeeper_connstrs(self):
return ','.join([sk_proc.args[2] for sk_proc in self.safekeepers])
return SafekeeperProc(subprocess.run(args, check=True), port)
def get_safekeeper_connstrs(self) -> str:
assert self.safekeepers is not None
return ','.join([sk.proc.args[2] for sk in self.safekeepers])
def create_postgres(self):
pgdata_dir = os.path.join(self.repo_dir, "proposer_pgdata")
@@ -629,8 +659,8 @@ class SafekeeperEnv:
if self.postgres is not None:
self.postgres.stop()
if self.safekeepers is not None:
for sk_proc in self.safekeepers:
self.kill_safekeeper(sk_proc.args[6])
for sk in self.safekeepers:
self.kill_safekeeper(sk.proc.args[6])
def test_safekeeper_without_pageserver(test_output_dir: str,
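The net effect of the changes in this file is that a started safekeeper is now tracked as a SafekeeperProc rather than a bare CompletedProcess, which is what gives tests HTTP access to its metrics. A minimal sketch of the intended usage, assuming an already initialized SafekeeperEnv with one safekeeper (this mirrors the benchmark added below):
metrics = env.safekeepers[0].http_client().get_metrics()
flushes = metrics.flush_wal_count[(env.tenant_id.hex, env.timeline_id.hex)]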

View File: fixtures/zenith_fixtures.py

@@ -1700,6 +1700,7 @@ class SafekeeperMetrics:
# As a consequence, values may differ from real original int64s.
flush_lsn_inexact: Dict[Tuple[str, str], int] = field(default_factory=dict)
commit_lsn_inexact: Dict[Tuple[str, str], int] = field(default_factory=dict)
flush_wal_count: Dict[Tuple[str, str], int] = field(default_factory=dict)
class SafekeeperHttpClient(requests.Session):
@@ -1734,6 +1735,11 @@ class SafekeeperHttpClient(requests.Session):
all_metrics_text,
re.MULTILINE):
metrics.commit_lsn_inexact[(match.group(1), match.group(2))] = int(match.group(3))
for match in re.finditer(
r'^safekeeper_flush_wal_seconds_count{tenant_id="([0-9a-f]+)",timeline_id="([0-9a-f]+)"} (\S+)$',
all_metrics_text,
re.MULTILINE):
metrics.flush_wal_count[(match.group(1), match.group(2))] = int(match.group(3))
return metrics
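For reference, the added pattern pulls the per-timeline flush counter out of the safekeeper's Prometheus text output; a minimal illustration of the line shape it matches (the ids and the value below are made up):
import re

sample = ('safekeeper_flush_wal_seconds_count'
          '{tenant_id="0123456789abcdef0123456789abcdef",'
          'timeline_id="fedcba9876543210fedcba9876543210"} 42')
m = re.match(
    r'^safekeeper_flush_wal_seconds_count{tenant_id="([0-9a-f]+)",timeline_id="([0-9a-f]+)"} (\S+)$',
    sample)
assert m is not None and int(m.group(3)) == 42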

View File: new walproposer vs vanilla pgbench benchmark test

@@ -0,0 +1,258 @@
from pathlib import Path
import os
from time import sleep
from typing import Callable, Dict
import uuid
from fixtures.zenith_fixtures import PgBin, PgProtocol, PortDistributor
from fixtures.log_helper import log
from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
from batch_others.test_wal_acceptor import ProposerPostgres, SafekeeperEnv
from fixtures.utils import lsn_from_hex, mkdir_if_needed
pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
def test_walproposer_pgbench(test_output_dir: str,
port_distributor: PortDistributor,
pg_bin: PgBin,
zenbenchmark: ZenithBenchmarker):
# Create the environment in the test-specific output dir
repo_dir = Path(os.path.join(test_output_dir, "repo"))
env = SafekeeperEnv(
repo_dir,
port_distributor,
pg_bin,
num_safekeepers=1,
)
with env:
env.init()
assert env.postgres is not None
def calc_flushes() -> int:
assert env.safekeepers is not None
assert env.tenant_id is not None
assert env.timeline_id is not None
metrics = env.safekeepers[0].http_client().get_metrics()
return int(metrics.flush_wal_count[(env.tenant_id.hex, env.timeline_id.hex)])
pgbench_env = {
"PGHOST": env.postgres.listen_addr,
"PGPORT": str(env.postgres.port),
"PGDATA": env.postgres.pg_data_dir_path(),
"PGUSER": "zenith_admin",
"PGDATABASE": "postgres"
}
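# pgbench connects to the walproposer-backed postgres via PGHOST/PGPORT; PGDATA is
# included so run_pgbench() below can also measure the size of pg_wal on the proposer.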
run_pgbench(pg_bin, zenbenchmark, pgbench_env, calc_flushes, env.postgres)
def test_vanilla_pgbench(test_output_dir: str,
port_distributor: PortDistributor,
pg_bin: PgBin,
zenbenchmark: ZenithBenchmarker):
# Create the environment in the test-specific output dir
repo_dir = os.path.join(test_output_dir, "repo")
mkdir_if_needed(repo_dir)
pgdata_master = os.path.join(repo_dir, "pgdata_master")
pgdata_replica = os.path.join(repo_dir, "pgdata_replica")
master = ProposerPostgres(pgdata_master,
pg_bin,
uuid.uuid4(),
uuid.uuid4(),
"127.0.0.1",
port_distributor.get_port())
common_config = [
"wal_keep_size=10TB\n",
"shared_preload_libraries=zenith\n",
"zenith.page_server_connstring=''\n",
"synchronous_commit=on\n",
"max_wal_senders=10\n",
"wal_log_hints=on\n",
"max_replication_slots=10\n",
"hot_standby=on\n",
"min_wal_size=20GB\n",
"max_wal_size=40GB\n",
"checkpoint_timeout=60min\n",
"log_checkpoints=on\n",
"max_connections=100\n",
"wal_sender_timeout=0\n",
"wal_level=replica\n",
]
master.initdb()
mkdir_if_needed(master.pg_data_dir_path())
with open(master.config_file_path(), "w") as f:
cfg = [
"fsync=off\n",
f"listen_addresses = '{master.listen_addr}'\n",
f"port = '{master.port}'\n",
"synchronous_standby_names = 'ANY 1 (s1)'\n",
] + common_config
f.writelines(cfg)
with open(os.path.join(pgdata_master, "pg_hba.conf"), "w") as f:
pg_hba = """
host all all 0.0.0.0/0 trust
host all all ::/0 trust
local all all trust
host all all 127.0.0.1/32 trust
host all all ::1/128 trust
host replication all 0.0.0.0/0 trust
host replication all ::/0 trust
# Allow replication connections from localhost, by a user with the
# replication privilege.
local replication all trust
host replication all 127.0.0.1/32 trust
host replication all ::1/128 trust
"""
f.writelines(pg_hba)
master_env = {
"PGHOST": master.listen_addr,
"PGPORT": str(master.port),
"PGDATA": master.pg_data_dir_path(),
"PGUSER": "zenith_admin",
"PGDATABASE": "postgres"
}
master.start()
master.safe_psql("SELECT pg_create_physical_replication_slot('s1');")
pg_bin.run(["pg_basebackup", "-D", pgdata_replica], env=master_env)
replica = ProposerPostgres(pgdata_replica,
pg_bin,
uuid.uuid4(),
uuid.uuid4(),
"127.0.0.1",
port_distributor.get_port())
with open(replica.config_file_path(), "w") as f:
cfg = [
"fsync=on\n",
f"listen_addresses = '{replica.listen_addr}'\n",
f"port = '{replica.port}'\n",
"primary_slot_name = 's1'\n",
f"primary_conninfo = 'application_name=s1 user=zenith_admin host={master.listen_addr} channel_binding=disable port={master.port} sslmode=disable sslcompression=0 sslsni=1 ssl_min_protocol_version=TLSv1.2 gssencmode=disable krbsrvname=postgres target_session_attrs=any'\n",
] + common_config
f.writelines(cfg)
with open(os.path.join(pgdata_replica, "standby.signal"), "w") as f:
pass
replica.start()
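# At this point the vanilla pair runs synchronous physical replication: the master's
# synchronous_standby_names = 'ANY 1 (s1)' plus the replica connecting with
# application_name=s1 over slot 's1' means each commit on the master waits for the
# replica to flush the corresponding WAL, the vanilla counterpart of the walproposer
# waiting for a safekeeper flush.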
def calc_flushes() -> int:
return int(replica.safe_psql("SELECT wal_sync FROM pg_stat_wal")[0][0])
run_pgbench(pg_bin, zenbenchmark, master_env, calc_flushes, master)
replica.stop()
master.stop()
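# Both tests hand run_pgbench() a zero-argument calc_flushes(): the walproposer test
# reads safekeeper_flush_wal_seconds_count from the safekeeper's HTTP metrics, the
# vanilla test reads pg_stat_wal.wal_sync on the replica. The deltas of these counters
# are what the *_per_fsync metrics below divide by.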
def run_pgbench(pg_bin: PgBin,
zenbenchmark: ZenithBenchmarker,
pgbench_env: Dict[str, str],
calc_flushes: Callable[[], int],
postgres: PgProtocol,
scale=200,
conns_count=32,
pgbench_time=60):
step0_lsn = lsn_from_hex(postgres.safe_psql('select pg_current_wal_insert_lsn()')[0][0])
step0_flush_cnt = calc_flushes()
log.info(f"step0_lsn: {step0_lsn}")
log.info(f"step0_flush_cnt: {step0_flush_cnt}")
cmd = ["pgbench", "-i", "-s", str(scale)]
basepath = pg_bin.run_capture(cmd, pgbench_env)
pgbench_init = basepath + '.stderr'
with open(pgbench_init, 'r') as stdout_f:
stdout = stdout_f.readlines()
stats_str = stdout[-1]
log.info(stats_str)
init_seconds = float(stats_str.split()[2])
zenbenchmark.record("pgbench_init", init_seconds, unit="s", report=MetricReport.LOWER_IS_BETTER)
wal_init_size = get_dir_size(os.path.join(pgbench_env["PGDATA"], 'pg_wal'))
zenbenchmark.record('wal_init_size',
wal_init_size / (1024 * 1024),
'MB',
report=MetricReport.LOWER_IS_BETTER)
step1_lsn = lsn_from_hex(postgres.safe_psql('select pg_current_wal_insert_lsn()')[0][0])
step1_flush_cnt = calc_flushes()
log.info(f"step1_lsn: {step1_lsn}")
log.info(f"step1_flush_cnt: {step1_flush_cnt}")
zenbenchmark.record("init_wal_bytes_per_fsync",
(step1_lsn - step0_lsn) / (step1_flush_cnt - step0_flush_cnt),
unit="b",
report=MetricReport.HIGHER_IS_BETTER)
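# (step1_lsn - step0_lsn) is the amount of WAL written during pgbench -i and
# (step1_flush_cnt - step0_flush_cnt) is how many WAL fsyncs the durability path did
# in the same window, so the recorded value is bytes made durable per fsync.
# Illustration with made-up numbers: 3 GiB of WAL over 1500 flushes is roughly
# 2 MiB per fsync; higher means fewer sync waits per byte of WAL.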
cmd = [
"pgbench",
"-c",
str(conns_count),
"-N",
"-P",
"1",
"-T",
str(pgbench_time),
]
basepath = pg_bin.run_capture(cmd, pgbench_env)
pgbench_run = basepath + '.stdout'
with open(pgbench_run, 'r') as stdout_f:
stdout = stdout_f.readlines()
for line in stdout:
if "number of transactions actually processed:" in line:
transactions_processed = int(line.split()[-1])
stats_str = stdout[-1]
log.info(stats_str)
pgbench_tps = float(stats_str.split()[2])
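# The final stdout line of the run looks like "tps = 3456.789012 (without initial
# connection time)" (exact wording varies between pgbench versions), so split()[2] is
# the TPS figure; transactions_processed comes from the "number of transactions
# actually processed:" line matched above.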
step2_lsn = lsn_from_hex(postgres.safe_psql('select pg_current_wal_insert_lsn()')[0][0])
step2_flush_cnt = calc_flushes()
log.info(f"step2_lsn: {step2_lsn}")
log.info(f"step2_flush_cnt: {step2_flush_cnt}")
zenbenchmark.record("tps_pgbench", pgbench_tps, unit="", report=MetricReport.HIGHER_IS_BETTER)
zenbenchmark.record("tx_wal_bytes_per_fsync",
(step2_lsn - step1_lsn) / (step2_flush_cnt - step1_flush_cnt),
unit="b",
report=MetricReport.HIGHER_IS_BETTER)
zenbenchmark.record("txes_per_fsync",
transactions_processed / (step2_flush_cnt - step1_flush_cnt),
unit="",
report=MetricReport.HIGHER_IS_BETTER)
wal_size = get_dir_size(os.path.join(pgbench_env["PGDATA"], 'pg_wal'))
zenbenchmark.record('wal_size',
wal_size / (1024 * 1024),
'MB',
report=MetricReport.LOWER_IS_BETTER)
def get_dir_size(path: str) -> int:
"""Return size in bytes."""
totalbytes = 0
for root, dirs, files in os.walk(path):
for name in files:
totalbytes += os.path.getsize(os.path.join(root, name))
return totalbytes