mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 00:42:54 +00:00
tests are based on self-hosted runner which is physically close to our staging deployment in aws, currently tests consist of various configurations of pgbenchi runs. Also these changes rework benchmark fixture by removing globals and allowing to collect reports with desired metrics and dump them to json for further analysis. This is also applicable to usual performance tests which use local zenith binaries.
126 lines
4.0 KiB
Python
126 lines
4.0 KiB
Python
import dataclasses
|
|
import os
|
|
import subprocess
|
|
from typing import List
|
|
from fixtures.benchmark_fixture import PgBenchRunResult, ZenithBenchmarker
|
|
import pytest
|
|
from datetime import datetime
|
|
import calendar
|
|
import timeit
|
|
import os
|
|
|
|
pytest_plugins = ("fixtures.benchmark_fixture", )
|
|
|
|
|
|
def utc_now_timestamp() -> int:
|
|
return calendar.timegm(datetime.utcnow().utctimetuple())
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class PgBenchRunner:
|
|
connstr: str
|
|
scale: int
|
|
transactions: int
|
|
pgbench_bin_path: str = "pgbench"
|
|
|
|
def invoke(self, args: List[str]) -> 'subprocess.CompletedProcess[str]':
|
|
return subprocess.run([self.pgbench_bin_path, *args],
|
|
check=True,
|
|
text=True,
|
|
capture_output=True)
|
|
|
|
def init(self, vacuum: bool = True) -> 'subprocess.CompletedProcess[str]':
|
|
args = []
|
|
if not vacuum:
|
|
args.append("--no-vacuum")
|
|
args.extend([f"--scale={self.scale}", "--initialize", self.connstr])
|
|
return self.invoke(args)
|
|
|
|
def run(self, jobs: int = 1, clients: int = 1):
|
|
return self.invoke([
|
|
f"--transactions={self.transactions}",
|
|
f"--jobs={jobs}",
|
|
f"--client={clients}",
|
|
"--progress=2", # print progress every two seconds
|
|
self.connstr,
|
|
])
|
|
|
|
|
|
@pytest.fixture
|
|
def connstr():
|
|
res = os.getenv("BENCHMARK_CONNSTR")
|
|
if res is None:
|
|
raise ValueError("no connstr provided, use BENCHMARK_CONNSTR environment variable")
|
|
return res
|
|
|
|
|
|
def get_transactions_matrix():
|
|
transactions = os.getenv("TEST_PG_BENCH_TRANSACTIONS_MATRIX")
|
|
if transactions is None:
|
|
return [10**4, 10**5]
|
|
return list(map(int, transactions.split(",")))
|
|
|
|
|
|
def get_scales_matrix():
|
|
scales = os.getenv("TEST_PG_BENCH_SCALES_MATRIX")
|
|
if scales is None:
|
|
return [10, 20]
|
|
return list(map(int, scales.split(",")))
|
|
|
|
|
|
@pytest.mark.parametrize("scale", get_scales_matrix())
|
|
@pytest.mark.parametrize("transactions", get_transactions_matrix())
|
|
@pytest.mark.remote_cluster
|
|
def test_pg_bench_remote_cluster(zenbenchmark: ZenithBenchmarker,
|
|
connstr: str,
|
|
scale: int,
|
|
transactions: int):
|
|
"""
|
|
The best way is to run same pack of tests both, for local zenith
|
|
and against staging, but currently local tests heavily depend on
|
|
things available only locally e.g. zenith binaries, pageserver api, etc.
|
|
Also separate test allows to run pgbench workload against vanilla postgres
|
|
or other systems that support postgres protocol.
|
|
|
|
Also now this is more of a liveness test because it stresses pageserver internals,
|
|
so we clearly see what goes wrong in more "real" environment.
|
|
"""
|
|
pg_bin = os.getenv("PG_BIN")
|
|
if pg_bin is not None:
|
|
pgbench_bin_path = os.path.join(pg_bin, "pgbench")
|
|
else:
|
|
pgbench_bin_path = "pgbench"
|
|
|
|
runner = PgBenchRunner(
|
|
connstr=connstr,
|
|
scale=scale,
|
|
transactions=transactions,
|
|
pgbench_bin_path=pgbench_bin_path,
|
|
)
|
|
# calculate timestamps and durations separately
|
|
# timestamp is intended to be used for linking to grafana and logs
|
|
# duration is actually a metric and uses float instead of int for timestamp
|
|
init_start_timestamp = utc_now_timestamp()
|
|
t0 = timeit.default_timer()
|
|
runner.init()
|
|
init_duration = timeit.default_timer() - t0
|
|
init_end_timestamp = utc_now_timestamp()
|
|
|
|
run_start_timestamp = utc_now_timestamp()
|
|
t0 = timeit.default_timer()
|
|
out = runner.run() # TODO handle failures
|
|
run_duration = timeit.default_timer() - t0
|
|
run_end_timestamp = utc_now_timestamp()
|
|
|
|
res = PgBenchRunResult.parse_from_output(
|
|
out=out,
|
|
init_duration=init_duration,
|
|
init_start_timestamp=init_start_timestamp,
|
|
init_end_timestamp=init_end_timestamp,
|
|
run_duration=run_duration,
|
|
run_start_timestamp=run_start_timestamp,
|
|
run_end_timestamp=run_end_timestamp,
|
|
)
|
|
|
|
zenbenchmark.record_pg_bench_result(res)
|