import calendar
import dataclasses
import os
import subprocess
import timeit
from datetime import datetime
from typing import List

import pytest

from fixtures.benchmark_fixture import PgBenchRunResult, ZenithBenchmarker

pytest_plugins = ("fixtures.benchmark_fixture", )


def utc_now_timestamp() -> int:
    # Seconds since the Unix epoch, in UTC.
    return calendar.timegm(datetime.utcnow().utctimetuple())


@dataclasses.dataclass
class PgBenchRunner:
    connstr: str
    scale: int
    transactions: int
    pgbench_bin_path: str = "pgbench"

    def invoke(self, args: List[str]) -> 'subprocess.CompletedProcess[str]':
        # Capture output so the full pgbench output can be shown on failure.
        res = subprocess.run([self.pgbench_bin_path, *args], text=True, capture_output=True)

        if res.returncode != 0:
            raise RuntimeError(f"pgbench failed. stdout: {res.stdout} stderr: {res.stderr}")
        return res

    def init(self, vacuum: bool = True) -> 'subprocess.CompletedProcess[str]':
        args = []
        if not vacuum:
            args.append("--no-vacuum")
        args.extend([f"--scale={self.scale}", "--initialize", self.connstr])
        return self.invoke(args)
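
    # For scale=10, init() invokes roughly:
    #   pgbench --scale=10 --initialize <connstr>
    # and run() below invokes roughly:
    #   pgbench --transactions=N --jobs=J --client=C --progress=2 <connstr>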

    def run(self, jobs: int = 1, clients: int = 1):
        return self.invoke([
            f"--transactions={self.transactions}",
            f"--jobs={jobs}",
            f"--client={clients}",
            "--progress=2",  # print progress every two seconds
            self.connstr,
        ])
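

# A minimal standalone usage sketch (hypothetical connection string; not part of
# the test run):
#
#     runner = PgBenchRunner(connstr="postgres://user@host:5432/db", scale=10, transactions=1000)
#     runner.init()
#     out = runner.run(jobs=4, clients=4)
#     print(out.stdout)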


@pytest.fixture
def connstr():
    res = os.getenv("BENCHMARK_CONNSTR")
    if res is None:
        raise ValueError("no connstr provided, use BENCHMARK_CONNSTR environment variable")
    return res
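
# Example (hypothetical endpoint):
#   export BENCHMARK_CONNSTR="postgres://user:password@host.example.com:5432/postgres"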


def get_transactions_matrix():
    transactions = os.getenv("TEST_PG_BENCH_TRANSACTIONS_MATRIX")
    if transactions is None:
        return [10**4, 10**5]
    return list(map(int, transactions.split(",")))


def get_scales_matrix():
    scales = os.getenv("TEST_PG_BENCH_SCALES_MATRIX")
    if scales is None:
        return [10, 20]
    return list(map(int, scales.split(",")))
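
# Both matrices are comma-separated integer lists, e.g. (hypothetical values):
#   export TEST_PG_BENCH_SCALES_MATRIX="10,100,500"
#   export TEST_PG_BENCH_TRANSACTIONS_MATRIX="1000,10000"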


@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("transactions", get_transactions_matrix())
@pytest.mark.remote_cluster
def test_pg_bench_remote_cluster(zenbenchmark: ZenithBenchmarker,
                                 connstr: str,
                                 scale: int,
                                 transactions: int):
    """
    Ideally we would run the same set of tests both against local zenith
    and against staging, but currently the local tests depend heavily on
    things that are only available locally, e.g. zenith binaries, the
    pageserver API, etc. A separate test also makes it possible to run the
    pgbench workload against vanilla postgres or any other system that
    speaks the postgres protocol.

    For now this is also more of a liveness test: it stresses pageserver
    internals, so we can clearly see what goes wrong in a more "real"
    environment.
    """
    pg_bin = os.getenv("PG_BIN")
    if pg_bin is not None:
        pgbench_bin_path = os.path.join(pg_bin, "pgbench")
    else:
        pgbench_bin_path = "pgbench"

    runner = PgBenchRunner(
        connstr=connstr,
        scale=scale,
        transactions=transactions,
        pgbench_bin_path=pgbench_bin_path,
    )

    # Calculate timestamps and durations separately:
    # the timestamp is intended for linking to grafana and logs,
    # while the duration is the actual metric, so it uses a float
    # rather than an integer timestamp.
    init_start_timestamp = utc_now_timestamp()
    t0 = timeit.default_timer()
    runner.init()
    init_duration = timeit.default_timer() - t0
    init_end_timestamp = utc_now_timestamp()

    run_start_timestamp = utc_now_timestamp()
    t0 = timeit.default_timer()
    out = runner.run()  # TODO: handle failures
    run_duration = timeit.default_timer() - t0
    run_end_timestamp = utc_now_timestamp()

    res = PgBenchRunResult.parse_from_output(
        out=out,
        init_duration=init_duration,
        init_start_timestamp=init_start_timestamp,
        init_end_timestamp=init_end_timestamp,
        run_duration=run_duration,
        run_start_timestamp=run_start_timestamp,
        run_end_timestamp=run_end_timestamp,
    )

    zenbenchmark.record_pg_bench_result(res)
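

# To run against a remote cluster, something like (hypothetical values):
#
#   BENCHMARK_CONNSTR="postgres://user:password@host:5432/postgres" \
#       pytest -m remote_cluster -k test_pg_bench_remote_cluster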