Set up a workflow to run pgbench against captest (#2077)
parent 7b12deead7
commit 4cddb0f1a4
@@ -1,23 +1,21 @@
import calendar
import dataclasses
import enum
import json
import os
from pathlib import Path
import re
import subprocess
import timeit
import calendar
import enum
from datetime import datetime
import uuid
import warnings
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
# Type-related stuff
from typing import Iterator, Optional

import pytest
from _pytest.config import Config
from _pytest.terminal import TerminalReporter
import warnings

from contextlib import contextmanager

# Type-related stuff
from typing import Iterator, Optional
"""
This file contains fixtures for micro-benchmarks.

@@ -77,7 +75,7 @@ class PgBenchRunResult:

        # we know significant parts of these values from test input
        # but to be precise take them from output
        for line in stdout.splitlines():
        for line in stdout_lines:
            # scaling factor: 5
            if line.startswith("scaling factor:"):
                scale = int(line.split()[-1])
@@ -131,6 +129,58 @@ class PgBenchRunResult:
        )


@dataclasses.dataclass
class PgBenchInitResult:
    total: float
    drop_tables: Optional[float]
    create_tables: Optional[float]
    client_side_generate: Optional[float]
    vacuum: Optional[float]
    primary_keys: Optional[float]
    duration: float
    start_timestamp: int
    end_timestamp: int

    @classmethod
    def parse_from_stderr(
        cls,
        stderr: str,
        duration: float,
        start_timestamp: int,
        end_timestamp: int,
    ):
        # Parses pgbench initialize output for default initialization steps (dtgvp)
        # Example: done in 5.66 s (drop tables 0.05 s, create tables 0.31 s, client-side generate 2.01 s, vacuum 0.53 s, primary keys 0.38 s).

        last_line = stderr.splitlines()[-1]

        regex = re.compile(r"done in (\d+\.\d+) s "
                           r"\("
                           r"(?:drop tables (\d+\.\d+) s)?(?:, )?"
                           r"(?:create tables (\d+\.\d+) s)?(?:, )?"
                           r"(?:client-side generate (\d+\.\d+) s)?(?:, )?"
                           r"(?:vacuum (\d+\.\d+) s)?(?:, )?"
                           r"(?:primary keys (\d+\.\d+) s)?(?:, )?"
                           r"\)\.")

        if (m := regex.match(last_line)) is not None:
            total, drop_tables, create_tables, client_side_generate, vacuum, primary_keys = [float(v) for v in m.groups() if v is not None]
        else:
            raise RuntimeError(f"can't parse pgbench initialize results from `{last_line}`")

        return cls(
            total=total,
            drop_tables=drop_tables,
            create_tables=create_tables,
            client_side_generate=client_side_generate,
            vacuum=vacuum,
            primary_keys=primary_keys,
            duration=duration,
            start_timestamp=start_timestamp,
            end_timestamp=end_timestamp,
        )


@enum.unique
class MetricReport(str, enum.Enum):  # str is a hack to make it json serializable
    # this means that this is a constant test parameter
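As a quick illustration of what the new parser accepts, here is a minimal sketch that feeds it the example line quoted in the comment above; the duration and timestamps are made-up values that the caller would normally measure itself:

```python
from fixtures.benchmark_fixture import PgBenchInitResult

# Last stderr line printed by `pgbench -i` with the default dtgvp init steps,
# copied from the example in the comment above.
stderr = ("done in 5.66 s (drop tables 0.05 s, create tables 0.31 s, "
          "client-side generate 2.01 s, vacuum 0.53 s, primary keys 0.38 s).\n")

result = PgBenchInitResult.parse_from_stderr(
    stderr=stderr,
    duration=5.7,                # wall-clock duration measured by the caller
    start_timestamp=1658000000,  # hypothetical epoch seconds
    end_timestamp=1658000006,
)
assert result.total == 5.66 and result.vacuum == 0.53
```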
@@ -232,6 +282,32 @@ class NeonBenchmarker:
                    '',
                    MetricReport.TEST_PARAM)

    def record_pg_bench_init_result(self, prefix: str, result: PgBenchInitResult):
        test_params = [
            "start_timestamp",
            "end_timestamp",
        ]
        for test_param in test_params:
            self.record(f"{prefix}.{test_param}",
                        getattr(result, test_param),
                        '',
                        MetricReport.TEST_PARAM)

        metrics = [
            "duration",
            "drop_tables",
            "create_tables",
            "client_side_generate",
            "vacuum",
            "primary_keys",
        ]
        for metric in metrics:
            if (value := getattr(result, metric)) is not None:
                self.record(f"{prefix}.{metric}",
                            value,
                            unit="s",
                            report=MetricReport.LOWER_IS_BETTER)

    def get_io_writes(self, pageserver) -> int:
        """
        Fetch the "cumulative # of bytes written" metric from the pageserver

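For orientation, a small sketch of the metric names this method emits when the commit later calls `record_pg_bench_init_result("init", res)`; the name construction below simply mirrors the lists above, while the actual values come from `PgBenchInitResult`:

```python
prefix = "init"
test_params = ["start_timestamp", "end_timestamp"]
metrics = ["duration", "drop_tables", "create_tables",
           "client_side_generate", "vacuum", "primary_keys"]

# Test parameters are recorded as-is; step timings are recorded in seconds with
# MetricReport.LOWER_IS_BETTER, and steps that did not run (None) are skipped.
print([f"{prefix}.{name}" for name in test_params + metrics])
# ['init.start_timestamp', 'init.end_timestamp', 'init.duration', 'init.drop_tables', ...]
```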
@@ -146,3 +146,12 @@ def parse_delta_layer(f_name: str) -> Tuple[int, int, int, int]:
    key_parts = parts[0].split("-")
    lsn_parts = parts[1].split("-")
    return int(key_parts[0], 16), int(key_parts[1], 16), int(lsn_parts[0], 16), int(lsn_parts[1], 16)


def get_scale_for_db(size_mb: int) -> int:
    """Returns pgbench scale factor for given target db size in MB.

    Ref https://www.cybertec-postgresql.com/en/a-formula-to-calculate-pgbench-scaling-factor-for-target-db-size/
    """

    return round(0.06689 * size_mb - 0.5)

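As a quick sanity check of the formula, a few values computed directly from the expression above (plain arithmetic, not additional data from the linked article):

```python
from fixtures.utils import get_scale_for_db

# round(0.06689 * size_mb - 0.5)
assert get_scale_for_db(100) == 6          # ~100 MB target
assert get_scale_for_db(1024) == 68        # ~1 GB target
assert get_scale_for_db(10 * 1024) == 684  # ~10 GB target
```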
@@ -10,7 +10,7 @@ In the CI, the performance tests are run in the same environment as the other in

## Remote tests

There are a few tests that are marked with `pytest.mark.remote_cluster`. These tests do not set up a local environment, and instead require a libpq connection string to connect to. So they can be run on any Postgres-compatible database. Currently, the CI runs these tests on our staging environment daily. Staging is not an isolated environment, so there can be noise in the results due to activity of other clusters.
There are a few tests that are marked with `pytest.mark.remote_cluster`. These tests do not set up a local environment, and instead require a libpq connection string to connect to. So they can be run on any Postgres-compatible database. Currently, the CI runs these tests on our staging and captest environments daily. These are not isolated environments, so there can be noise in the results due to activity of other clusters.

## Noise

@@ -1,17 +1,23 @@
from contextlib import closing
from fixtures.neon_fixtures import PgBin, VanillaPostgres, NeonEnv, profiling_supported
from fixtures.compare_fixtures import PgCompare, VanillaCompare, NeonCompare

from fixtures.benchmark_fixture import PgBenchRunResult, MetricReport, NeonBenchmarker
from fixtures.log_helper import log

from pathlib import Path

import pytest
from datetime import datetime
import calendar
import enum
import os
import timeit
from datetime import datetime
from pathlib import Path
from typing import List

import pytest
from fixtures.benchmark_fixture import MetricReport, PgBenchInitResult, PgBenchRunResult
from fixtures.compare_fixtures import NeonCompare, PgCompare
from fixtures.neon_fixtures import profiling_supported
from fixtures.utils import get_scale_for_db


@enum.unique
class PgBenchLoadType(enum.Enum):
    INIT = "init"
    SIMPLE_UPDATE = "simple_update"
    SELECT_ONLY = "select-only"


def utc_now_timestamp() -> int:
@@ -22,23 +28,24 @@ def init_pgbench(env: PgCompare, cmdline):
    # calculate timestamps and durations separately
    # timestamp is intended to be used for linking to grafana and logs
    # duration is actually a metric and uses float instead of int for timestamp
    init_start_timestamp = utc_now_timestamp()
    start_timestamp = utc_now_timestamp()
    t0 = timeit.default_timer()
    with env.record_pageserver_writes('init.pageserver_writes'):
        env.pg_bin.run_capture(cmdline)
        out = env.pg_bin.run_capture(cmdline)
        env.flush()
    init_duration = timeit.default_timer() - t0
    init_end_timestamp = utc_now_timestamp()

    env.zenbenchmark.record("init.duration",
                            init_duration,
                            unit="s",
                            report=MetricReport.LOWER_IS_BETTER)
    env.zenbenchmark.record("init.start_timestamp",
                            init_start_timestamp,
                            '',
                            MetricReport.TEST_PARAM)
    env.zenbenchmark.record("init.end_timestamp", init_end_timestamp, '', MetricReport.TEST_PARAM)
    duration = timeit.default_timer() - t0
    end_timestamp = utc_now_timestamp()

    stderr = Path(f"{out}.stderr").read_text()

    res = PgBenchInitResult.parse_from_stderr(
        stderr=stderr,
        duration=duration,
        start_timestamp=start_timestamp,
        end_timestamp=end_timestamp,
    )
    env.zenbenchmark.record_pg_bench_init_result("init", res)


def run_pgbench(env: PgCompare, prefix: str, cmdline):
@@ -70,38 +77,84 @@ def run_pgbench(env: PgCompare, prefix: str, cmdline):
# the test database.
#
# Currently, the # of connections is hardcoded at 4
def run_test_pgbench(env: PgCompare, scale: int, duration: int):

    # Record the scale and initialize
def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: PgBenchLoadType):
    env.zenbenchmark.record("scale", scale, '', MetricReport.TEST_PARAM)
    init_pgbench(env, ['pgbench', f'-s{scale}', '-i', env.pg.connstr()])

    # Run simple-update workload
    run_pgbench(env,
                "simple-update", ['pgbench', '-N', '-c4', f'-T{duration}', '-P2', env.pg.connstr()])
    if workload_type == PgBenchLoadType.INIT:
        # Run initialize
        init_pgbench(
            env, ['pgbench', f'-s{scale}', '-i', env.pg.connstr(options='-cstatement_timeout=1h')])

    # Run SELECT workload
    run_pgbench(env,
                "select-only", ['pgbench', '-S', '-c4', f'-T{duration}', '-P2', env.pg.connstr()])
    if workload_type == PgBenchLoadType.SIMPLE_UPDATE:
        # Run simple-update workload
        run_pgbench(env,
                    "simple-update",
                    [
                        'pgbench',
                        '-N',
                        '-c4',
                        f'-T{duration}',
                        '-P2',
                        '--progress-timestamp',
                        env.pg.connstr(),
                    ])

    if workload_type == PgBenchLoadType.SELECT_ONLY:
        # Run SELECT workload
        run_pgbench(env,
                    "select-only",
                    [
                        'pgbench',
                        '-S',
                        '-c4',
                        f'-T{duration}',
                        '-P2',
                        '--progress-timestamp',
                        env.pg.connstr(),
                    ])

    env.report_size()

def get_durations_matrix(default: int = 45):
def get_durations_matrix(default: int = 45) -> List[int]:
    durations = os.getenv("TEST_PG_BENCH_DURATIONS_MATRIX", default=str(default))
    return list(map(int, durations.split(",")))
    rv = []
    for d in durations.split(","):
        d = d.strip().lower()
        if d.endswith('h'):
            duration = int(d.removesuffix('h')) * 60 * 60
        elif d.endswith('m'):
            duration = int(d.removesuffix('m')) * 60
        else:
            duration = int(d.removesuffix('s'))
        rv.append(duration)

    return rv


def get_scales_matrix(default: int = 10):
def get_scales_matrix(default: int = 10) -> List[int]:
    scales = os.getenv("TEST_PG_BENCH_SCALES_MATRIX", default=str(default))
    return list(map(int, scales.split(",")))
    rv = []
    for s in scales.split(","):
        s = s.strip().lower()
        if s.endswith('mb'):
            scale = get_scale_for_db(int(s.removesuffix('mb')))
        elif s.endswith('gb'):
            scale = get_scale_for_db(int(s.removesuffix('gb')) * 1024)
        else:
            scale = int(s)
        rv.append(scale)

    return rv

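A small sketch of how the new versions of these two helpers interpret their environment variables; it assumes `get_durations_matrix`, `get_scales_matrix`, and `get_scale_for_db` are in scope (e.g. imported from this test module), and the variable values are only examples:

```python
import os

# Durations accept plain seconds or an h/m/s suffix.
os.environ["TEST_PG_BENCH_DURATIONS_MATRIX"] = "45,10m,1h"
# Scales accept raw pgbench scale factors or target database sizes in mb/gb.
os.environ["TEST_PG_BENCH_SCALES_MATRIX"] = "10,100mb,1gb"

assert get_durations_matrix() == [45, 600, 3600]
assert get_scales_matrix() == [10, get_scale_for_db(100), get_scale_for_db(1024)]
```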
# Run the pgbench tests against vanilla Postgres and neon
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())
def test_pgbench(neon_with_baseline: PgCompare, scale: int, duration: int):
    run_test_pgbench(neon_with_baseline, scale, duration)
    run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
    run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SIMPLE_UPDATE)
    run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)


# Run the pgbench tests, and generate a flamegraph from it
@@ -123,12 +176,34 @@ profiling="page_requests"
|
||||
env = neon_env_builder.init_start()
|
||||
env.neon_cli.create_branch("empty", "main")
|
||||
|
||||
run_test_pgbench(NeonCompare(zenbenchmark, env, pg_bin, "pgbench"), scale, duration)
|
||||
neon_compare = NeonCompare(zenbenchmark, env, pg_bin, "pgbench")
|
||||
run_test_pgbench(neon_compare, scale, duration, PgBenchLoadType.INIT)
|
||||
run_test_pgbench(neon_compare, scale, duration, PgBenchLoadType.SIMPLE_UPDATE)
|
||||
run_test_pgbench(neon_compare, scale, duration, PgBenchLoadType.SELECT_ONLY)
|
||||
|
||||
|
||||
# The following 3 tests run on an existing database as it was set up by previous tests,
|
||||
# and leaves the database in a state that would be used in the next tests.
|
||||
# Modifying the definition order of these functions or adding other remote tests in between will alter results.
|
||||
# See usage of --sparse-ordering flag in the pytest invocation in the CI workflow
|
||||
#
|
||||
# Run the pgbench tests against an existing Postgres cluster
|
||||
@pytest.mark.parametrize("scale", get_scales_matrix())
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix())
|
||||
@pytest.mark.remote_cluster
|
||||
def test_pgbench_remote(remote_compare: PgCompare, scale: int, duration: int):
|
||||
run_test_pgbench(remote_compare, scale, duration)
|
||||
def test_pgbench_remote_init(remote_compare: PgCompare, scale: int, duration: int):
|
||||
run_test_pgbench(remote_compare, scale, duration, PgBenchLoadType.INIT)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("scale", get_scales_matrix())
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix())
|
||||
@pytest.mark.remote_cluster
|
||||
def test_pgbench_remote_simple_update(remote_compare: PgCompare, scale: int, duration: int):
|
||||
run_test_pgbench(remote_compare, scale, duration, PgBenchLoadType.SIMPLE_UPDATE)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("scale", get_scales_matrix())
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix())
|
||||
@pytest.mark.remote_cluster
|
||||
def test_pgbench_remote_select_only(remote_compare: PgCompare, scale: int, duration: int):
|
||||
run_test_pgbench(remote_compare, scale, duration, PgBenchLoadType.SELECT_ONLY)
|
||||
|
||||