From 4a8c66345267bfb11882a10d0260e2aacec6d112 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas
Date: Thu, 14 Apr 2022 13:31:42 +0300
Subject: [PATCH] Refactor pgbench tests.

- Remove batch_others/test_pgbench.py. It was a quick check that pgbench
  works, without actually recording any performance numbers, but that
  doesn't seem very interesting anymore. Remove it to avoid confusing it
  with the actual pgbench benchmarks.

- Run pgbench with "-N" and "-S" options, for two different workloads:
  simple-update and SELECT-only. Previously, we would only run it with
  the "default" TPCB-like workload. That's more or less the same as the
  simple-update (-N) workload, but I think the simple-update workload is
  more relevant for testing storage performance. The SELECT-only workload
  is a new thing to measure.

- Merge test_perf_pgbench.py and test_perf_pgbench_remote.py. I added a
  new "remote" implementation of the PgCompare class, which allows
  running the same tests against an already-running Postgres instance.

- Make the PgBenchRunResult.parse_from_output function more flexible.
  pgbench can print different lines depending on the command-line
  options, but the parsing function expected a particular set of lines.
---
 .github/workflows/benchmarking.yml            |  13 +-
 test_runner/batch_others/test_pgbench.py      |  14 --
 test_runner/fixtures/benchmark_fixture.py     | 145 ++++++++----------
 test_runner/fixtures/compare_fixtures.py      |  49 +++++-
 test_runner/fixtures/zenith_fixtures.py       |  68 ++++++--
 test_runner/performance/test_perf_pgbench.py  | 116 ++++++++++++--
 .../performance/test_perf_pgbench_remote.py   | 124 ---------------
 7 files changed, 279 insertions(+), 250 deletions(-)
 delete mode 100644 test_runner/batch_others/test_pgbench.py
 delete mode 100644 test_runner/performance/test_perf_pgbench_remote.py

diff --git a/.github/workflows/benchmarking.yml b/.github/workflows/benchmarking.yml
index 36df35297d..72041c9d02 100644
--- a/.github/workflows/benchmarking.yml
+++ b/.github/workflows/benchmarking.yml
@@ -26,7 +26,7 @@ jobs:
     runs-on: [self-hosted, zenith-benchmarker]
 
     env:
-      PG_BIN: "/usr/pgsql-13/bin"
+      POSTGRES_DISTRIB_DIR: "/usr/pgsql-13"
 
     steps:
     - name: Checkout zenith repo
@@ -51,7 +51,7 @@
        echo Poetry
        poetry --version
        echo Pgbench
-       $PG_BIN/pgbench --version
+       $POSTGRES_DISTRIB_DIR/bin/pgbench --version
 
    # FIXME cluster setup is skipped due to various changes in console API
    # for now pre created cluster is used. When API gain some stability
@@ -66,7 +66,7 @@
        echo "Starting cluster"
 
        # wake up the cluster
-       $PG_BIN/psql $BENCHMARK_CONNSTR -c "SELECT 1"
+       $POSTGRES_DISTRIB_DIR/bin/psql $BENCHMARK_CONNSTR -c "SELECT 1"
 
    - name: Run benchmark
      # pgbench is installed system wide from official repo
@@ -83,8 +83,11 @@
      # sudo yum install postgresql13-contrib
      # actual binaries are located in /usr/pgsql-13/bin/
      env:
-       TEST_PG_BENCH_TRANSACTIONS_MATRIX: "5000,10000,20000"
-       TEST_PG_BENCH_SCALES_MATRIX: "10,15"
+       # The pgbench test runs two tests of given duration against each scale.
+       # So the total runtime with these parameters is 2 * 2 * 300 = 1200, or 20 minutes.
+       # Plus time needed to initialize the test databases.
+       TEST_PG_BENCH_DURATIONS_MATRIX: "300"
+       TEST_PG_BENCH_SCALES_MATRIX: "10,100"
        PLATFORM: "zenith-staging"
        BENCHMARK_CONNSTR: "${{ secrets.BENCHMARK_STAGING_CONNSTR }}"
        REMOTE_ENV: "1" # indicate to test harness that we do not have zenith binaries locally
diff --git a/test_runner/batch_others/test_pgbench.py b/test_runner/batch_others/test_pgbench.py
deleted file mode 100644
index 09713023bc..0000000000
--- a/test_runner/batch_others/test_pgbench.py
+++ /dev/null
@@ -1,14 +0,0 @@
-from fixtures.zenith_fixtures import ZenithEnv
-from fixtures.log_helper import log
-
-
-def test_pgbench(zenith_simple_env: ZenithEnv, pg_bin):
-    env = zenith_simple_env
-    env.zenith_cli.create_branch("test_pgbench", "empty")
-    pg = env.postgres.create_start('test_pgbench')
-    log.info("postgres is running on 'test_pgbench' branch")
-
-    connstr = pg.connstr()
-
-    pg_bin.run_capture(['pgbench', '-i', connstr])
-    pg_bin.run_capture(['pgbench'] + '-c 10 -T 5 -P 1 -M prepared'.split() + [connstr])
diff --git a/test_runner/fixtures/benchmark_fixture.py b/test_runner/fixtures/benchmark_fixture.py
index 480eb3f891..a904233e98 100644
--- a/test_runner/fixtures/benchmark_fixture.py
+++ b/test_runner/fixtures/benchmark_fixture.py
@@ -17,7 +17,7 @@ import warnings
 from contextlib import contextmanager
 
 # Type-related stuff
-from typing import Iterator
+from typing import Iterator, Optional
 
 """
 This file contains fixtures for micro-benchmarks.
@@ -51,17 +51,12 @@ in the test initialization, or measure disk usage after the test query.
 
 @dataclasses.dataclass
 class PgBenchRunResult:
-    scale: int
     number_of_clients: int
     number_of_threads: int
     number_of_transactions_actually_processed: int
     latency_average: float
-    latency_stddev: float
-    tps_including_connection_time: float
-    tps_excluding_connection_time: float
-    init_duration: float
-    init_start_timestamp: int
-    init_end_timestamp: int
+    latency_stddev: Optional[float]
+    tps: float
     run_duration: float
     run_start_timestamp: int
     run_end_timestamp: int
@@ -69,56 +64,67 @@ class PgBenchRunResult:
     # TODO progress
 
     @classmethod
-    def parse_from_output(
+    def parse_from_stdout(
         cls,
-        out: 'subprocess.CompletedProcess[str]',
-        init_duration: float,
-        init_start_timestamp: int,
-        init_end_timestamp: int,
+        stdout: str,
         run_duration: float,
         run_start_timestamp: int,
         run_end_timestamp: int,
     ):
-        stdout_lines = out.stdout.splitlines()
+        stdout_lines = stdout.splitlines()
+
+        latency_stddev = None
+
         # we know significant parts of these values from test input
         # but to be precise take them from output
-        # scaling factor: 5
-        assert "scaling factor" in stdout_lines[1]
-        scale = int(stdout_lines[1].split()[-1])
-        # number of clients: 1
-        assert "number of clients" in stdout_lines[3]
-        number_of_clients = int(stdout_lines[3].split()[-1])
-        # number of threads: 1
-        assert "number of threads" in stdout_lines[4]
-        number_of_threads = int(stdout_lines[4].split()[-1])
-        # number of transactions actually processed: 1000/1000
-        assert "number of transactions actually processed" in stdout_lines[6]
-        number_of_transactions_actually_processed = int(stdout_lines[6].split("/")[1])
-        # latency average = 19.894 ms
-        assert "latency average" in stdout_lines[7]
-        latency_average = stdout_lines[7].split()[-2]
-        # latency stddev = 3.387 ms
-        assert "latency stddev" in stdout_lines[8]
-        latency_stddev = stdout_lines[8].split()[-2]
-        # tps = 50.219689 (including connections establishing)
-        assert "(including connections establishing)" in stdout_lines[9]
-        tps_including_connection_time = stdout_lines[9].split()[2]
-        # tps = 50.264435 (excluding connections establishing)
-        assert "(excluding connections establishing)" in stdout_lines[10]
-        tps_excluding_connection_time = stdout_lines[10].split()[2]
+        for line in stdout.splitlines():
+            # scaling factor: 5
+            if line.startswith("scaling factor:"):
+                scale = int(line.split()[-1])
+            # number of clients: 1
+            if line.startswith("number of clients: "):
+                number_of_clients = int(line.split()[-1])
+            # number of threads: 1
+            if line.startswith("number of threads: "):
+                number_of_threads = int(line.split()[-1])
+            # number of transactions actually processed: 1000/1000
+            # OR
+            # number of transactions actually processed: 1000
+            if line.startswith("number of transactions actually processed"):
+                if "/" in line:
+                    number_of_transactions_actually_processed = int(line.split("/")[1])
+                else:
+                    number_of_transactions_actually_processed = int(line.split()[-1])
+            # latency average = 19.894 ms
+            if line.startswith("latency average"):
+                latency_average = float(line.split()[-2])
+            # latency stddev = 3.387 ms
+            # (only printed with some options)
+            if line.startswith("latency stddev"):
+                latency_stddev = float(line.split()[-2])
+
+            # Get the TPS without initial connection time. The format
+            # of the tps lines changed in pgbench v14, but we accept
+            # either format:
+            #
+            # pgbench v13 and below:
+            # tps = 50.219689 (including connections establishing)
+            # tps = 50.264435 (excluding connections establishing)
+            #
+            # pgbench v14:
+            # initial connection time = 3.858 ms
+            # tps = 309.281539 (without initial connection time)
+            if (line.startswith("tps = ") and ("(excluding connections establishing)" in line
+                                               or "(without initial connection time)" in line)):
+                tps = float(line.split()[2])
 
         return cls(
-            scale=scale,
             number_of_clients=number_of_clients,
             number_of_threads=number_of_threads,
             number_of_transactions_actually_processed=number_of_transactions_actually_processed,
-            latency_average=float(latency_average),
-            latency_stddev=float(latency_stddev),
-            tps_including_connection_time=float(tps_including_connection_time),
-            tps_excluding_connection_time=float(tps_excluding_connection_time),
-            init_duration=init_duration,
-            init_start_timestamp=init_start_timestamp,
-            init_end_timestamp=init_end_timestamp,
+            latency_average=latency_average,
+            latency_stddev=latency_stddev,
+            tps=tps,
             run_duration=run_duration,
             run_start_timestamp=run_start_timestamp,
             run_end_timestamp=run_end_timestamp,
@@ -187,60 +193,41 @@ class ZenithBenchmarker:
             report=MetricReport.LOWER_IS_BETTER,
         )
 
-    def record_pg_bench_result(self, pg_bench_result: PgBenchRunResult):
-        self.record("scale", pg_bench_result.scale, '', MetricReport.TEST_PARAM)
-        self.record("number_of_clients",
+    def record_pg_bench_result(self, prefix: str, pg_bench_result: PgBenchRunResult):
+        self.record(f"{prefix}.number_of_clients",
                     pg_bench_result.number_of_clients,
                     '',
                     MetricReport.TEST_PARAM)
-        self.record("number_of_threads",
+        self.record(f"{prefix}.number_of_threads",
                     pg_bench_result.number_of_threads,
                     '',
                     MetricReport.TEST_PARAM)
         self.record(
-            "number_of_transactions_actually_processed",
+            f"{prefix}.number_of_transactions_actually_processed",
            pg_bench_result.number_of_transactions_actually_processed,
            '', # thats because this is predefined by test matrix and doesnt change across runs
            report=MetricReport.TEST_PARAM,
        )
-        self.record("latency_average",
+        self.record(f"{prefix}.latency_average",
                     pg_bench_result.latency_average,
                     unit="ms",
                     report=MetricReport.LOWER_IS_BETTER)
-        self.record("latency_stddev",
-                    pg_bench_result.latency_stddev,
unit="ms", - report=MetricReport.LOWER_IS_BETTER) - self.record("tps_including_connection_time", - pg_bench_result.tps_including_connection_time, - '', - report=MetricReport.HIGHER_IS_BETTER) - self.record("tps_excluding_connection_time", - pg_bench_result.tps_excluding_connection_time, - '', - report=MetricReport.HIGHER_IS_BETTER) - self.record("init_duration", - pg_bench_result.init_duration, - unit="s", - report=MetricReport.LOWER_IS_BETTER) - self.record("init_start_timestamp", - pg_bench_result.init_start_timestamp, - '', - MetricReport.TEST_PARAM) - self.record("init_end_timestamp", - pg_bench_result.init_end_timestamp, - '', - MetricReport.TEST_PARAM) - self.record("run_duration", + if pg_bench_result.latency_stddev is not None: + self.record(f"{prefix}.latency_stddev", + pg_bench_result.latency_stddev, + unit="ms", + report=MetricReport.LOWER_IS_BETTER) + self.record(f"{prefix}.tps", pg_bench_result.tps, '', report=MetricReport.HIGHER_IS_BETTER) + self.record(f"{prefix}.run_duration", pg_bench_result.run_duration, unit="s", report=MetricReport.LOWER_IS_BETTER) - self.record("run_start_timestamp", + self.record(f"{prefix}.run_start_timestamp", pg_bench_result.run_start_timestamp, '', MetricReport.TEST_PARAM) - self.record("run_end_timestamp", + self.record(f"{prefix}.run_end_timestamp", pg_bench_result.run_end_timestamp, '', MetricReport.TEST_PARAM) diff --git a/test_runner/fixtures/compare_fixtures.py b/test_runner/fixtures/compare_fixtures.py index 598ee10f8e..3c6a923587 100644 --- a/test_runner/fixtures/compare_fixtures.py +++ b/test_runner/fixtures/compare_fixtures.py @@ -2,7 +2,7 @@ import pytest from contextlib import contextmanager from abc import ABC, abstractmethod -from fixtures.zenith_fixtures import PgBin, PgProtocol, VanillaPostgres, ZenithEnv +from fixtures.zenith_fixtures import PgBin, PgProtocol, VanillaPostgres, RemotePostgres, ZenithEnv from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker # Type-related stuff @@ -162,6 +162,48 @@ class VanillaCompare(PgCompare): return self.zenbenchmark.record_duration(out_name) +class RemoteCompare(PgCompare): + """PgCompare interface for a remote postgres instance.""" + def __init__(self, zenbenchmark, remote_pg: RemotePostgres): + self._pg = remote_pg + self._zenbenchmark = zenbenchmark + + # Long-lived cursor, useful for flushing + self.conn = self.pg.connect() + self.cur = self.conn.cursor() + + @property + def pg(self): + return self._pg + + @property + def zenbenchmark(self): + return self._zenbenchmark + + @property + def pg_bin(self): + return self._pg.pg_bin + + def flush(self): + # TODO: flush the remote pageserver + pass + + def report_peak_memory_use(self) -> None: + # TODO: get memory usage from remote pageserver + pass + + def report_size(self) -> None: + # TODO: get storage size from remote pageserver + pass + + @contextmanager + def record_pageserver_writes(self, out_name): + yield # Do nothing + + def record_duration(self, out_name): + return self.zenbenchmark.record_duration(out_name) + + @pytest.fixture(scope='function') def zenith_compare(request, zenbenchmark, pg_bin, zenith_simple_env) -> ZenithCompare: branch_name = request.node.name @@ -173,6 +215,11 @@ def vanilla_compare(zenbenchmark, vanilla_pg) -> VanillaCompare: return VanillaCompare(zenbenchmark, vanilla_pg) +@pytest.fixture(scope='function') +def remote_compare(zenbenchmark, remote_pg) -> RemoteCompare: + return RemoteCompare(zenbenchmark, remote_pg) + + @pytest.fixture(params=["vanilla_compare", "zenith_compare"], ids=["vanilla", 
"zenith"]) def zenith_with_baseline(request) -> PgCompare: """Parameterized fixture that helps compare zenith against vanilla postgres. diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 41d1443880..f8ee39a5a1 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -123,6 +123,22 @@ def pytest_configure(config): top_output_dir = os.path.join(base_dir, DEFAULT_OUTPUT_DIR) mkdir_if_needed(top_output_dir) + # Find the postgres installation. + global pg_distrib_dir + env_postgres_bin = os.environ.get('POSTGRES_DISTRIB_DIR') + if env_postgres_bin: + pg_distrib_dir = env_postgres_bin + else: + pg_distrib_dir = os.path.normpath(os.path.join(base_dir, DEFAULT_POSTGRES_DIR)) + log.info(f'pg_distrib_dir is {pg_distrib_dir}') + if os.getenv("REMOTE_ENV"): + # When testing against a remote server, we only need the client binary. + if not os.path.exists(os.path.join(pg_distrib_dir, 'bin/psql')): + raise Exception('psql not found at "{}"'.format(pg_distrib_dir)) + else: + if not os.path.exists(os.path.join(pg_distrib_dir, 'bin/postgres')): + raise Exception('postgres not found at "{}"'.format(pg_distrib_dir)) + if os.getenv("REMOTE_ENV"): # we are in remote env and do not have zenith binaries locally # this is the case for benchmarks run on self-hosted runner @@ -138,17 +154,6 @@ def pytest_configure(config): if not os.path.exists(os.path.join(zenith_binpath, 'pageserver')): raise Exception('zenith binaries not found at "{}"'.format(zenith_binpath)) - # Find the postgres installation. - global pg_distrib_dir - env_postgres_bin = os.environ.get('POSTGRES_DISTRIB_DIR') - if env_postgres_bin: - pg_distrib_dir = env_postgres_bin - else: - pg_distrib_dir = os.path.normpath(os.path.join(base_dir, DEFAULT_POSTGRES_DIR)) - log.info(f'pg_distrib_dir is {pg_distrib_dir}') - if not os.path.exists(os.path.join(pg_distrib_dir, 'bin/postgres')): - raise Exception('postgres not found at "{}"'.format(pg_distrib_dir)) - def zenfixture(func: Fn) -> Fn: """ @@ -1305,6 +1310,47 @@ def vanilla_pg(test_output_dir: str) -> Iterator[VanillaPostgres]: yield vanilla_pg +class RemotePostgres(PgProtocol): + def __init__(self, pg_bin: PgBin, remote_connstr: str): + super().__init__(**parse_dsn(remote_connstr)) + self.pg_bin = pg_bin + # The remote server is assumed to be running already + self.running = True + + def configure(self, options: List[str]): + raise Exception('cannot change configuration of remote Posgres instance') + + def start(self): + raise Exception('cannot start a remote Postgres instance') + + def stop(self): + raise Exception('cannot stop a remote Postgres instance') + + def get_subdir_size(self, subdir) -> int: + # TODO: Could use the server's Generic File Acccess functions if superuser. 
+        # See https://www.postgresql.org/docs/14/functions-admin.html#FUNCTIONS-ADMIN-GENFILE
+        raise Exception('cannot get size of a Postgres instance')
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc, tb):
+        # do nothing
+        pass
+
+
+@pytest.fixture(scope='function')
+def remote_pg(test_output_dir: str) -> Iterator[RemotePostgres]:
+    pg_bin = PgBin(test_output_dir)
+
+    connstr = os.getenv("BENCHMARK_CONNSTR")
+    if connstr is None:
+        raise ValueError("no connstr provided, use BENCHMARK_CONNSTR environment variable")
+
+    with RemotePostgres(pg_bin, connstr) as remote_pg:
+        yield remote_pg
+
+
 class ZenithProxy(PgProtocol):
     def __init__(self, port: int):
         super().__init__(host="127.0.0.1",
diff --git a/test_runner/performance/test_perf_pgbench.py b/test_runner/performance/test_perf_pgbench.py
index 5ffce3c0be..d2de76913a 100644
--- a/test_runner/performance/test_perf_pgbench.py
+++ b/test_runner/performance/test_perf_pgbench.py
@@ -2,29 +2,113 @@ from contextlib import closing
 from fixtures.zenith_fixtures import PgBin, VanillaPostgres, ZenithEnv
 from fixtures.compare_fixtures import PgCompare, VanillaCompare, ZenithCompare
-from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
+from fixtures.benchmark_fixture import PgBenchRunResult, MetricReport, ZenithBenchmarker
 from fixtures.log_helper import log
+from pathlib import Path
+
+import pytest
+from datetime import datetime
+import calendar
+import os
+import timeit
+
+
+def utc_now_timestamp() -> int:
+    return calendar.timegm(datetime.utcnow().utctimetuple())
+
+
+def init_pgbench(env: PgCompare, cmdline):
+    # calculate timestamps and durations separately
+    # timestamp is intended to be used for linking to grafana and logs
+    # duration is actually a metric and uses float instead of int for timestamp
+    init_start_timestamp = utc_now_timestamp()
+    t0 = timeit.default_timer()
+    with env.record_pageserver_writes('init.pageserver_writes'):
+        env.pg_bin.run_capture(cmdline)
+        env.flush()
+    init_duration = timeit.default_timer() - t0
+    init_end_timestamp = utc_now_timestamp()
+
+    env.zenbenchmark.record("init.duration",
+                            init_duration,
+                            unit="s",
+                            report=MetricReport.LOWER_IS_BETTER)
+    env.zenbenchmark.record("init.start_timestamp",
+                            init_start_timestamp,
+                            '',
+                            MetricReport.TEST_PARAM)
+    env.zenbenchmark.record("init.end_timestamp", init_end_timestamp, '', MetricReport.TEST_PARAM)
+
+
+def run_pgbench(env: PgCompare, prefix: str, cmdline):
+    with env.record_pageserver_writes(f'{prefix}.pageserver_writes'):
+        run_start_timestamp = utc_now_timestamp()
+        t0 = timeit.default_timer()
+        out = env.pg_bin.run_capture(cmdline)
+        run_duration = timeit.default_timer() - t0
+        run_end_timestamp = utc_now_timestamp()
+        env.flush()
+
+    stdout = Path(f"{out}.stdout").read_text()
+
+    res = PgBenchRunResult.parse_from_stdout(
+        stdout=stdout,
+        run_duration=run_duration,
+        run_start_timestamp=run_start_timestamp,
+        run_end_timestamp=run_end_timestamp,
+    )
+    env.zenbenchmark.record_pg_bench_result(prefix, res)
+
 
 #
-# Run a very short pgbench test.
+# Initialize a pgbench database, and run pgbench against it.
 #
-# Collects three metrics:
+# This runs two different pgbench workloads against the same
+# initialized database, and 'duration' is the time of each run. So
+# the total runtime is 2 * duration, plus time needed to initialize
+# the test database.
 #
-# 1. Time to initialize the pgbench database (pgbench -s5 -i)
-# 2. Time to run 5000 pgbench transactions
-# 3. Disk space used
-#
-def test_pgbench(zenith_with_baseline: PgCompare):
-    env = zenith_with_baseline
+# Currently, the # of connections is hardcoded at 4
+def run_test_pgbench(env: PgCompare, scale: int, duration: int):
 
-    with env.record_pageserver_writes('pageserver_writes'):
-        with env.record_duration('init'):
-            env.pg_bin.run_capture(['pgbench', '-s5', '-i', env.pg.connstr()])
-        env.flush()
+    # Record the scale and initialize
+    env.zenbenchmark.record("scale", scale, '', MetricReport.TEST_PARAM)
+    init_pgbench(env, ['pgbench', f'-s{scale}', '-i', env.pg.connstr()])
 
-        with env.record_duration('5000_xacts'):
-            env.pg_bin.run_capture(['pgbench', '-c1', '-t5000', env.pg.connstr()])
-        env.flush()
+    # Run simple-update workload
+    run_pgbench(env,
+                "simple-update",
+                ['pgbench', '-N', '-c4', f'-T{duration}', '-P2', '-Mprepared', env.pg.connstr()])
+
+    # Run SELECT workload
+    run_pgbench(env,
+                "select-only",
+                ['pgbench', '-S', '-c4', f'-T{duration}', '-P2', '-Mprepared', env.pg.connstr()])
 
     env.report_size()
+
+
+def get_durations_matrix():
+    durations = os.getenv("TEST_PG_BENCH_DURATIONS_MATRIX", default="45")
+    return list(map(int, durations.split(",")))
+
+
+def get_scales_matrix():
+    scales = os.getenv("TEST_PG_BENCH_SCALES_MATRIX", default="10")
+    return list(map(int, scales.split(",")))
+
+
+# Run the pgbench tests against vanilla Postgres and zenith
+@pytest.mark.parametrize("scale", get_scales_matrix())
+@pytest.mark.parametrize("duration", get_durations_matrix())
+def test_pgbench(zenith_with_baseline: PgCompare, scale: int, duration: int):
+    run_test_pgbench(zenith_with_baseline, scale, duration)
+
+
+# Run the pgbench tests against an existing Postgres cluster
+@pytest.mark.parametrize("scale", get_scales_matrix())
+@pytest.mark.parametrize("duration", get_durations_matrix())
+@pytest.mark.remote_cluster
+def test_pgbench_remote(remote_compare: PgCompare, scale: int, duration: int):
+    run_test_pgbench(remote_compare, scale, duration)
diff --git a/test_runner/performance/test_perf_pgbench_remote.py b/test_runner/performance/test_perf_pgbench_remote.py
deleted file mode 100644
index 28472a16c8..0000000000
--- a/test_runner/performance/test_perf_pgbench_remote.py
+++ /dev/null
@@ -1,124 +0,0 @@
-import dataclasses
-import os
-import subprocess
-from typing import List
-from fixtures.benchmark_fixture import PgBenchRunResult, ZenithBenchmarker
-import pytest
-from datetime import datetime
-import calendar
-import timeit
-import os
-
-
-def utc_now_timestamp() -> int:
-    return calendar.timegm(datetime.utcnow().utctimetuple())
-
-
-@dataclasses.dataclass
-class PgBenchRunner:
-    connstr: str
-    scale: int
-    transactions: int
-    pgbench_bin_path: str = "pgbench"
-
-    def invoke(self, args: List[str]) -> 'subprocess.CompletedProcess[str]':
-        res = subprocess.run([self.pgbench_bin_path, *args], text=True, capture_output=True)
-
-        if res.returncode != 0:
-            raise RuntimeError(f"pgbench failed. stdout: {res.stdout} stderr: {res.stderr}")
-        return res
-
-    def init(self, vacuum: bool = True) -> 'subprocess.CompletedProcess[str]':
-        args = []
-        if not vacuum:
-            args.append("--no-vacuum")
-        args.extend([f"--scale={self.scale}", "--initialize", self.connstr])
-        return self.invoke(args)
-
-    def run(self, jobs: int = 1, clients: int = 1):
-        return self.invoke([
-            f"--transactions={self.transactions}",
-            f"--jobs={jobs}",
-            f"--client={clients}",
-            "--progress=2",  # print progress every two seconds
-            self.connstr,
-        ])
-
-
-@pytest.fixture
-def connstr():
-    res = os.getenv("BENCHMARK_CONNSTR")
-    if res is None:
-        raise ValueError("no connstr provided, use BENCHMARK_CONNSTR environment variable")
-    return res
-
-
-def get_transactions_matrix():
-    transactions = os.getenv("TEST_PG_BENCH_TRANSACTIONS_MATRIX")
-    if transactions is None:
-        return [10**4, 10**5]
-    return list(map(int, transactions.split(",")))
-
-
-def get_scales_matrix():
-    scales = os.getenv("TEST_PG_BENCH_SCALES_MATRIX")
-    if scales is None:
-        return [10, 20]
-    return list(map(int, scales.split(",")))
-
-
-@pytest.mark.parametrize("scale", get_scales_matrix())
-@pytest.mark.parametrize("transactions", get_transactions_matrix())
-@pytest.mark.remote_cluster
-def test_pg_bench_remote_cluster(zenbenchmark: ZenithBenchmarker,
-                                 connstr: str,
-                                 scale: int,
-                                 transactions: int):
-    """
-    The best way is to run same pack of tests both, for local zenith
-    and against staging, but currently local tests heavily depend on
-    things available only locally e.g. zenith binaries, pageserver api, etc.
-    Also separate test allows to run pgbench workload against vanilla postgres
-    or other systems that support postgres protocol.
-
-    Also now this is more of a liveness test because it stresses pageserver internals,
-    so we clearly see what goes wrong in more "real" environment.
-    """
-    pg_bin = os.getenv("PG_BIN")
-    if pg_bin is not None:
-        pgbench_bin_path = os.path.join(pg_bin, "pgbench")
-    else:
-        pgbench_bin_path = "pgbench"
-
-    runner = PgBenchRunner(
-        connstr=connstr,
-        scale=scale,
-        transactions=transactions,
-        pgbench_bin_path=pgbench_bin_path,
-    )
-    # calculate timestamps and durations separately
-    # timestamp is intended to be used for linking to grafana and logs
-    # duration is actually a metric and uses float instead of int for timestamp
-    init_start_timestamp = utc_now_timestamp()
-    t0 = timeit.default_timer()
-    runner.init()
-    init_duration = timeit.default_timer() - t0
-    init_end_timestamp = utc_now_timestamp()
-
-    run_start_timestamp = utc_now_timestamp()
-    t0 = timeit.default_timer()
-    out = runner.run()  # TODO handle failures
-    run_duration = timeit.default_timer() - t0
-    run_end_timestamp = utc_now_timestamp()
-
-    res = PgBenchRunResult.parse_from_output(
-        out=out,
-        init_duration=init_duration,
-        init_start_timestamp=init_start_timestamp,
-        init_end_timestamp=init_end_timestamp,
-        run_duration=run_duration,
-        run_start_timestamp=run_start_timestamp,
-        run_end_timestamp=run_end_timestamp,
-    )
-
-    zenbenchmark.record_pg_bench_result(res)
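
Usage illustration (a sketch, not part of the patch): assuming the test_runner fixtures package is importable, the reworked PgBenchRunResult.parse_from_stdout can be fed pgbench 14 style output directly; the sample numbers below are invented.

    from fixtures.benchmark_fixture import PgBenchRunResult

    # Invented pgbench 14 style output; only the lines the parser looks for matter.
    SAMPLE_STDOUT = """\
    pgbench (14.2)
    transaction type: <builtin: TPC-B (sort of)>
    scaling factor: 10
    query mode: prepared
    number of clients: 4
    number of threads: 1
    duration: 300 s
    number of transactions actually processed: 92734
    latency average = 12.940 ms
    initial connection time = 22.998 ms
    tps = 309.281539 (without initial connection time)
    """

    res = PgBenchRunResult.parse_from_stdout(
        stdout=SAMPLE_STDOUT,
        run_duration=300.0,
        run_start_timestamp=1649934000,
        run_end_timestamp=1649934300,
    )
    assert res.number_of_clients == 4
    assert res.tps == 309.281539       # the "(without initial connection time)" format is accepted
    assert res.latency_stddev is None  # pgbench only prints latency stddev with certain options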