Add basic performance test framework.

This provides a pytest fixture to record metrics from pytest tests. The
recorded metrics are printed out at the end of the test run.

As a starter, this includes one small test using pgbench. It prints out
three metrics: the initialization time, the runtime of 5000 xacts, and the
repository size after the test.
Heikki Linnakangas
2021-08-27 21:00:45 +03:00
parent e1d8f97b9e
commit 074bd3bb12
4 changed files with 242 additions and 0 deletions
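
A condensed sketch of the intended usage, distilled from the fixture's own
docstring further down in the diff (run_step is a hypothetical workload):

    pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")

    def test_mybench(zenbenchmark):
        # Time one step of the test; reported as <test_name>.step at the end of the run
        with zenbenchmark.record_duration('step'):
            run_step()  # hypothetical workload

        # Record an arbitrary metric with a unit
        zenbenchmark.record('size', 42, 'MB')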


@@ -293,6 +293,12 @@ workflows:
           test_selection: batch_others
           requires:
             - build-zenith-<< matrix.build_type >>
+      - run-pytest:
+          name: benchmarks
+          build_type: release
+          test_selection: performance
+          requires:
+            - build-zenith-release
       - docker-image:
           # Context gives an ability to login
           context: Docker Hub


@@ -0,0 +1,162 @@
from pprint import pprint
import os
import timeit
import pathlib
import uuid
import psycopg2
import pytest
from _pytest.config import Config
from _pytest.runner import CallInfo
from _pytest.terminal import TerminalReporter
import shutil
import signal
import subprocess
import time
from contextlib import contextmanager
from contextlib import closing
from pathlib import Path
from dataclasses import dataclass
# Type-related stuff
from psycopg2.extensions import connection as PgConnection
from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, cast
from typing_extensions import Literal
from .utils import (get_self_dir, mkdir_if_needed, subprocess_capture)
"""
This file contains fixtures for micro-benchmarks.
To use, declare the 'zenbenchmark' fixture in the test function. Run the
bencmark, and then record the result by calling zenbenchmark.record. For example:
import timeit
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
def test_mybench(postgres: PostgresFactory, pageserver: ZenithPageserver, zenbenchmark):
# Initialize the test
...
# Run the test, timing how long it takes
with zenbenchmark.record_duration('test_query'):
cur.execute('SELECT test_query(...)')
# Record another measurement
zenbenchmark.record('speed_of_light', 300000, 'km/s')
You can measure multiple things in one test, and record each one with a separate
call to zenbenchmark. For example, you could time the bulk loading that happens
in the test initialization, or measure disk usage after the test query.
"""
# All the results are collected in this list, as a tuple:
# (test_name: str, metric_name: str, metric_value: float, unit: str)
#
# TODO: It would perhaps be better to store the results as additional
# properties in the pytest TestReport objects, to make them visible to
# other pytest tools.
global zenbenchmark_results
zenbenchmark_results = []
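
# (Illustrative aside for the TODO above, not from this commit: pytest's
# built-in `record_property` fixture appends (name, value) pairs to
# `TestReport.user_properties`, which the junit-xml output and other plugins
# can see, e.g.:
#
#     def test_mybench(record_property):
#         record_property("speed_of_light", (300000, "km/s"))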


class ZenithBenchmarkResults:
    """ An object for recording benchmark results. """
    def __init__(self):
        self.results = []

    def record(self, test_name: str, metric_name: str, metric_value: float, unit: str):
        """
        Record a benchmark result.
        """
        self.results.append((test_name, metric_name, metric_value, unit))


# Session-scope fixture that initializes the global results object
@pytest.fixture(autouse=True, scope='session')
def zenbenchmark_global(request) -> Iterator[ZenithBenchmarkResults]:
    """
    Session-scope fixture that holds the benchmark results for the whole test run.
    """
    global zenbenchmark_results
    zenbenchmark_results = ZenithBenchmarkResults()
    yield zenbenchmark_results


class ZenithBenchmarker:
    """
    An object for recording benchmark results. This is created for each test
    function by the zenbenchmark fixture
    """
    def __init__(self, results, request):
        self.results = results
        self.request = request

    def record(self, metric_name: str, metric_value: float, unit: str):
        """
        Record a benchmark result.
        """
        self.results.record(self.request.node.name, metric_name, metric_value, unit)

    @contextmanager
    def record_duration(self, metric_name):
        """
        Record a duration. Usage:

        with zenbenchmark.record_duration('foobar_runtime'):
            foobar() # measure this
        """
        start = timeit.default_timer()
        yield
        end = timeit.default_timer()
        self.results.record(self.request.node.name, metric_name, end - start, 's')


@pytest.fixture(scope='function')
def zenbenchmark(zenbenchmark_global, request) -> Iterator[ZenithBenchmarker]:
    """
    Function-scope fixture that provides functions for recording measurements.
    The recorded results are printed out at the end of the test run.
    """
    benchmarker = ZenithBenchmarker(zenbenchmark_global, request)
    yield benchmarker


# Hook to print the results at the end
@pytest.hookimpl(hookwrapper=True)
def pytest_terminal_summary(
    terminalreporter: TerminalReporter, exitstatus: int, config: Config
):
    yield

    global zenbenchmark_results
    if not zenbenchmark_results:
        return

    terminalreporter.section('Benchmark results', "-")
    for result in zenbenchmark_results.results:
        func = result[0]
        metric_name = result[1]
        metric_value = result[2]
        unit = result[3]

        terminalreporter.write("{}.{}: ".format(func, metric_name))
        if unit == 'MB':
            terminalreporter.write("{0:,.0f}".format(metric_value), green=True)
        elif unit == 's':
            terminalreporter.write("{0:,.3f}".format(metric_value), green=True)
        else:
            terminalreporter.write("{0:,.4f}".format(metric_value), green=True)
        terminalreporter.line(" {}".format(unit))
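
With the formatting above, a metric recorded as zenbenchmark.record('size', 345, 'MB')
in a test named test_pgbench shows up in the terminal summary roughly as (value illustrative):

    ----------- Benchmark results -----------
    test_pgbench.size: 345 MB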


@@ -147,6 +147,7 @@ class ZenithCli:
         assert os.path.isdir(binpath)
         self.binpath = binpath
         self.bin_zenith = os.path.join(binpath, 'zenith')
+        self.repo_dir = repo_dir
         self.env = os.environ.copy()
         self.env['ZENITH_REPO_DIR'] = repo_dir
         self.env['POSTGRES_DISTRIB_DIR'] = pg_distrib_dir
@@ -274,6 +275,12 @@ class ZenithPageserver(PgProtocol):
         self.zenith_cli.run(cmd)
         return self
 
+    def repo_dir(self) -> str:
+        """
+        Return path to repository dir
+        """
+        return self.zenith_cli.repo_dir
+
     def start(self) -> 'ZenithPageserver':
         """
         Start the page server.


@@ -0,0 +1,67 @@
import os
from contextlib import closing

from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver

pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")


def get_timeline_size(repo_dir: str, tenantid: str, timelineid: str):
    path = "{}/tenants/{}/timelines/{}".format(repo_dir, tenantid, timelineid)

    totalbytes = 0
    for root, dirs, files in os.walk(path):
        for name in files:
            totalbytes += os.path.getsize(os.path.join(root, name))
        if 'wal' in dirs:
            dirs.remove('wal')  # don't visit 'wal' subdirectory

    return totalbytes


#
# Run a very short pgbench test.
#
# Collects three metrics:
#
# 1. Time to initialize the pgbench database (pgbench -s5 -i)
# 2. Time to run 5000 pgbench transactions
# 3. Disk space used
#
def test_pgbench(postgres: PostgresFactory, pageserver: ZenithPageserver, pg_bin, zenith_cli, zenbenchmark, repo_dir: str):
    # Create a branch for us
    zenith_cli.run(["branch", "test_pgbench_perf", "empty"])

    pg = postgres.create_start('test_pgbench_perf')
    print("postgres is running on 'test_pgbench_perf' branch")

    # Open a connection directly to the page server that we'll use to force
    # flushing the layers to disk
    psconn = pageserver.connect()
    pscur = psconn.cursor()

    # Get the timeline ID of our branch. We need it for the 'do_gc' command
    with closing(pg.connect()) as conn:
        with conn.cursor() as cur:
            cur.execute("SHOW zenith.zenith_timeline")
            timeline = cur.fetchone()[0]

    connstr = pg.connstr()

    # Initialize pgbench database
    with zenbenchmark.record_duration('init'):
        pg_bin.run_capture(['pgbench', '-s5', '-i', connstr])

        # Flush the layers from memory to disk. The time to do that is included
        # in the reported init time.
        pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")

    # Run pgbench for 5000 transactions
    with zenbenchmark.record_duration('5000_xacts'):
        pg_bin.run_capture(['pgbench', '-c1', '-t5000', connstr])

    # Flush the layers to disk again. This is *not* included in the reported
    # time, though.
    pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")

    # Report disk space used by the repository
    timeline_size = get_timeline_size(repo_dir, pageserver.initial_tenant, timeline)
    zenbenchmark.record('size', timeline_size / (1024 * 1024), 'MB')
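
Additional micro-benchmarks can follow the same pattern. As a sketch only (the
SQL workload and test name are hypothetical; the fixtures are the ones used above,
and commit/cleanup is omitted):

    def test_bulk_insert(postgres: PostgresFactory, pageserver: ZenithPageserver, zenith_cli, zenbenchmark):
        # Create a branch and start a Postgres instance on it, as in test_pgbench
        zenith_cli.run(["branch", "test_bulk_insert_perf", "empty"])
        pg = postgres.create_start('test_bulk_insert_perf')

        with closing(pg.connect()) as conn:
            with conn.cursor() as cur:
                # Time the bulk load; reported as test_bulk_insert.insert
                with zenbenchmark.record_duration('insert'):
                    cur.execute('CREATE TABLE t (i int)')
                    cur.execute('INSERT INTO t SELECT generate_series(1, 1000000)')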