Refactor pgbench tests.

- Remove batch_others/test_pgbench.py. It was a quick check that pgbench
  works, without actually recording any performance numbers, but that
  doesn't seem very interesting anymore. Remove it to avoid confusing it
  with the actual pgbench benchmarks

- Run pgbench with "-n" and "-S" options, for two different workloads:
  simple-updates, and SELECT-only. Previously, we would only run it with
  the "default" TPCB-like workload. That's more or less the same as the
  simple-update (-n) workload, but I think the simple-upload workload
  is more relevant for testing storage performance. The SELECT-only
  workload is a new thing to measure.

- Merge test_perf_pgbench.py and test_perf_pgbench_remote.py. I added
  a new "remote" implementation of the PgCompare class, which allows
  running the same tests against an already-running Postgres instance.

- Make the PgBenchRunResult.parse_from_output function more
  flexible. pgbench can print different lines depending on the
  command-line options, but the parsing function expected a particular
  set of lines.
This commit is contained in:
Heikki Linnakangas
2022-04-14 13:31:42 +03:00
parent a009fe912a
commit 4a8c663452
7 changed files with 279 additions and 250 deletions

View File

@@ -26,7 +26,7 @@ jobs:
runs-on: [self-hosted, zenith-benchmarker]
env:
PG_BIN: "/usr/pgsql-13/bin"
POSTGRES_DISTRIB_DIR: "/usr/pgsql-13"
steps:
- name: Checkout zenith repo
@@ -51,7 +51,7 @@ jobs:
echo Poetry
poetry --version
echo Pgbench
$PG_BIN/pgbench --version
$POSTGRES_DISTRIB_DIR/bin/pgbench --version
# FIXME cluster setup is skipped due to various changes in console API
# for now pre created cluster is used. When API gain some stability
@@ -66,7 +66,7 @@ jobs:
echo "Starting cluster"
# wake up the cluster
$PG_BIN/psql $BENCHMARK_CONNSTR -c "SELECT 1"
$POSTGRES_DISTRIB_DIR/bin/psql $BENCHMARK_CONNSTR -c "SELECT 1"
- name: Run benchmark
# pgbench is installed system wide from official repo
@@ -83,8 +83,11 @@ jobs:
# sudo yum install postgresql13-contrib
# actual binaries are located in /usr/pgsql-13/bin/
env:
TEST_PG_BENCH_TRANSACTIONS_MATRIX: "5000,10000,20000"
TEST_PG_BENCH_SCALES_MATRIX: "10,15"
# The pgbench test runs two tests of given duration against each scale.
# So the total runtime with these parameters is 2 * 2 * 300 = 1200, or 20 minutes.
# Plus time needed to initialize the test databases.
TEST_PG_BENCH_DURATIONS_MATRIX: "300"
TEST_PG_BENCH_SCALES_MATRIX: "10,100"
PLATFORM: "zenith-staging"
BENCHMARK_CONNSTR: "${{ secrets.BENCHMARK_STAGING_CONNSTR }}"
REMOTE_ENV: "1" # indicate to test harness that we do not have zenith binaries locally

View File

@@ -1,14 +0,0 @@
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log
def test_pgbench(zenith_simple_env: ZenithEnv, pg_bin):
env = zenith_simple_env
env.zenith_cli.create_branch("test_pgbench", "empty")
pg = env.postgres.create_start('test_pgbench')
log.info("postgres is running on 'test_pgbench' branch")
connstr = pg.connstr()
pg_bin.run_capture(['pgbench', '-i', connstr])
pg_bin.run_capture(['pgbench'] + '-c 10 -T 5 -P 1 -M prepared'.split() + [connstr])

View File

