From afb3342e4672c360744e328ddbbcd10bc0369bc0 Mon Sep 17 00:00:00 2001 From: bojanserafimov Date: Tue, 15 Feb 2022 13:44:22 -0500 Subject: [PATCH] Add vanilla pg baseline tests (#1275) --- test_runner/fixtures/compare_fixtures.py | 188 ++++++++++++++++++ test_runner/performance/test_bulk_insert.py | 51 ++--- test_runner/performance/test_copy.py | 77 +++---- test_runner/performance/test_gist_build.py | 51 ++--- .../performance/test_parallel_copy_to.py | 109 +++------- test_runner/performance/test_perf_pgbench.py | 101 ++-------- .../performance/test_small_seqscans.py | 25 +-- .../performance/test_write_amplification.py | 42 ++-- 8 files changed, 306 insertions(+), 338 deletions(-) create mode 100644 test_runner/fixtures/compare_fixtures.py diff --git a/test_runner/fixtures/compare_fixtures.py b/test_runner/fixtures/compare_fixtures.py new file mode 100644 index 0000000000..7b833c9614 --- /dev/null +++ b/test_runner/fixtures/compare_fixtures.py @@ -0,0 +1,188 @@ +import pytest +from contextlib import contextmanager +from abc import ABC, abstractmethod + +from fixtures.zenith_fixtures import PgBin, PgProtocol, VanillaPostgres, ZenithEnv +from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker + +# Type-related stuff +from typing import Iterator + + +class PgCompare(ABC): + """Common interface of all postgres implementations, useful for benchmarks. + + This class is a helper class for the zenith_with_baseline fixture. See its documentation + for more details. + """ + @property + @abstractmethod + def pg(self) -> PgProtocol: + pass + + @property + @abstractmethod + def pg_bin(self) -> PgBin: + pass + + @abstractmethod + def flush(self) -> None: + pass + + @abstractmethod + def report_peak_memory_use(self) -> None: + pass + + @abstractmethod + def report_size(self) -> None: + pass + + @contextmanager + @abstractmethod + def record_pageserver_writes(self, out_name): + pass + + @contextmanager + @abstractmethod + def record_duration(self, out_name): + pass + + +class ZenithCompare(PgCompare): + """PgCompare interface for the zenith stack.""" + def __init__(self, + zenbenchmark: ZenithBenchmarker, + zenith_simple_env: ZenithEnv, + pg_bin: PgBin, + branch_name): + self.env = zenith_simple_env + self.zenbenchmark = zenbenchmark + self._pg_bin = pg_bin + + # We only use one branch and one timeline + self.branch = branch_name + self.env.zenith_cli(["branch", self.branch, "empty"]) + self._pg = self.env.postgres.create_start(self.branch) + self.timeline = self.pg.safe_psql("SHOW zenith.zenith_timeline")[0][0] + + # Long-lived cursor, useful for flushing + self.psconn = self.env.pageserver.connect() + self.pscur = self.psconn.cursor() + + @property + def pg(self): + return self._pg + + @property + def pg_bin(self): + return self._pg_bin + + def flush(self): + self.pscur.execute(f"do_gc {self.env.initial_tenant} {self.timeline} 0") + + def report_peak_memory_use(self) -> None: + self.zenbenchmark.record("peak_mem", + self.zenbenchmark.get_peak_mem(self.env.pageserver) / 1024, + 'MB', + report=MetricReport.LOWER_IS_BETTER) + + def report_size(self) -> None: + timeline_size = self.zenbenchmark.get_timeline_size(self.env.repo_dir, + self.env.initial_tenant, + self.timeline) + self.zenbenchmark.record('size', + timeline_size / (1024 * 1024), + 'MB', + report=MetricReport.LOWER_IS_BETTER) + + def record_pageserver_writes(self, out_name): + return self.zenbenchmark.record_pageserver_writes(self.env.pageserver, out_name) + + def record_duration(self, out_name): + return self.zenbenchmark.record_duration(out_name) + + +class VanillaCompare(PgCompare): + """PgCompare interface for vanilla postgres.""" + def __init__(self, zenbenchmark, vanilla_pg: VanillaPostgres): + self._pg = vanilla_pg + self.zenbenchmark = zenbenchmark + vanilla_pg.configure(['shared_buffers=1MB']) + vanilla_pg.start() + + # Long-lived cursor, useful for flushing + self.conn = self.pg.connect() + self.cur = self.conn.cursor() + + @property + def pg(self): + return self._pg + + @property + def pg_bin(self): + return self._pg.pg_bin + + def flush(self): + self.cur.execute("checkpoint") + + def report_peak_memory_use(self) -> None: + pass # TODO find something + + def report_size(self) -> None: + data_size = self.pg.get_subdir_size('base') + self.zenbenchmark.record('data_size', + data_size / (1024 * 1024), + 'MB', + report=MetricReport.LOWER_IS_BETTER) + wal_size = self.pg.get_subdir_size('pg_wal') + self.zenbenchmark.record('wal_size', + wal_size / (1024 * 1024), + 'MB', + report=MetricReport.LOWER_IS_BETTER) + + @contextmanager + def record_pageserver_writes(self, out_name): + yield # Do nothing + + def record_duration(self, out_name): + return self.zenbenchmark.record_duration(out_name) + + +@pytest.fixture(scope='function') +def zenith_compare(request, zenbenchmark, pg_bin, zenith_simple_env) -> ZenithCompare: + branch_name = request.node.name + return ZenithCompare(zenbenchmark, zenith_simple_env, pg_bin, branch_name) + + +@pytest.fixture(scope='function') +def vanilla_compare(zenbenchmark, vanilla_pg) -> VanillaCompare: + return VanillaCompare(zenbenchmark, vanilla_pg) + + +@pytest.fixture(params=["vanilla_compare", "zenith_compare"], ids=["vanilla", "zenith"]) +def zenith_with_baseline(request) -> PgCompare: + """Parameterized fixture that helps compare zenith against vanilla postgres. + + A test that uses this fixture turns into a parameterized test that runs against: + 1. A vanilla postgres instance + 2. A simple zenith env (see zenith_simple_env) + 3. Possibly other postgres protocol implementations. + + The main goal of this fixture is to make it easier for people to read and write + performance tests. Easy test writing leads to more tests. + + Perfect encapsulation of the postgres implementations is **not** a goal because + it's impossible. Operational and configuration differences in the different + implementations sometimes matter, and the writer of the test should be mindful + of that. + + If a test requires some one-off special implementation-specific logic, use of + isinstance(zenith_with_baseline, ZenithCompare) is encouraged. Though if that + implementation-specific logic is widely useful across multiple tests, it might + make sense to add methods to the PgCompare class. + """ + fixture = request.getfixturevalue(request.param) + if isinstance(fixture, PgCompare): + return fixture + else: + raise AssertionError(f"test error: fixture {request.param} is not PgCompare") diff --git a/test_runner/performance/test_bulk_insert.py b/test_runner/performance/test_bulk_insert.py index 9892a70516..ed9113cb02 100644 --- a/test_runner/performance/test_bulk_insert.py +++ b/test_runner/performance/test_bulk_insert.py @@ -2,8 +2,13 @@ from contextlib import closing from fixtures.zenith_fixtures import ZenithEnv from fixtures.log_helper import log from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker +from fixtures.compare_fixtures import PgCompare, VanillaCompare, ZenithCompare -pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture") +pytest_plugins = ( + "fixtures.zenith_fixtures", + "fixtures.benchmark_fixture", + "fixtures.compare_fixtures", +) # @@ -16,47 +21,19 @@ pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture") # 3. Disk space used # 4. Peak memory usage # -def test_bulk_insert(zenith_simple_env: ZenithEnv, zenbenchmark: ZenithBenchmarker): - env = zenith_simple_env - # Create a branch for us - env.zenith_cli(["branch", "test_bulk_insert", "empty"]) - - pg = env.postgres.create_start('test_bulk_insert') - log.info("postgres is running on 'test_bulk_insert' branch") - - # Open a connection directly to the page server that we'll use to force - # flushing the layers to disk - psconn = env.pageserver.connect() - pscur = psconn.cursor() +def test_bulk_insert(zenith_with_baseline: PgCompare): + env = zenith_with_baseline # Get the timeline ID of our branch. We need it for the 'do_gc' command - with closing(pg.connect()) as conn: + with closing(env.pg.connect()) as conn: with conn.cursor() as cur: - cur.execute("SHOW zenith.zenith_timeline") - timeline = cur.fetchone()[0] - cur.execute("create table huge (i int, j int);") # Run INSERT, recording the time and I/O it takes - with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'): - with zenbenchmark.record_duration('insert'): + with env.record_pageserver_writes('pageserver_writes'): + with env.record_duration('insert'): cur.execute("insert into huge values (generate_series(1, 5000000), 0);") + env.flush() - # Flush the layers from memory to disk. This is included in the reported - # time and I/O - pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") - - # Record peak memory usage - zenbenchmark.record("peak_mem", - zenbenchmark.get_peak_mem(env.pageserver) / 1024, - 'MB', - report=MetricReport.LOWER_IS_BETTER) - - # Report disk space used by the repository - timeline_size = zenbenchmark.get_timeline_size(env.repo_dir, - env.initial_tenant, - timeline) - zenbenchmark.record('size', - timeline_size / (1024 * 1024), - 'MB', - report=MetricReport.LOWER_IS_BETTER) + env.report_peak_memory_use() + env.report_size() diff --git a/test_runner/performance/test_copy.py b/test_runner/performance/test_copy.py index 50039eb33c..6b66d26433 100644 --- a/test_runner/performance/test_copy.py +++ b/test_runner/performance/test_copy.py @@ -2,10 +2,15 @@ from contextlib import closing from fixtures.zenith_fixtures import ZenithEnv from fixtures.log_helper import log from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker +from fixtures.compare_fixtures import PgCompare, VanillaCompare, ZenithCompare from io import BufferedReader, RawIOBase from itertools import repeat -pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture") +pytest_plugins = ( + "fixtures.zenith_fixtures", + "fixtures.benchmark_fixture", + "fixtures.compare_fixtures", +) class CopyTestData(RawIOBase): @@ -42,77 +47,41 @@ def copy_test_data(rows: int): # # COPY performance tests. # -def test_copy(zenith_simple_env: ZenithEnv, zenbenchmark: ZenithBenchmarker): - env = zenith_simple_env - # Create a branch for us - env.zenith_cli(["branch", "test_copy", "empty"]) - - pg = env.postgres.create_start('test_copy') - log.info("postgres is running on 'test_copy' branch") - - # Open a connection directly to the page server that we'll use to force - # flushing the layers to disk - psconn = env.pageserver.connect() - pscur = psconn.cursor() +def test_copy(zenith_with_baseline: PgCompare): + env = zenith_with_baseline # Get the timeline ID of our branch. We need it for the pageserver 'checkpoint' command - with closing(pg.connect()) as conn: + with closing(env.pg.connect()) as conn: with conn.cursor() as cur: - cur.execute("SHOW zenith.zenith_timeline") - timeline = cur.fetchone()[0] - cur.execute("create table copytest (i int, t text);") # Load data with COPY, recording the time and I/O it takes. # # Since there's no data in the table previously, this extends it. - with zenbenchmark.record_pageserver_writes(env.pageserver, - 'copy_extend_pageserver_writes'): - with zenbenchmark.record_duration('copy_extend'): + with env.record_pageserver_writes('copy_extend_pageserver_writes'): + with env.record_duration('copy_extend'): cur.copy_from(copy_test_data(1000000), 'copytest') - # Flush the layers from memory to disk. This is included in the reported - # time and I/O - pscur.execute(f"checkpoint {env.initial_tenant} {timeline}") + env.flush() # Delete most rows, and VACUUM to make the space available for reuse. - with zenbenchmark.record_pageserver_writes(env.pageserver, 'delete_pageserver_writes'): - with zenbenchmark.record_duration('delete'): + with env.record_pageserver_writes('delete_pageserver_writes'): + with env.record_duration('delete'): cur.execute("delete from copytest where i % 100 <> 0;") - # Flush the layers from memory to disk. This is included in the reported - # time and I/O - pscur.execute(f"checkpoint {env.initial_tenant} {timeline}") + env.flush() - with zenbenchmark.record_pageserver_writes(env.pageserver, 'vacuum_pageserver_writes'): - with zenbenchmark.record_duration('vacuum'): + with env.record_pageserver_writes('vacuum_pageserver_writes'): + with env.record_duration('vacuum'): cur.execute("vacuum copytest") - # Flush the layers from memory to disk. This is included in the reported - # time and I/O - pscur.execute(f"checkpoint {env.initial_tenant} {timeline}") + env.flush() # Load data into the table again. This time, this will use the space free'd # by the VACUUM. # # This will also clear all the VM bits. - with zenbenchmark.record_pageserver_writes(env.pageserver, - 'copy_reuse_pageserver_writes'): - with zenbenchmark.record_duration('copy_reuse'): + with env.record_pageserver_writes('copy_reuse_pageserver_writes'): + with env.record_duration('copy_reuse'): cur.copy_from(copy_test_data(1000000), 'copytest') + env.flush() - # Flush the layers from memory to disk. This is included in the reported - # time and I/O - pscur.execute(f"checkpoint {env.initial_tenant} {timeline}") - - # Record peak memory usage - zenbenchmark.record("peak_mem", - zenbenchmark.get_peak_mem(env.pageserver) / 1024, - 'MB', - report=MetricReport.LOWER_IS_BETTER) - - # Report disk space used by the repository - timeline_size = zenbenchmark.get_timeline_size(env.repo_dir, - env.initial_tenant, - timeline) - zenbenchmark.record('size', - timeline_size / (1024 * 1024), - 'MB', - report=MetricReport.LOWER_IS_BETTER) + env.report_peak_memory_use() + env.report_size() diff --git a/test_runner/performance/test_gist_build.py b/test_runner/performance/test_gist_build.py index daa8c71df1..70600a4fff 100644 --- a/test_runner/performance/test_gist_build.py +++ b/test_runner/performance/test_gist_build.py @@ -2,9 +2,14 @@ import os from contextlib import closing from fixtures.benchmark_fixture import MetricReport from fixtures.zenith_fixtures import ZenithEnv +from fixtures.compare_fixtures import PgCompare, VanillaCompare, ZenithCompare from fixtures.log_helper import log -pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture") +pytest_plugins = ( + "fixtures.zenith_fixtures", + "fixtures.benchmark_fixture", + "fixtures.compare_fixtures", +) # @@ -12,24 +17,11 @@ pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture") # As of this writing, we're duplicate those giant WAL records for each page, # which makes the delta layer about 32x larger than it needs to be. # -def test_gist_buffering_build(zenith_simple_env: ZenithEnv, zenbenchmark): - env = zenith_simple_env - # Create a branch for us - env.zenith_cli(["branch", "test_gist_buffering_build", "empty"]) +def test_gist_buffering_build(zenith_with_baseline: PgCompare): + env = zenith_with_baseline - pg = env.postgres.create_start('test_gist_buffering_build') - log.info("postgres is running on 'test_gist_buffering_build' branch") - - # Open a connection directly to the page server that we'll use to force - # flushing the layers to disk - psconn = env.pageserver.connect() - pscur = psconn.cursor() - - # Get the timeline ID of our branch. We need it for the 'do_gc' command - with closing(pg.connect()) as conn: + with closing(env.pg.connect()) as conn: with conn.cursor() as cur: - cur.execute("SHOW zenith.zenith_timeline") - timeline = cur.fetchone()[0] # Create test table. cur.execute("create table gist_point_tbl(id int4, p point)") @@ -38,27 +30,12 @@ def test_gist_buffering_build(zenith_simple_env: ZenithEnv, zenbenchmark): ) # Build the index. - with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'): - with zenbenchmark.record_duration('build'): + with env.record_pageserver_writes('pageserver_writes'): + with env.record_duration('build'): cur.execute( "create index gist_pointidx2 on gist_point_tbl using gist(p) with (buffering = on)" ) + env.flush() - # Flush the layers from memory to disk. This is included in the reported - # time and I/O - pscur.execute(f"do_gc {env.initial_tenant} {timeline} 1000000") - - # Record peak memory usage - zenbenchmark.record("peak_mem", - zenbenchmark.get_peak_mem(env.pageserver) / 1024, - 'MB', - report=MetricReport.LOWER_IS_BETTER) - - # Report disk space used by the repository - timeline_size = zenbenchmark.get_timeline_size(env.repo_dir, - env.initial_tenant, - timeline) - zenbenchmark.record('size', - timeline_size / (1024 * 1024), - 'MB', - report=MetricReport.LOWER_IS_BETTER) + env.report_peak_memory_use() + env.report_size() diff --git a/test_runner/performance/test_parallel_copy_to.py b/test_runner/performance/test_parallel_copy_to.py index 0e0457ccab..cdd40368e1 100644 --- a/test_runner/performance/test_parallel_copy_to.py +++ b/test_runner/performance/test_parallel_copy_to.py @@ -1,11 +1,16 @@ from io import BytesIO import asyncio import asyncpg -from fixtures.zenith_fixtures import ZenithEnv, Postgres +from fixtures.zenith_fixtures import ZenithEnv, Postgres, PgProtocol from fixtures.log_helper import log from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker +from fixtures.compare_fixtures import PgCompare, VanillaCompare, ZenithCompare -pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture") +pytest_plugins = ( + "fixtures.zenith_fixtures", + "fixtures.benchmark_fixture", + "fixtures.compare_fixtures", +) async def repeat_bytes(buf, repetitions: int): @@ -13,7 +18,7 @@ async def repeat_bytes(buf, repetitions: int): yield buf -async def copy_test_data_to_table(pg: Postgres, worker_id: int, table_name: str): +async def copy_test_data_to_table(pg: PgProtocol, worker_id: int, table_name: str): buf = BytesIO() for i in range(1000): buf.write( @@ -26,7 +31,7 @@ async def copy_test_data_to_table(pg: Postgres, worker_id: int, table_name: str) await pg_conn.copy_to_table(table_name, source=copy_input) -async def parallel_load_different_tables(pg: Postgres, n_parallel: int): +async def parallel_load_different_tables(pg: PgProtocol, n_parallel: int): workers = [] for worker_id in range(n_parallel): worker = copy_test_data_to_table(pg, worker_id, f'copytest_{worker_id}') @@ -37,54 +42,25 @@ async def parallel_load_different_tables(pg: Postgres, n_parallel: int): # Load 5 different tables in parallel with COPY TO -def test_parallel_copy_different_tables(zenith_simple_env: ZenithEnv, - zenbenchmark: ZenithBenchmarker, - n_parallel=5): +def test_parallel_copy_different_tables(zenith_with_baseline: PgCompare, n_parallel=5): - env = zenith_simple_env - # Create a branch for us - env.zenith_cli(["branch", "test_parallel_copy_different_tables", "empty"]) - - pg = env.postgres.create_start('test_parallel_copy_different_tables') - log.info("postgres is running on 'test_parallel_copy_different_tables' branch") - - # Open a connection directly to the page server that we'll use to force - # flushing the layers to disk - psconn = env.pageserver.connect() - pscur = psconn.cursor() - - # Get the timeline ID of our branch. We need it for the 'do_gc' command - conn = pg.connect() + env = zenith_with_baseline + conn = env.pg.connect() cur = conn.cursor() - cur.execute("SHOW zenith.zenith_timeline") - timeline = cur.fetchone()[0] for worker_id in range(n_parallel): cur.execute(f'CREATE TABLE copytest_{worker_id} (i int, t text)') - with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'): - with zenbenchmark.record_duration('load'): - asyncio.run(parallel_load_different_tables(pg, n_parallel)) + with env.record_pageserver_writes('pageserver_writes'): + with env.record_duration('load'): + asyncio.run(parallel_load_different_tables(env.pg, n_parallel)) + env.flush() - # Flush the layers from memory to disk. This is included in the reported - # time and I/O - pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") - - # Record peak memory usage - zenbenchmark.record("peak_mem", - zenbenchmark.get_peak_mem(env.pageserver) / 1024, - 'MB', - report=MetricReport.LOWER_IS_BETTER) - - # Report disk space used by the repository - timeline_size = zenbenchmark.get_timeline_size(env.repo_dir, env.initial_tenant, timeline) - zenbenchmark.record('size', - timeline_size / (1024 * 1024), - 'MB', - report=MetricReport.LOWER_IS_BETTER) + env.report_peak_memory_use() + env.report_size() -async def parallel_load_same_table(pg: Postgres, n_parallel: int): +async def parallel_load_same_table(pg: PgProtocol, n_parallel: int): workers = [] for worker_id in range(n_parallel): worker = copy_test_data_to_table(pg, worker_id, f'copytest') @@ -95,46 +71,17 @@ async def parallel_load_same_table(pg: Postgres, n_parallel: int): # Load data into one table with COPY TO from 5 parallel connections -def test_parallel_copy_same_table(zenith_simple_env: ZenithEnv, - zenbenchmark: ZenithBenchmarker, - n_parallel=5): - env = zenith_simple_env - # Create a branch for us - env.zenith_cli(["branch", "test_parallel_copy_same_table", "empty"]) - - pg = env.postgres.create_start('test_parallel_copy_same_table') - log.info("postgres is running on 'test_parallel_copy_same_table' branch") - - # Open a connection directly to the page server that we'll use to force - # flushing the layers to disk - psconn = env.pageserver.connect() - pscur = psconn.cursor() - - # Get the timeline ID of our branch. We need it for the 'do_gc' command - conn = pg.connect() +def test_parallel_copy_same_table(zenith_with_baseline: PgCompare, n_parallel=5): + env = zenith_with_baseline + conn = env.pg.connect() cur = conn.cursor() - cur.execute("SHOW zenith.zenith_timeline") - timeline = cur.fetchone()[0] cur.execute(f'CREATE TABLE copytest (i int, t text)') - with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'): - with zenbenchmark.record_duration('load'): - asyncio.run(parallel_load_same_table(pg, n_parallel)) + with env.record_pageserver_writes('pageserver_writes'): + with env.record_duration('load'): + asyncio.run(parallel_load_same_table(env.pg, n_parallel)) + env.flush() - # Flush the layers from memory to disk. This is included in the reported - # time and I/O - pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") - - # Record peak memory usage - zenbenchmark.record("peak_mem", - zenbenchmark.get_peak_mem(env.pageserver) / 1024, - 'MB', - report=MetricReport.LOWER_IS_BETTER) - - # Report disk space used by the repository - timeline_size = zenbenchmark.get_timeline_size(env.repo_dir, env.initial_tenant, timeline) - zenbenchmark.record('size', - timeline_size / (1024 * 1024), - 'MB', - report=MetricReport.LOWER_IS_BETTER) + env.report_peak_memory_use() + env.report_size() diff --git a/test_runner/performance/test_perf_pgbench.py b/test_runner/performance/test_perf_pgbench.py index 724a822055..605e32cc4e 100644 --- a/test_runner/performance/test_perf_pgbench.py +++ b/test_runner/performance/test_perf_pgbench.py @@ -1,18 +1,15 @@ from contextlib import closing from fixtures.zenith_fixtures import PgBin, VanillaPostgres, ZenithEnv +from fixtures.compare_fixtures import PgCompare, VanillaCompare, ZenithCompare from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker from fixtures.log_helper import log -pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture") - - -def pgbench_init(pg_bin: PgBin, connstr: str): - pg_bin.run_capture(['pgbench', '-s5', '-i', connstr]) - - -def pgbench_run_5000_transactions(pg_bin: PgBin, connstr: str): - pg_bin.run_capture(['pgbench', '-c1', '-t5000', connstr]) +pytest_plugins = ( + "fixtures.zenith_fixtures", + "fixtures.benchmark_fixture", + "fixtures.compare_fixtures", +) # @@ -24,82 +21,16 @@ def pgbench_run_5000_transactions(pg_bin: PgBin, connstr: str): # 2. Time to run 5000 pgbench transactions # 3. Disk space used # -def test_pgbench(zenith_simple_env: ZenithEnv, pg_bin: PgBin, zenbenchmark: ZenithBenchmarker): - env = zenith_simple_env - # Create a branch for us - env.zenith_cli(["branch", "test_pgbench_perf", "empty"]) +def test_pgbench(zenith_with_baseline: PgCompare): + env = zenith_with_baseline - pg = env.postgres.create_start('test_pgbench_perf') - log.info("postgres is running on 'test_pgbench_perf' branch") + with env.record_pageserver_writes('pageserver_writes'): + with env.record_duration('init'): + env.pg_bin.run_capture(['pgbench', '-s5', '-i', env.pg.connstr()]) + env.flush() - # Open a connection directly to the page server that we'll use to force - # flushing the layers to disk - psconn = env.pageserver.connect() - pscur = psconn.cursor() + with env.record_duration('5000_xacts'): + env.pg_bin.run_capture(['pgbench', '-c1', '-t5000', env.pg.connstr()]) + env.flush() - # Get the timeline ID of our branch. We need it for the 'do_gc' command - with closing(pg.connect()) as conn: - with conn.cursor() as cur: - cur.execute("SHOW zenith.zenith_timeline") - timeline = cur.fetchone()[0] - - connstr = pg.connstr() - - # Initialize pgbench database, recording the time and I/O it takes - with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'): - with zenbenchmark.record_duration('init'): - pgbench_init(pg_bin, connstr) - - # Flush the layers from memory to disk. This is included in the reported - # time and I/O - pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") - - # Run pgbench for 5000 transactions - with zenbenchmark.record_duration('5000_xacts'): - pgbench_run_5000_transactions(pg_bin, connstr) - - # Flush the layers to disk again. This is *not' included in the reported time, - # though. - pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") - - # Report disk space used by the repository - timeline_size = zenbenchmark.get_timeline_size(env.repo_dir, env.initial_tenant, timeline) - zenbenchmark.record('size', - timeline_size / (1024 * 1024), - 'MB', - report=MetricReport.LOWER_IS_BETTER) - - -def test_pgbench_baseline(vanilla_pg: VanillaPostgres, zenbenchmark: ZenithBenchmarker): - vanilla_pg.configure(['shared_buffers=1MB']) - vanilla_pg.start() - - pg_bin = vanilla_pg.pg_bin - connstr = vanilla_pg.connstr() - conn = vanilla_pg.connect() - cur = conn.cursor() - - with zenbenchmark.record_duration('init'): - pgbench_init(pg_bin, connstr) - - # This is roughly equivalent to flushing the layers from memory to disk with Zenith. - cur.execute(f"checkpoint") - - # Run pgbench for 5000 transactions - with zenbenchmark.record_duration('5000_xacts'): - pgbench_run_5000_transactions(pg_bin, connstr) - - # This is roughly equivalent to flush the layers from memory to disk with Zenith. - cur.execute(f"checkpoint") - - # Report disk space used by the repository - data_size = vanilla_pg.get_subdir_size('base') - zenbenchmark.record('data_size', - data_size / (1024 * 1024), - 'MB', - report=MetricReport.LOWER_IS_BETTER) - wal_size = vanilla_pg.get_subdir_size('pg_wal') - zenbenchmark.record('wal_size', - wal_size / (1024 * 1024), - 'MB', - report=MetricReport.LOWER_IS_BETTER) + env.report_size() diff --git a/test_runner/performance/test_small_seqscans.py b/test_runner/performance/test_small_seqscans.py index 5631e95d09..c9539ee048 100644 --- a/test_runner/performance/test_small_seqscans.py +++ b/test_runner/performance/test_small_seqscans.py @@ -7,24 +7,19 @@ from contextlib import closing from fixtures.zenith_fixtures import ZenithEnv from fixtures.log_helper import log from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker +from fixtures.compare_fixtures import PgCompare -pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture") +pytest_plugins = ( + "fixtures.zenith_fixtures", + "fixtures.benchmark_fixture", + "fixtures.compare_fixtures", +) -def test_small_seqscans(zenith_simple_env: ZenithEnv, zenbenchmark: ZenithBenchmarker): - env = zenith_simple_env - # Create a branch for us - env.zenith_cli(["branch", "test_small_seqscans", "empty"]) +def test_small_seqscans(zenith_with_baseline: PgCompare): + env = zenith_with_baseline - pg = env.postgres.create_start('test_small_seqscans') - log.info("postgres is running on 'test_small_seqscans' branch") - - # Open a connection directly to the page server that we'll use to force - # flushing the layers to disk - psconn = env.pageserver.connect() - pscur = psconn.cursor() - - with closing(pg.connect()) as conn: + with closing(env.pg.connect()) as conn: with conn.cursor() as cur: cur.execute('create table t (i integer);') cur.execute('insert into t values (generate_series(1,100000));') @@ -38,6 +33,6 @@ def test_small_seqscans(zenith_simple_env: ZenithEnv, zenbenchmark: ZenithBenchm log.info(f"shared_buffers is {row[0]}, table size {row[1]}") assert int(row[0]) < int(row[1]) - with zenbenchmark.record_duration('run'): + with env.record_duration('run'): for i in range(1000): cur.execute('select count(*) from t;') diff --git a/test_runner/performance/test_write_amplification.py b/test_runner/performance/test_write_amplification.py index 8d4ac882a1..bf58cf8274 100644 --- a/test_runner/performance/test_write_amplification.py +++ b/test_runner/performance/test_write_amplification.py @@ -14,32 +14,23 @@ import os from contextlib import closing from fixtures.benchmark_fixture import MetricReport from fixtures.zenith_fixtures import ZenithEnv +from fixtures.compare_fixtures import PgCompare, VanillaCompare, ZenithCompare from fixtures.log_helper import log -pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture") +pytest_plugins = ( + "fixtures.zenith_fixtures", + "fixtures.benchmark_fixture", + "fixtures.compare_fixtures", +) -def test_write_amplification(zenith_simple_env: ZenithEnv, zenbenchmark): - env = zenith_simple_env - # Create a branch for us - env.zenith_cli(["branch", "test_write_amplification", "empty"]) +def test_write_amplification(zenith_with_baseline: PgCompare): + env = zenith_with_baseline - pg = env.postgres.create_start('test_write_amplification') - log.info("postgres is running on 'test_write_amplification' branch") - - # Open a connection directly to the page server that we'll use to force - # flushing the layers to disk - psconn = env.pageserver.connect() - pscur = psconn.cursor() - - with closing(pg.connect()) as conn: + with closing(env.pg.connect()) as conn: with conn.cursor() as cur: - # Get the timeline ID of our branch. We need it for the 'do_gc' command - cur.execute("SHOW zenith.zenith_timeline") - timeline = cur.fetchone()[0] - - with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'): - with zenbenchmark.record_duration('run'): + with env.record_pageserver_writes('pageserver_writes'): + with env.record_duration('run'): # NOTE: Because each iteration updates every table already created, # the runtime and write amplification is O(n^2), where n is the @@ -71,13 +62,6 @@ def test_write_amplification(zenith_simple_env: ZenithEnv, zenbenchmark): # slower, adding some delays in this loop. But forcing # the checkpointing and GC makes the test go faster, # with the same total I/O effect. - pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") + env.flush() - # Report disk space used by the repository - timeline_size = zenbenchmark.get_timeline_size(env.repo_dir, - env.initial_tenant, - timeline) - zenbenchmark.record('size', - timeline_size / (1024 * 1024), - 'MB', - report=MetricReport.LOWER_IS_BETTER) + env.report_size()