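"""
Performance tests that initialize a pgbench database and run pgbench workloads
against Neon, vanilla Postgres, and remote clusters, recording the results
through the benchmark fixtures.
"""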
import calendar
import enum
import os
import timeit
from datetime import datetime
from pathlib import Path
from typing import List

import pytest
from fixtures.benchmark_fixture import MetricReport, PgBenchInitResult, PgBenchRunResult
from fixtures.compare_fixtures import NeonCompare, PgCompare
from fixtures.neon_fixtures import profiling_supported
from fixtures.utils import get_scale_for_db

@enum.unique
class PgBenchLoadType(enum.Enum):
    INIT = "init"
    SIMPLE_UPDATE = "simple_update"
    SELECT_ONLY = "select-only"

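# Timestamps are recorded as integer seconds since the Unix epoch (UTC) so that they
# can be cross-referenced with Grafana dashboards and logs.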
def utc_now_timestamp() -> int:
    return calendar.timegm(datetime.utcnow().utctimetuple())

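# The helpers below invoke pgbench through env.pg_bin.run_capture(), which returns a
# path prefix for the captured output; the child's stdout/stderr are read back from
# "<prefix>.stdout" and "<prefix>.stderr".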
def init_pgbench(env: PgCompare, cmdline):
    # Calculate the timestamps and the duration separately:
    # the timestamps are intended for linking to Grafana and logs,
    # while the duration is the actual metric, measured as a float rather
    # than as an integer timestamp.
    start_timestamp = utc_now_timestamp()
    t0 = timeit.default_timer()
    with env.record_pageserver_writes("init.pageserver_writes"):
        out = env.pg_bin.run_capture(cmdline)
        env.flush()

    duration = timeit.default_timer() - t0
    end_timestamp = utc_now_timestamp()

    stderr = Path(f"{out}.stderr").read_text()

    res = PgBenchInitResult.parse_from_stderr(
        stderr=stderr,
        duration=duration,
        start_timestamp=start_timestamp,
        end_timestamp=end_timestamp,
    )
    env.zenbenchmark.record_pg_bench_init_result("init", res)

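# Note that pgbench prints its initialization timings to stderr but its run statistics
# to stdout, which is why init_pgbench above parses "<prefix>.stderr" while run_pgbench
# below parses "<prefix>.stdout".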
def run_pgbench(env: PgCompare, prefix: str, cmdline):
    with env.record_pageserver_writes(f"{prefix}.pageserver_writes"):
        run_start_timestamp = utc_now_timestamp()
        t0 = timeit.default_timer()
        out = env.pg_bin.run_capture(
            cmdline,
        )
        run_duration = timeit.default_timer() - t0
        run_end_timestamp = utc_now_timestamp()
        env.flush()

    stdout = Path(f"{out}.stdout").read_text()

    res = PgBenchRunResult.parse_from_stdout(
        stdout=stdout,
        run_duration=run_duration,
        run_start_timestamp=run_start_timestamp,
        run_end_timestamp=run_end_timestamp,
    )
    env.zenbenchmark.record_pg_bench_result(prefix, res)

#
# Initialize a pgbench database, and run pgbench against it.
#
# This runs two different pgbench workloads against the same initialized
# database, and 'duration' is the time of each run. So the total runtime is
# 2 * duration, plus the time needed to initialize the test database.
#
# Currently, the number of connections is hardcoded at 4.
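#
# For reference, the pgbench options used below:
#   -i             initialize the test database (-s sets the scale factor)
#   -N             run the built-in "simple update" script
#   -S             run the built-in "select only" script
#   -c4            use 4 client connections
#   -T<duration>   run for the given number of seconds
#   -P2            print a progress line every 2 seconds
#   --progress-timestamp  timestamp progress lines with the Unix epoch time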
def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: PgBenchLoadType):
    env.zenbenchmark.record("scale", scale, "", MetricReport.TEST_PARAM)

    if workload_type == PgBenchLoadType.INIT:
        # Run the initialization step
        options = "-cstatement_timeout=1h " + env.pg.default_options.get("options", "")
        init_pgbench(env, ["pgbench", f"-s{scale}", "-i", env.pg.connstr(options=options)])

    if workload_type == PgBenchLoadType.SIMPLE_UPDATE:
        # Run the simple-update workload
        run_pgbench(
            env,
            "simple-update",
            [
                "pgbench",
                "-N",
                "-c4",
                f"-T{duration}",
                "-P2",
                "--progress-timestamp",
                env.pg.connstr(),
            ],
        )

    if workload_type == PgBenchLoadType.SELECT_ONLY:
        # Run the select-only workload
        run_pgbench(
            env,
            "select-only",
            [
                "pgbench",
                "-S",
                "-c4",
                f"-T{duration}",
                "-P2",
                "--progress-timestamp",
                env.pg.connstr(),
            ],
        )

    env.report_size()

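# Run durations come from the TEST_PG_BENCH_DURATIONS_MATRIX environment variable,
# a comma-separated list of durations with an optional unit suffix. For example,
# "45,10m,1h" yields [45, 600, 3600] (seconds).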
def get_durations_matrix(default: int = 45) -> List[int]:
    durations = os.getenv("TEST_PG_BENCH_DURATIONS_MATRIX", default=str(default))
    rv = []
    for d in durations.split(","):
        d = d.strip().lower()
        if d.endswith("h"):
            duration = int(d.removesuffix("h")) * 60 * 60
        elif d.endswith("m"):
            duration = int(d.removesuffix("m")) * 60
        else:
            duration = int(d.removesuffix("s"))
        rv.append(duration)

    return rv

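# Scales come from the TEST_PG_BENCH_SCALES_MATRIX environment variable, a
# comma-separated list of either plain pgbench scale factors or database sizes with
# an "mb"/"gb" suffix; sizes are converted to an approximate scale factor with
# get_scale_for_db(). For example, "10,1gb" yields [10, get_scale_for_db(1024)].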
def get_scales_matrix(default: int = 10) -> List[int]:
    scales = os.getenv("TEST_PG_BENCH_SCALES_MATRIX", default=str(default))
    rv = []
    for s in scales.split(","):
        s = s.strip().lower()
        if s.endswith("mb"):
            scale = get_scale_for_db(int(s.removesuffix("mb")))
        elif s.endswith("gb"):
            scale = get_scale_for_db(int(s.removesuffix("gb")) * 1024)
        else:
            scale = int(s)
        rv.append(scale)

    return rv

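# Note: the parametrize matrices below are built when pytest collects this module, so
# TEST_PG_BENCH_SCALES_MATRIX and TEST_PG_BENCH_DURATIONS_MATRIX must already be set
# when collection starts.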
# Run the pgbench tests against vanilla Postgres and Neon
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())
def test_pgbench(neon_with_baseline: PgCompare, scale: int, duration: int):
    run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
    run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SIMPLE_UPDATE)
    run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)

# Run the pgbench tests, and generate a flamegraph from the run.
# This requires that the pageserver was built with the 'profiling' feature.
#
# TODO: If the profiling is cheap enough, there's no need to run the same test
# twice, with and without profiling. But for now, run it separately, so that we
# can see how much overhead the profiling adds.
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())
def test_pgbench_flamegraph(zenbenchmark, pg_bin, neon_env_builder, scale: int, duration: int):
    neon_env_builder.pageserver_config_override = """
profiling="page_requests"
"""
    if not profiling_supported():
        pytest.skip("pageserver was built without 'profiling' feature")

    env = neon_env_builder.init_start()
    env.neon_cli.create_branch("empty", "main")

    neon_compare = NeonCompare(zenbenchmark, env, pg_bin, "pgbench")
    run_test_pgbench(neon_compare, scale, duration, PgBenchLoadType.INIT)
    run_test_pgbench(neon_compare, scale, duration, PgBenchLoadType.SIMPLE_UPDATE)
    run_test_pgbench(neon_compare, scale, duration, PgBenchLoadType.SELECT_ONLY)

# The following 3 tests run against an existing database as it was set up by previous
# tests, and leave the database in a state that will be used by the next tests.
# Modifying the definition order of these functions, or adding other remote tests in
# between, will alter the results.
# See the usage of the --sparse-ordering flag in the pytest invocation in the CI workflow.
#
# Run the pgbench tests against an existing Postgres cluster
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())
@pytest.mark.remote_cluster
def test_pgbench_remote_init(remote_compare: PgCompare, scale: int, duration: int):
    run_test_pgbench(remote_compare, scale, duration, PgBenchLoadType.INIT)

@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())
@pytest.mark.remote_cluster
def test_pgbench_remote_simple_update(remote_compare: PgCompare, scale: int, duration: int):
    run_test_pgbench(remote_compare, scale, duration, PgBenchLoadType.SIMPLE_UPDATE)

@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())
@pytest.mark.remote_cluster
def test_pgbench_remote_select_only(remote_compare: PgCompare, scale: int, duration: int):
    run_test_pgbench(remote_compare, scale, duration, PgBenchLoadType.SELECT_ONLY)