@@ -17,7 +17,7 @@ import warnings
from contextlib import contextmanager
# Type-related stuff
from typing import Iterator
from typing import Iterator, Optional
"""
This file contains fixtures for micro-benchmarks.
@@ -51,17 +51,12 @@ in the test initialization, or measure disk usage after the test query.
@dataclasses.dataclass
class PgBenchRunResult:
scale: int
number_of_clients: int
number_of_threads: int
number_of_transactions_actually_processed: int
latency_average: float
latency_stddev: float
tps_including_connection_time: float
tps_excluding_connection_time: float
init_duration: float
init_start_timestamp: int
init_end_timestamp: int
latency_stddev: Optional[float]
tps: float
run_duration: float
run_start_timestamp: int
run_end_timestamp: int
@@ -69,56 +64,67 @@ class PgBenchRunResult:
# TODO progress
@classmethod
def parse_from_output(
def parse_from_stdout(
cls,
out: 'subprocess.CompletedProcess[str]',
init_duration: float,
init_start_timestamp: int,
init_end_timestamp: int,
stdout: str,
run_duration: float,
run_start_timestamp: int,
run_end_timestamp: int,
):
stdout_lines = out.stdout.splitlines()
stdout_lines = stdout.splitlines()
latency_stddev = None
# we know significant parts of these values from test input
# but to be precise take them from output
# scaling factor: 5
assert "scaling factor" in stdout_lines[1]
scale = int(stdout_lines[1].split()[-1])
# number of clients: 1
assert "number of clients" in stdout_lines[3]
number_of_clients = int(stdout_lines[3].split()[-1])
# number of threads: 1
assert "number of threads" in stdout_lines[4]
number_of_threads = int(stdout_lines[4].split()[-1])
# number of transactions actually processed: 1000/1000
assert "number of transactions actually processed" in stdout_lines[6]
number_of_transactions_actually_processed = int(stdout_lines[6].split("/")[1])
# latency average = 19.894 ms
assert "latency average" in stdout_lines[7]
latency_average = stdout_lines[7].split()[-2]
# latency stddev = 3.387 ms
assert "latency stddev" in stdout_lines[8]
latency_stddev = stdout_lines[8].split()[-2]
# tps = 50.219689 (including connections establishing)
assert "(including connections establishing)" in stdout_lines[9]
tps_including_connection_time = stdout_lines[9].split()[2]
# tps = 50.264435 (excluding connections establishing)
assert "(excluding connections establishing)" in stdout_lines[10]
tps_excluding_connection_time = stdout_lines[10].split()[2]
for line in stdout.splitlines():
# scaling factor: 5
if line.startswith("scaling factor:"):
scale = int(line.split()[-1])
# number of clients: 1
if line.startswith("number of clients: "):
number_of_clients = int(line.split()[-1])
# number of threads: 1
if line.startswith("number of threads: "):
number_of_threads = int(line.split()[-1])
# number of transactions actually processed: 1000/1000
# OR
# number of transactions actually processed: 1000
if line.startswith("number of transactions actually processed"):
if "/" in line:
number_of_transactions_actually_processed = int(line.split("/")[1])
else:
number_of_transactions_actually_processed = int(line.split()[-1])
# latency average = 19.894 ms
if line.startswith("latency average"):
latency_average = float(line.split()[-2])
# latency stddev = 3.387 ms
# (only printed with some options)
if line.startswith("latency stddev"):
latency_stddev = float(line.split()[-2])
# Get the TPS without initial connection time. The format
# of the tps lines changed in pgbench v14, but we accept
# either format:
#
# pgbench v13 and below:
# tps = 50.219689 (including connections establishing)
# tps = 50.264435 (excluding connections establishing)
#
# pgbench v14:
# initial connection time = 3.858 ms
# tps = 309.281539 (without initial connection time)
if (line.startswith("tps = ") and ("(excluding connections establishing)" in line
or "(without initial connection time)")):
tps = float(line.split()[2])
return cls(
scale=scale,
number_of_clients=number_of_clients,
number_of_threads=number_of_threads,
number_of_transactions_actually_processed=number_of_transactions_actually_processed,
latency_average=float(latency_average),
latency_stddev=float(latency_stddev),
tps_including_connection_time=float(tps_including_connection_time),
tps_excluding_connection_time=float(tps_excluding_connection_time),
init_duration=init_duration,
init_start_timestamp=init_start_timestamp,
init_end_timestamp=init_end_timestamp,
latency_average=latency_average,
latency_stddev=latency_stddev,
tps=tps,
run_duration=run_duration,
run_start_timestamp=run_start_timestamp,
run_end_timestamp=run_end_timestamp,
@@ -187,60 +193,41 @@ class ZenithBenchmarker:
report=MetricReport.LOWER_IS_BETTER,
)
def record_pg_bench_result(self, pg_bench_result: PgBenchRunResult):
self.record("scale", pg_bench_result.scale, '', MetricReport.TEST_PARAM)
self.record("number_of_clients",
def record_pg_bench_result(self, prefix: str, pg_bench_result: PgBenchRunResult):
self.record(f"{prefix}.number_of_clients",
pg_bench_result.number_of_clients,
'',
MetricReport.TEST_PARAM)
self.record("number_of_threads",
self.record(f"{prefix}.number_of_threads",
pg_bench_result.number_of_threads,
'',
MetricReport.TEST_PARAM)
self.record(
"number_of_transactions_actually_processed",
f"{prefix}.number_of_transactions_actually_processed",
pg_bench_result.number_of_transactions_actually_processed,
'',
# thats because this is predefined by test matrix and doesnt change across runs
report=MetricReport.TEST_PARAM,
)
self.record("latency_average",
self.record(f"{prefix}.latency_average",
pg_bench_result.latency_average,
unit="ms",
report=MetricReport.LOWER_IS_BETTER)
self.record("latency_stddev",
pg_bench_result.latency_stddev,
unit="ms",
report=MetricReport.LOWER_IS_BETTER)
self.record("tps_including_connection_time",
pg_bench_result.tps_including_connection_time,
'',
report=MetricReport.HIGHER_IS_BETTER)
self.record("tps_excluding_connection_time",
pg_bench_result.tps_excluding_connection_time,
'',
report=MetricReport.HIGHER_IS_BETTER)
self.record("init_duration",
pg_bench_result.init_duration,
unit="s",
report=MetricReport.LOWER_IS_BETTER)
self.record("init_start_timestamp",
pg_bench_result.init_start_timestamp,
'',
MetricReport.TEST_PARAM)
self.record("init_end_timestamp",
pg_bench_result.init_end_timestamp,
'',
MetricReport.TEST_PARAM)
self.record("run_duration",
if pg_bench_result.latency_stddev is not None:
self.record(f"{prefix}.latency_stddev",
pg_bench_result.latency_stddev,
unit="ms",
report=MetricReport.LOWER_IS_BETTER)
self.record(f"{prefix}.tps", pg_bench_result.tps, '', report=MetricReport.HIGHER_IS_BETTER)
self.record(f"{prefix}.run_duration",
pg_bench_result.run_duration,
unit="s",
report=MetricReport.LOWER_IS_BETTER)
self.record("run_start_timestamp",
self.record(f"{prefix}.run_start_timestamp",
pg_bench_result.run_start_timestamp,
'',
MetricReport.TEST_PARAM)
self.record("run_end_timestamp",
self.record(f"{prefix}.run_end_timestamp",
pg_bench_result.run_end_timestamp,
'',
MetricReport.TEST_PARAM)

View File

@@ -2,7 +2,7 @@ import pytest
from contextlib import contextmanager
from abc import ABC, abstractmethod
from fixtures.zenith_fixtures import PgBin, PgProtocol, VanillaPostgres, ZenithEnv
from fixtures.zenith_fixtures import PgBin, PgProtocol, VanillaPostgres, RemotePostgres, ZenithEnv
from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
# Type-related stuff
@@ -162,6 +162,48 @@ class VanillaCompare(PgCompare):
return self.zenbenchmark.record_duration(out_name)
class RemoteCompare(PgCompare):
"""PgCompare interface for a remote postgres instance."""
def __init__(self, zenbenchmark, remote_pg: RemotePostgres):
self._pg = remote_pg
self._zenbenchmark = zenbenchmark
# Long-lived cursor, useful for flushing
self.conn = self.pg.connect()
self.cur = self.conn.cursor()
@property
def pg(self):
return self._pg
@property
def zenbenchmark(self):
return self._zenbenchmark
@property
def pg_bin(self):
return self._pg.pg_bin
def flush(self):
# TODO: flush the remote pageserver
pass
def report_peak_memory_use(self) -> None:
# TODO: get memory usage from remote pageserver
pass
def report_size(self) -> None:
# TODO: get storage size from remote pageserver
pass
@contextmanager
def record_pageserver_writes(self, out_name):
yield # Do nothing
def record_duration(self, out_name):
return self.zenbenchmark.record_duration(out_name)
@pytest.fixture(scope='function')
def zenith_compare(request, zenbenchmark, pg_bin, zenith_simple_env) -> ZenithCompare:
branch_name = request.node.name
@@ -173,6 +215,11 @@ def vanilla_compare(zenbenchmark, vanilla_pg) -> VanillaCompare:
return VanillaCompare(zenbenchmark, vanilla_pg)
@pytest.fixture(scope='function')
def remote_compare(zenbenchmark, remote_pg) -> RemoteCompare:
return RemoteCompare(zenbenchmark, remote_pg)
@pytest.fixture(params=["vanilla_compare", "zenith_compare"], ids=["vanilla", "zenith"])
def zenith_with_baseline(request) -> PgCompare:
"""Parameterized fixture that helps compare zenith against vanilla postgres.

