# neon/test_runner/performance/test_perf_pgbench.py
#
# Last commit: e4f775436f (Heikki Linnakangas, 2022-09-20 09:46:15 +03:00)
# "Don't override other options than statement_timeout in test conn string."
#
# In commit 6985f6cd6c, extra GUCs were passed in the 'options' part of the
# connection string, but that didn't work because the pgbench test overrode
# them with statement_timeout. Now the statement_timeout is added to any
# other options instead of replacing them.

import calendar
import enum
import os
import timeit
from datetime import datetime
from pathlib import Path
from typing import List

import pytest
from fixtures.benchmark_fixture import MetricReport, PgBenchInitResult, PgBenchRunResult
from fixtures.compare_fixtures import NeonCompare, PgCompare
from fixtures.neon_fixtures import profiling_supported
from fixtures.utils import get_scale_for_db


@enum.unique
class PgBenchLoadType(enum.Enum):
INIT = "init"
SIMPLE_UPDATE = "simple_update"
    SELECT_ONLY = "select-only"


def utc_now_timestamp() -> int:
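    # calendar.timegm interprets the struct_time as UTC, so this is a
    # timezone-independent Unix timestamp in whole seconds.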
    return calendar.timegm(datetime.utcnow().utctimetuple())


def init_pgbench(env: PgCompare, cmdline):
    # Calculate timestamps and durations separately.
    # The timestamps are intended for linking to Grafana and logs;
    # the duration is the actual metric, measured with a float timer
    # rather than integer timestamps.
start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
with env.record_pageserver_writes("init.pageserver_writes"):
out = env.pg_bin.run_capture(cmdline)
env.flush()
duration = timeit.default_timer() - t0
end_timestamp = utc_now_timestamp()
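    # pgbench -i writes its initialization summary to stderr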
stderr = Path(f"{out}.stderr").read_text()
res = PgBenchInitResult.parse_from_stderr(
stderr=stderr,
duration=duration,
start_timestamp=start_timestamp,
end_timestamp=end_timestamp,
)
    env.zenbenchmark.record_pg_bench_init_result("init", res)


def run_pgbench(env: PgCompare, prefix: str, cmdline):
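    # As in init_pgbench: the wall-clock timestamps are for linking to Grafana
    # and logs, while the timer-based duration is the metric itself.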
with env.record_pageserver_writes(f"{prefix}.pageserver_writes"):
run_start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
        out = env.pg_bin.run_capture(cmdline)
run_duration = timeit.default_timer() - t0
run_end_timestamp = utc_now_timestamp()
env.flush()
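    # pgbench writes its run summary (tps, latencies) to stdout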
stdout = Path(f"{out}.stdout").read_text()
res = PgBenchRunResult.parse_from_stdout(
stdout=stdout,
run_duration=run_duration,
run_start_timestamp=run_start_timestamp,
run_end_timestamp=run_end_timestamp,
)
    env.zenbenchmark.record_pg_bench_result(prefix, res)


#
# Initialize a pgbench database, or run one pgbench workload against it,
# depending on workload_type.
#
# The full test runs two different pgbench workloads against the same
# initialized database, and 'duration' is the time of each run. So the
# total runtime is 2 * duration, plus the time needed to initialize the
# test database.
#
# Currently, the number of connections is hardcoded at 4.
def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: PgBenchLoadType):
env.zenbenchmark.record("scale", scale, "", MetricReport.TEST_PARAM)
    if workload_type == PgBenchLoadType.INIT:
        # Initialize the database. Prepend statement_timeout to the
        # endpoint's default options rather than overriding them.
        options = "-cstatement_timeout=1h " + env.pg.default_options["options"]
        init_pgbench(env, ["pgbench", f"-s{scale}", "-i", env.pg.connstr(options=options)])

if workload_type == PgBenchLoadType.SIMPLE_UPDATE:
# Run simple-update workload
run_pgbench(
env,
"simple-update",
[
"pgbench",
"-N",
"-c4",
f"-T{duration}",
"-P2",
"--progress-timestamp",
env.pg.connstr(),
],
        )

if workload_type == PgBenchLoadType.SELECT_ONLY:
# Run SELECT workload
run_pgbench(
env,
"select-only",
[
"pgbench",
"-S",
"-c4",
f"-T{duration}",
"-P2",
"--progress-timestamp",
env.pg.connstr(),
],
        )

    env.report_size()


def get_durations_matrix(default: int = 45) -> List[int]:
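    """Parse TEST_PG_BENCH_DURATIONS_MATRIX into a list of durations in seconds.

    The value is a comma-separated list with optional "h"/"m"/"s" suffixes,
    e.g. "45,10m,1h" -> [45, 600, 3600]. Without the env var, returns [default].
    """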
durations = os.getenv("TEST_PG_BENCH_DURATIONS_MATRIX", default=str(default))
rv = []
for d in durations.split(","):
d = d.strip().lower()
if d.endswith("h"):
duration = int(d.removesuffix("h")) * 60 * 60
elif d.endswith("m"):
duration = int(d.removesuffix("m")) * 60
else:
duration = int(d.removesuffix("s"))
rv.append(duration)
    return rv


def get_scales_matrix(default: int = 10) -> List[int]:
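    """Parse TEST_PG_BENCH_SCALES_MATRIX into a list of pgbench scale factors.

    Plain numbers are used as-is; values with an "mb"/"gb" suffix are converted
    to a scale factor for that database size via get_scale_for_db, e.g.
    "10,100mb,1gb". Without the env var, returns [default].
    """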
scales = os.getenv("TEST_PG_BENCH_SCALES_MATRIX", default=str(default))
rv = []
for s in scales.split(","):
s = s.strip().lower()
if s.endswith("mb"):
scale = get_scale_for_db(int(s.removesuffix("mb")))
elif s.endswith("gb"):
scale = get_scale_for_db(int(s.removesuffix("gb")) * 1024)
else:
scale = int(s)
rv.append(scale)
return rv
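

# Both matrices can be combined. For example (a hypothetical invocation; the
# exact pytest entry point depends on how this test suite is launched):
#
#   TEST_PG_BENCH_SCALES_MATRIX="10,1gb" \
#   TEST_PG_BENCH_DURATIONS_MATRIX="45,10m" \
#   pytest test_runner/performance/test_perf_pgbench.py
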
# Run the pgbench tests against vanilla Postgres and neon
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())
def test_pgbench(neon_with_baseline: PgCompare, scale: int, duration: int):
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SIMPLE_UPDATE)
    run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)


# Run the pgbench tests, and generate a flamegraph from the run.
# This requires that the pageserver was built with the 'profiling' feature.
#
# TODO: If the profiling is cheap enough, there's no need to run the same test
# twice, with and without profiling. But for now, run it separately, so that we
# can see how much overhead the profiling adds.
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())
def test_pgbench_flamegraph(zenbenchmark, pg_bin, neon_env_builder, scale: int, duration: int):
neon_env_builder.pageserver_config_override = """
profiling="page_requests"
"""
if not profiling_supported():
pytest.skip("pageserver was built without 'profiling' feature")
env = neon_env_builder.init_start()
env.neon_cli.create_branch("empty", "main")
neon_compare = NeonCompare(zenbenchmark, env, pg_bin, "pgbench")
run_test_pgbench(neon_compare, scale, duration, PgBenchLoadType.INIT)
run_test_pgbench(neon_compare, scale, duration, PgBenchLoadType.SIMPLE_UPDATE)
    run_test_pgbench(neon_compare, scale, duration, PgBenchLoadType.SELECT_ONLY)


# The following 3 tests run against an existing database as it was set up by
# previous tests, and leave the database in the state that the next tests
# expect. Modifying the definition order of these functions, or adding other
# remote tests in between, will alter the results.
# See the usage of the --sparse-ordering flag in the pytest invocation in the
# CI workflow.
#
# Run the pgbench tests against an existing Postgres cluster
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())
@pytest.mark.remote_cluster
def test_pgbench_remote_init(remote_compare: PgCompare, scale: int, duration: int):
    run_test_pgbench(remote_compare, scale, duration, PgBenchLoadType.INIT)


@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())
@pytest.mark.remote_cluster
def test_pgbench_remote_simple_update(remote_compare: PgCompare, scale: int, duration: int):
    run_test_pgbench(remote_compare, scale, duration, PgBenchLoadType.SIMPLE_UPDATE)


@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())
@pytest.mark.remote_cluster
def test_pgbench_remote_select_only(remote_compare: PgCompare, scale: int, duration: int):
run_test_pgbench(remote_compare, scale, duration, PgBenchLoadType.SELECT_ONLY)