View File

@@ -123,6 +123,22 @@ def pytest_configure(config):
top_output_dir = os.path.join(base_dir, DEFAULT_OUTPUT_DIR)
mkdir_if_needed(top_output_dir)
# Find the postgres installation.
global pg_distrib_dir
env_postgres_bin = os.environ.get('POSTGRES_DISTRIB_DIR')
if env_postgres_bin:
pg_distrib_dir = env_postgres_bin
else:
pg_distrib_dir = os.path.normpath(os.path.join(base_dir, DEFAULT_POSTGRES_DIR))
log.info(f'pg_distrib_dir is {pg_distrib_dir}')
if os.getenv("REMOTE_ENV"):
# When testing against a remote server, we only need the client binary.
if not os.path.exists(os.path.join(pg_distrib_dir, 'bin/psql')):
raise Exception('psql not found at "{}"'.format(pg_distrib_dir))
else:
if not os.path.exists(os.path.join(pg_distrib_dir, 'bin/postgres')):
raise Exception('postgres not found at "{}"'.format(pg_distrib_dir))
if os.getenv("REMOTE_ENV"):
# we are in remote env and do not have zenith binaries locally
# this is the case for benchmarks run on self-hosted runner
@@ -138,17 +154,6 @@ def pytest_configure(config):
if not os.path.exists(os.path.join(zenith_binpath, 'pageserver')):
raise Exception('zenith binaries not found at "{}"'.format(zenith_binpath))
# Find the postgres installation.
global pg_distrib_dir
env_postgres_bin = os.environ.get('POSTGRES_DISTRIB_DIR')
if env_postgres_bin:
pg_distrib_dir = env_postgres_bin
else:
pg_distrib_dir = os.path.normpath(os.path.join(base_dir, DEFAULT_POSTGRES_DIR))
log.info(f'pg_distrib_dir is {pg_distrib_dir}')
if not os.path.exists(os.path.join(pg_distrib_dir, 'bin/postgres')):
raise Exception('postgres not found at "{}"'.format(pg_distrib_dir))
def zenfixture(func: Fn) -> Fn:
"""
@@ -1305,6 +1310,47 @@ def vanilla_pg(test_output_dir: str) -> Iterator[VanillaPostgres]:
yield vanilla_pg
class RemotePostgres(PgProtocol):
def __init__(self, pg_bin: PgBin, remote_connstr: str):
super().__init__(**parse_dsn(remote_connstr))
self.pg_bin = pg_bin
# The remote server is assumed to be running already
self.running = True
def configure(self, options: List[str]):
raise Exception('cannot change configuration of remote Posgres instance')
def start(self):
raise Exception('cannot start a remote Postgres instance')
def stop(self):
raise Exception('cannot stop a remote Postgres instance')
def get_subdir_size(self, subdir) -> int:
# TODO: Could use the server's Generic File Acccess functions if superuser.
# See https://www.postgresql.org/docs/14/functions-admin.html#FUNCTIONS-ADMIN-GENFILE
raise Exception('cannot get size of a Postgres instance')
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
# do nothing
pass
@pytest.fixture(scope='function')
def remote_pg(test_output_dir: str) -> Iterator[RemotePostgres]:
pg_bin = PgBin(test_output_dir)
connstr = os.getenv("BENCHMARK_CONNSTR")
if connstr is None:
raise ValueError("no connstr provided, use BENCHMARK_CONNSTR environment variable")
with RemotePostgres(pg_bin, connstr) as remote_pg:
yield remote_pg
class ZenithProxy(PgProtocol):
def __init__(self, port: int):
super().__init__(host="127.0.0.1",

View File

@@ -2,29 +2,113 @@ from contextlib import closing
from fixtures.zenith_fixtures import PgBin, VanillaPostgres, ZenithEnv
from fixtures.compare_fixtures import PgCompare, VanillaCompare, ZenithCompare
from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
from fixtures.benchmark_fixture import PgBenchRunResult, MetricReport, ZenithBenchmarker
from fixtures.log_helper import log
from pathlib import Path
import pytest
from datetime import datetime
import calendar
import os
import timeit
def utc_now_timestamp() -> int:
return calendar.timegm(datetime.utcnow().utctimetuple())
def init_pgbench(env: PgCompare, cmdline):
# calculate timestamps and durations separately
# timestamp is intended to be used for linking to grafana and logs
# duration is actually a metric and uses float instead of int for timestamp
init_start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
with env.record_pageserver_writes('init.pageserver_writes'):
env.pg_bin.run_capture(cmdline)
env.flush()
init_duration = timeit.default_timer() - t0
init_end_timestamp = utc_now_timestamp()
env.zenbenchmark.record("init.duration",
init_duration,
unit="s",
report=MetricReport.LOWER_IS_BETTER)
env.zenbenchmark.record("init.start_timestamp",
init_start_timestamp,
'',
MetricReport.TEST_PARAM)
env.zenbenchmark.record("init.end_timestamp", init_end_timestamp, '', MetricReport.TEST_PARAM)
def run_pgbench(env: PgCompare, prefix: str, cmdline):
with env.record_pageserver_writes(f'{prefix}.pageserver_writes'):
run_start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
out = env.pg_bin.run_capture(cmdline, )
run_duration = timeit.default_timer() - t0
run_end_timestamp = utc_now_timestamp()
env.flush()
stdout = Path(f"{out}.stdout").read_text()
res = PgBenchRunResult.parse_from_stdout(
stdout=stdout,
run_duration=run_duration,
run_start_timestamp=run_start_timestamp,
run_end_timestamp=run_end_timestamp,
)
env.zenbenchmark.record_pg_bench_result(prefix, res)
#
# Run a very short pgbench test.
# Initialize a pgbench database, and run pgbench against it.
#
# Collects three metrics:
# This makes runs two different pgbench workloads against the same
# initialized database, and 'duration' is the time of each run. So
# the total runtime is 2 * duration, plus time needed to initialize
# the test database.
#
# 1. Time to initialize the pgbench database (pgbench -s5 -i)
# 2. Time to run 5000 pgbench transactions
# 3. Disk space used
#
def test_pgbench(zenith_with_baseline: PgCompare):
env = zenith_with_baseline
# Currently, the # of connections is hardcoded at 4
def run_test_pgbench(env: PgCompare, scale: int, duration: int):
with env.record_pageserver_writes('pageserver_writes'):
with env.record_duration('init'):
env.pg_bin.run_capture(['pgbench', '-s5', '-i', env.pg.connstr()])
env.flush()
# Record the scale and initialize
env.zenbenchmark.record("scale", scale, '', MetricReport.TEST_PARAM)
init_pgbench(env, ['pgbench', f'-s{scale}', '-i', env.pg.connstr()])
with env.record_duration('5000_xacts'):
env.pg_bin.run_capture(['pgbench', '-c1', '-t5000', env.pg.connstr()])
env.flush()
# Run simple-update workload
run_pgbench(env,
"simple-update",
['pgbench', '-n', '-c4', f'-T{duration}', '-P2', '-Mprepared', env.pg.connstr()])
# Run SELECT workload
run_pgbench(env,
"select-only",
['pgbench', '-S', '-c4', f'-T{duration}', '-P2', '-Mprepared', env.pg.connstr()])
env.report_size()
def get_durations_matrix():
durations = os.getenv("TEST_PG_BENCH_DURATIONS_MATRIX", default="45")
return list(map(int, durations.split(",")))
def get_scales_matrix():
scales = os.getenv("TEST_PG_BENCH_SCALES_MATRIX", default="10")
return list(map(int, scales.split(",")))
# Run the pgbench tests against vanilla Postgres and zenith
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())
def test_pgbench(zenith_with_baseline: PgCompare, scale: int, duration: int):
run_test_pgbench(zenith_with_baseline, scale, duration)
# Run the pgbench tests against an existing Postgres cluster
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())
@pytest.mark.remote_cluster
def test_pgbench_remote(remote_compare: PgCompare, scale: int, duration: int):
run_test_pgbench(remote_compare, scale, duration)

View File

@@ -1,124 +0,0 @@
import dataclasses
import os
import subprocess
from typing import List
from fixtures.benchmark_fixture import PgBenchRunResult, ZenithBenchmarker
import pytest
from datetime import datetime
import calendar
import timeit
import os
def utc_now_timestamp() -> int:
return calendar.timegm(datetime.utcnow().utctimetuple())
@dataclasses.dataclass
class PgBenchRunner:
connstr: str
scale: int
transactions: int
pgbench_bin_path: str = "pgbench"
def invoke(self, args: List[str]) -> 'subprocess.CompletedProcess[str]':
res = subprocess.run([self.pgbench_bin_path, *args], text=True, capture_output=True)
if res.returncode != 0:
raise RuntimeError(f"pgbench failed. stdout: {res.stdout} stderr: {res.stderr}")
return res
def init(self, vacuum: bool = True) -> 'subprocess.CompletedProcess[str]':
args = []
if not vacuum:
args.append("--no-vacuum")
args.extend([f"--scale={self.scale}", "--initialize", self.connstr])
return self.invoke(args)
def run(self, jobs: int = 1, clients: int = 1):
return self.invoke([
f"--transactions={self.transactions}",
f"--jobs={jobs}",
f"--client={clients}",
"--progress=2", # print progress every two seconds
self.connstr,
])
@pytest.fixture
def connstr():
res = os.getenv("BENCHMARK_CONNSTR")
if res is None:
raise ValueError("no connstr provided, use BENCHMARK_CONNSTR environment variable")
return res
def get_transactions_matrix():
transactions = os.getenv("TEST_PG_BENCH_TRANSACTIONS_MATRIX")
if transactions is None:
return [10**4, 10**5]
return list(map(int, transactions.split(",")))
def get_scales_matrix():
scales = os.getenv("TEST_PG_BENCH_SCALES_MATRIX")
if scales is None:
return [10, 20]
return list(map(int, scales.split(",")))
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("transactions", get_transactions_matrix())
@pytest.mark.remote_cluster
def test_pg_bench_remote_cluster(zenbenchmark: ZenithBenchmarker,
connstr: str,
scale: int,
transactions: int):
"""
The best way is to run same pack of tests both, for local zenith
and against staging, but currently local tests heavily depend on
things available only locally e.g. zenith binaries, pageserver api, etc.
Also separate test allows to run pgbench workload against vanilla postgres
or other systems that support postgres protocol.
Also now this is more of a liveness test because it stresses pageserver internals,
so we clearly see what goes wrong in more "real" environment.
"""
pg_bin = os.getenv("PG_BIN")
if pg_bin is not None:
pgbench_bin_path = os.path.join(pg_bin, "pgbench")
else:
pgbench_bin_path = "pgbench"
runner = PgBenchRunner(
connstr=connstr,
scale=scale,
transactions=transactions,
pgbench_bin_path=pgbench_bin_path,
)
# calculate timestamps and durations separately
# timestamp is intended to be used for linking to grafana and logs
# duration is actually a metric and uses float instead of int for timestamp
init_start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
runner.init()
init_duration = timeit.default_timer() - t0
init_end_timestamp = utc_now_timestamp()
run_start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
out = runner.run() # TODO handle failures
run_duration = timeit.default_timer() - t0
run_end_timestamp = utc_now_timestamp()
res = PgBenchRunResult.parse_from_output(
out=out,
init_duration=init_duration,
init_start_timestamp=init_start_timestamp,
init_end_timestamp=init_end_timestamp,
run_duration=run_duration,
run_start_timestamp=run_start_timestamp,
run_end_timestamp=run_end_timestamp,
)
zenbenchmark.record_pg_bench_result(res)