Mirror of https://github.com/neondatabase/neon.git, synced 2025-12-27 16:12:56 +00:00

Add vanilla pg baseline tests (#1275)

test_runner/fixtures/compare_fixtures.py (new file, 188 lines)
@@ -0,0 +1,188 @@
import pytest
from contextlib import contextmanager
from abc import ABC, abstractmethod

from fixtures.zenith_fixtures import PgBin, PgProtocol, VanillaPostgres, ZenithEnv
from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker

# Type-related stuff
from typing import Iterator


class PgCompare(ABC):
    """Common interface of all postgres implementations, useful for benchmarks.

    This class is a helper class for the zenith_with_baseline fixture. See its documentation
    for more details.
    """
    @property
    @abstractmethod
    def pg(self) -> PgProtocol:
        pass

    @property
    @abstractmethod
    def pg_bin(self) -> PgBin:
        pass

    @abstractmethod
    def flush(self) -> None:
        pass

    @abstractmethod
    def report_peak_memory_use(self) -> None:
        pass

    @abstractmethod
    def report_size(self) -> None:
        pass

    @contextmanager
    @abstractmethod
    def record_pageserver_writes(self, out_name):
        pass

    @contextmanager
    @abstractmethod
    def record_duration(self, out_name):
        pass


class ZenithCompare(PgCompare):
    """PgCompare interface for the zenith stack."""
    def __init__(self,
                 zenbenchmark: ZenithBenchmarker,
                 zenith_simple_env: ZenithEnv,
                 pg_bin: PgBin,
                 branch_name):
        self.env = zenith_simple_env
        self.zenbenchmark = zenbenchmark
        self._pg_bin = pg_bin

        # We only use one branch and one timeline
        self.branch = branch_name
        self.env.zenith_cli(["branch", self.branch, "empty"])
        self._pg = self.env.postgres.create_start(self.branch)
        self.timeline = self.pg.safe_psql("SHOW zenith.zenith_timeline")[0][0]

        # Long-lived cursor, useful for flushing
        self.psconn = self.env.pageserver.connect()
        self.pscur = self.psconn.cursor()

    @property
    def pg(self):
        return self._pg

    @property
    def pg_bin(self):
        return self._pg_bin

    def flush(self):
        self.pscur.execute(f"do_gc {self.env.initial_tenant} {self.timeline} 0")

    def report_peak_memory_use(self) -> None:
        self.zenbenchmark.record("peak_mem",
                                 self.zenbenchmark.get_peak_mem(self.env.pageserver) / 1024,
                                 'MB',
                                 report=MetricReport.LOWER_IS_BETTER)

    def report_size(self) -> None:
        timeline_size = self.zenbenchmark.get_timeline_size(self.env.repo_dir,
                                                            self.env.initial_tenant,
                                                            self.timeline)
        self.zenbenchmark.record('size',
                                 timeline_size / (1024 * 1024),
                                 'MB',
                                 report=MetricReport.LOWER_IS_BETTER)

    def record_pageserver_writes(self, out_name):
        return self.zenbenchmark.record_pageserver_writes(self.env.pageserver, out_name)

    def record_duration(self, out_name):
        return self.zenbenchmark.record_duration(out_name)


class VanillaCompare(PgCompare):
    """PgCompare interface for vanilla postgres."""
    def __init__(self, zenbenchmark, vanilla_pg: VanillaPostgres):
        self._pg = vanilla_pg
        self.zenbenchmark = zenbenchmark
        vanilla_pg.configure(['shared_buffers=1MB'])
        vanilla_pg.start()

        # Long-lived cursor, useful for flushing
        self.conn = self.pg.connect()
        self.cur = self.conn.cursor()

    @property
    def pg(self):
        return self._pg

    @property
    def pg_bin(self):
        return self._pg.pg_bin

    def flush(self):
        self.cur.execute("checkpoint")

    def report_peak_memory_use(self) -> None:
        pass  # TODO find something

    def report_size(self) -> None:
        data_size = self.pg.get_subdir_size('base')
        self.zenbenchmark.record('data_size',
                                 data_size / (1024 * 1024),
                                 'MB',
                                 report=MetricReport.LOWER_IS_BETTER)
        wal_size = self.pg.get_subdir_size('pg_wal')
        self.zenbenchmark.record('wal_size',
                                 wal_size / (1024 * 1024),
                                 'MB',
                                 report=MetricReport.LOWER_IS_BETTER)

    @contextmanager
    def record_pageserver_writes(self, out_name):
        yield  # Do nothing

    def record_duration(self, out_name):
        return self.zenbenchmark.record_duration(out_name)


@pytest.fixture(scope='function')
def zenith_compare(request, zenbenchmark, pg_bin, zenith_simple_env) -> ZenithCompare:
    branch_name = request.node.name
    return ZenithCompare(zenbenchmark, zenith_simple_env, pg_bin, branch_name)


@pytest.fixture(scope='function')
def vanilla_compare(zenbenchmark, vanilla_pg) -> VanillaCompare:
    return VanillaCompare(zenbenchmark, vanilla_pg)


@pytest.fixture(params=["vanilla_compare", "zenith_compare"], ids=["vanilla", "zenith"])
def zenith_with_baseline(request) -> PgCompare:
    """Parameterized fixture that helps compare zenith against vanilla postgres.

    A test that uses this fixture turns into a parameterized test that runs against:
    1. A vanilla postgres instance
    2. A simple zenith env (see zenith_simple_env)
    3. Possibly other postgres protocol implementations.

    The main goal of this fixture is to make it easier for people to read and write
    performance tests. Easy test writing leads to more tests.

    Perfect encapsulation of the postgres implementations is **not** a goal because
    it's impossible. Operational and configuration differences in the different
    implementations sometimes matter, and the writer of the test should be mindful
    of that.

    If a test requires some one-off special implementation-specific logic, use of
    isinstance(zenith_with_baseline, ZenithCompare) is encouraged. Though if that
    implementation-specific logic is widely useful across multiple tests, it might
    make sense to add methods to the PgCompare class.
    """
    fixture = request.getfixturevalue(request.param)
    if isinstance(fixture, PgCompare):
        return fixture
    else:
        raise AssertionError(f"test error: fixture {request.param} is not PgCompare")
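With this file in place, a new benchmark reduces to a few lines against the PgCompare interface. A minimal sketch (hypothetical test and table names; the fixture and method names are the ones defined above, and the fixture is assumed to be registered via the pytest_plugins changes below):

from contextlib import closing

from fixtures.compare_fixtures import PgCompare


def test_example_insert(zenith_with_baseline: PgCompare):
    env = zenith_with_baseline

    with closing(env.pg.connect()) as conn:
        with conn.cursor() as cur:
            cur.execute("create table t (i int);")
            # flush() sits inside the recorded block, so the forced flush is
            # counted in the reported time and I/O for both implementations.
            with env.record_pageserver_writes('pageserver_writes'):
                with env.record_duration('insert'):
                    cur.execute("insert into t values (generate_series(1, 100000));")
                    env.flush()

    env.report_peak_memory_use()
    env.report_size()

Because of the fixture's params and ids, this collects as two test items, test_example_insert[vanilla] and test_example_insert[zenith]. The remaining hunks convert the existing benchmarks to this pattern.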
@@ -2,8 +2,13 @@ from contextlib import closing
 from fixtures.zenith_fixtures import ZenithEnv
 from fixtures.log_helper import log
 from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
+from fixtures.compare_fixtures import PgCompare, VanillaCompare, ZenithCompare

-pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
+pytest_plugins = (
+    "fixtures.zenith_fixtures",
+    "fixtures.benchmark_fixture",
+    "fixtures.compare_fixtures",
+)


 #
@@ -16,47 +21,19 @@ pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
 # 3. Disk space used
 # 4. Peak memory usage
 #
-def test_bulk_insert(zenith_simple_env: ZenithEnv, zenbenchmark: ZenithBenchmarker):
-    env = zenith_simple_env
-    # Create a branch for us
-    env.zenith_cli(["branch", "test_bulk_insert", "empty"])
-
-    pg = env.postgres.create_start('test_bulk_insert')
-    log.info("postgres is running on 'test_bulk_insert' branch")
-
-    # Open a connection directly to the page server that we'll use to force
-    # flushing the layers to disk
-    psconn = env.pageserver.connect()
-    pscur = psconn.cursor()
+def test_bulk_insert(zenith_with_baseline: PgCompare):
+    env = zenith_with_baseline

-    # Get the timeline ID of our branch. We need it for the 'do_gc' command
-    with closing(pg.connect()) as conn:
+    with closing(env.pg.connect()) as conn:
         with conn.cursor() as cur:
-            cur.execute("SHOW zenith.zenith_timeline")
-            timeline = cur.fetchone()[0]
-
             cur.execute("create table huge (i int, j int);")

             # Run INSERT, recording the time and I/O it takes
-            with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'):
-                with zenbenchmark.record_duration('insert'):
+            with env.record_pageserver_writes('pageserver_writes'):
+                with env.record_duration('insert'):
                     cur.execute("insert into huge values (generate_series(1, 5000000), 0);")
+                    env.flush()

-    # Flush the layers from memory to disk. This is included in the reported
-    # time and I/O
-    pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
-
-    # Record peak memory usage
-    zenbenchmark.record("peak_mem",
-                        zenbenchmark.get_peak_mem(env.pageserver) / 1024,
-                        'MB',
-                        report=MetricReport.LOWER_IS_BETTER)
-
-    # Report disk space used by the repository
-    timeline_size = zenbenchmark.get_timeline_size(env.repo_dir,
-                                                   env.initial_tenant,
-                                                   timeline)
-    zenbenchmark.record('size',
-                        timeline_size / (1024 * 1024),
-                        'MB',
-                        report=MetricReport.LOWER_IS_BETTER)
+    env.report_peak_memory_use()
+    env.report_size()
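The env.record_duration(...) and env.record_pageserver_writes(...) calls above are context managers. Conceptually, record_duration does something like the following sketch (an assumption for illustration; the real implementation lives in fixtures.benchmark_fixture and records a metric rather than printing):

import time
from contextlib import contextmanager

@contextmanager
def record_duration(metric_name: str):
    # Time everything executed inside the `with` block.
    start = time.monotonic()
    try:
        yield
    finally:
        elapsed = time.monotonic() - start
        print(f"{metric_name}: {elapsed:.3f} s")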
@@ -2,10 +2,15 @@ from contextlib import closing
 from fixtures.zenith_fixtures import ZenithEnv
 from fixtures.log_helper import log
 from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
+from fixtures.compare_fixtures import PgCompare, VanillaCompare, ZenithCompare
 from io import BufferedReader, RawIOBase
 from itertools import repeat

-pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
+pytest_plugins = (
+    "fixtures.zenith_fixtures",
+    "fixtures.benchmark_fixture",
+    "fixtures.compare_fixtures",
+)


 class CopyTestData(RawIOBase):
@@ -42,77 +47,41 @@ def copy_test_data(rows: int):
 #
 # COPY performance tests.
 #
-def test_copy(zenith_simple_env: ZenithEnv, zenbenchmark: ZenithBenchmarker):
-    env = zenith_simple_env
-    # Create a branch for us
-    env.zenith_cli(["branch", "test_copy", "empty"])
-
-    pg = env.postgres.create_start('test_copy')
-    log.info("postgres is running on 'test_copy' branch")
-
-    # Open a connection directly to the page server that we'll use to force
-    # flushing the layers to disk
-    psconn = env.pageserver.connect()
-    pscur = psconn.cursor()
+def test_copy(zenith_with_baseline: PgCompare):
+    env = zenith_with_baseline

-    # Get the timeline ID of our branch. We need it for the pageserver 'checkpoint' command
-    with closing(pg.connect()) as conn:
+    with closing(env.pg.connect()) as conn:
         with conn.cursor() as cur:
-            cur.execute("SHOW zenith.zenith_timeline")
-            timeline = cur.fetchone()[0]
-
             cur.execute("create table copytest (i int, t text);")

             # Load data with COPY, recording the time and I/O it takes.
             #
             # Since there's no data in the table previously, this extends it.
-            with zenbenchmark.record_pageserver_writes(env.pageserver,
-                                                       'copy_extend_pageserver_writes'):
-                with zenbenchmark.record_duration('copy_extend'):
+            with env.record_pageserver_writes('copy_extend_pageserver_writes'):
+                with env.record_duration('copy_extend'):
                     cur.copy_from(copy_test_data(1000000), 'copytest')
-                    # Flush the layers from memory to disk. This is included in the reported
-                    # time and I/O
-                    pscur.execute(f"checkpoint {env.initial_tenant} {timeline}")
+                    env.flush()

             # Delete most rows, and VACUUM to make the space available for reuse.
-            with zenbenchmark.record_pageserver_writes(env.pageserver, 'delete_pageserver_writes'):
-                with zenbenchmark.record_duration('delete'):
+            with env.record_pageserver_writes('delete_pageserver_writes'):
+                with env.record_duration('delete'):
                     cur.execute("delete from copytest where i % 100 <> 0;")
-                    # Flush the layers from memory to disk. This is included in the reported
-                    # time and I/O
-                    pscur.execute(f"checkpoint {env.initial_tenant} {timeline}")
+                    env.flush()

-            with zenbenchmark.record_pageserver_writes(env.pageserver, 'vacuum_pageserver_writes'):
-                with zenbenchmark.record_duration('vacuum'):
+            with env.record_pageserver_writes('vacuum_pageserver_writes'):
+                with env.record_duration('vacuum'):
                     cur.execute("vacuum copytest")
-                    # Flush the layers from memory to disk. This is included in the reported
-                    # time and I/O
-                    pscur.execute(f"checkpoint {env.initial_tenant} {timeline}")
+                    env.flush()

             # Load data into the table again. This time, this will use the space free'd
             # by the VACUUM.
             #
             # This will also clear all the VM bits.
-            with zenbenchmark.record_pageserver_writes(env.pageserver,
-                                                       'copy_reuse_pageserver_writes'):
-                with zenbenchmark.record_duration('copy_reuse'):
+            with env.record_pageserver_writes('copy_reuse_pageserver_writes'):
+                with env.record_duration('copy_reuse'):
                     cur.copy_from(copy_test_data(1000000), 'copytest')
+                    env.flush()

-    # Flush the layers from memory to disk. This is included in the reported
-    # time and I/O
-    pscur.execute(f"checkpoint {env.initial_tenant} {timeline}")
-
-    # Record peak memory usage
-    zenbenchmark.record("peak_mem",
-                        zenbenchmark.get_peak_mem(env.pageserver) / 1024,
-                        'MB',
-                        report=MetricReport.LOWER_IS_BETTER)
-
-    # Report disk space used by the repository
-    timeline_size = zenbenchmark.get_timeline_size(env.repo_dir,
-                                                   env.initial_tenant,
-                                                   timeline)
-    zenbenchmark.record('size',
-                        timeline_size / (1024 * 1024),
-                        'MB',
-                        report=MetricReport.LOWER_IS_BETTER)
+    env.report_peak_memory_use()
+    env.report_size()
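The copy_test_data(...) argument passed to cur.copy_from above is a file-like object built on RawIOBase (see the CopyTestData class and the io imports in this file; its body falls outside the hunk). A self-contained sketch of the general technique, with hypothetical names, looks like this:

from io import BufferedReader, RawIOBase

class RowSource(RawIOBase):
    """Generates tab-separated rows on the fly, without materializing them all."""
    def __init__(self, rows: int):
        self.rows = (f"{i}\tsome text\n".encode() for i in range(rows))
        self.leftover = b""

    def readable(self):
        return True

    def readinto(self, b):
        # Refill from the generator, then copy as much as fits into b.
        if not self.leftover:
            self.leftover = next(self.rows, b"")
        n = min(len(b), len(self.leftover))
        b[:n] = self.leftover[:n]
        self.leftover = self.leftover[n:]
        return n  # returning 0 signals EOF

def make_copy_input(rows: int):
    return BufferedReader(RowSource(rows))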
@@ -2,9 +2,14 @@ import os
 from contextlib import closing
 from fixtures.benchmark_fixture import MetricReport
 from fixtures.zenith_fixtures import ZenithEnv
+from fixtures.compare_fixtures import PgCompare, VanillaCompare, ZenithCompare
 from fixtures.log_helper import log

-pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
+pytest_plugins = (
+    "fixtures.zenith_fixtures",
+    "fixtures.benchmark_fixture",
+    "fixtures.compare_fixtures",
+)


 #
@@ -12,24 +17,11 @@ pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
 # As of this writing, we're duplicating those giant WAL records for each page,
 # which makes the delta layer about 32x larger than it needs to be.
 #
-def test_gist_buffering_build(zenith_simple_env: ZenithEnv, zenbenchmark):
-    env = zenith_simple_env
-    # Create a branch for us
-    env.zenith_cli(["branch", "test_gist_buffering_build", "empty"])
+def test_gist_buffering_build(zenith_with_baseline: PgCompare):
+    env = zenith_with_baseline

-    pg = env.postgres.create_start('test_gist_buffering_build')
-    log.info("postgres is running on 'test_gist_buffering_build' branch")
-
-    # Open a connection directly to the page server that we'll use to force
-    # flushing the layers to disk
-    psconn = env.pageserver.connect()
-    pscur = psconn.cursor()
-
-    # Get the timeline ID of our branch. We need it for the 'do_gc' command
-    with closing(pg.connect()) as conn:
+    with closing(env.pg.connect()) as conn:
         with conn.cursor() as cur:
-            cur.execute("SHOW zenith.zenith_timeline")
-            timeline = cur.fetchone()[0]
-
             # Create test table.
             cur.execute("create table gist_point_tbl(id int4, p point)")
@@ -38,27 +30,12 @@ def test_gist_buffering_build(zenith_simple_env: ZenithEnv, zenbenchmark):
             )

             # Build the index.
-            with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'):
-                with zenbenchmark.record_duration('build'):
+            with env.record_pageserver_writes('pageserver_writes'):
+                with env.record_duration('build'):
                     cur.execute(
                         "create index gist_pointidx2 on gist_point_tbl using gist(p) with (buffering = on)"
                     )
+                    env.flush()

-    # Flush the layers from memory to disk. This is included in the reported
-    # time and I/O
-    pscur.execute(f"do_gc {env.initial_tenant} {timeline} 1000000")
-
-    # Record peak memory usage
-    zenbenchmark.record("peak_mem",
-                        zenbenchmark.get_peak_mem(env.pageserver) / 1024,
-                        'MB',
-                        report=MetricReport.LOWER_IS_BETTER)
-
-    # Report disk space used by the repository
-    timeline_size = zenbenchmark.get_timeline_size(env.repo_dir,
-                                                   env.initial_tenant,
-                                                   timeline)
-    zenbenchmark.record('size',
-                        timeline_size / (1024 * 1024),
-                        'MB',
-                        report=MetricReport.LOWER_IS_BETTER)
+    env.report_peak_memory_use()
+    env.report_size()
@@ -1,11 +1,16 @@
 from io import BytesIO
 import asyncio
 import asyncpg
-from fixtures.zenith_fixtures import ZenithEnv, Postgres
+from fixtures.zenith_fixtures import ZenithEnv, Postgres, PgProtocol
 from fixtures.log_helper import log
 from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
+from fixtures.compare_fixtures import PgCompare, VanillaCompare, ZenithCompare

-pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
+pytest_plugins = (
+    "fixtures.zenith_fixtures",
+    "fixtures.benchmark_fixture",
+    "fixtures.compare_fixtures",
+)


 async def repeat_bytes(buf, repetitions: int):
@@ -13,7 +18,7 @@ async def repeat_bytes(buf, repetitions: int):
         yield buf


-async def copy_test_data_to_table(pg: Postgres, worker_id: int, table_name: str):
+async def copy_test_data_to_table(pg: PgProtocol, worker_id: int, table_name: str):
     buf = BytesIO()
     for i in range(1000):
         buf.write(
@@ -26,7 +31,7 @@ async def copy_test_data_to_table(pg: Postgres, worker_id: int, table_name: str)
     await pg_conn.copy_to_table(table_name, source=copy_input)


-async def parallel_load_different_tables(pg: Postgres, n_parallel: int):
+async def parallel_load_different_tables(pg: PgProtocol, n_parallel: int):
     workers = []
     for worker_id in range(n_parallel):
         worker = copy_test_data_to_table(pg, worker_id, f'copytest_{worker_id}')
@@ -37,54 +42,25 @@ async def parallel_load_different_tables(pg: Postgres, n_parallel: int):


 # Load 5 different tables in parallel with COPY TO
-def test_parallel_copy_different_tables(zenith_simple_env: ZenithEnv,
-                                        zenbenchmark: ZenithBenchmarker,
-                                        n_parallel=5):
-
-    env = zenith_simple_env
-    # Create a branch for us
-    env.zenith_cli(["branch", "test_parallel_copy_different_tables", "empty"])
-
-    pg = env.postgres.create_start('test_parallel_copy_different_tables')
-    log.info("postgres is running on 'test_parallel_copy_different_tables' branch")
-
-    # Open a connection directly to the page server that we'll use to force
-    # flushing the layers to disk
-    psconn = env.pageserver.connect()
-    pscur = psconn.cursor()
-
-    # Get the timeline ID of our branch. We need it for the 'do_gc' command
-    conn = pg.connect()
+def test_parallel_copy_different_tables(zenith_with_baseline: PgCompare, n_parallel=5):
+    env = zenith_with_baseline
+    conn = env.pg.connect()
     cur = conn.cursor()
-    cur.execute("SHOW zenith.zenith_timeline")
-    timeline = cur.fetchone()[0]

     for worker_id in range(n_parallel):
         cur.execute(f'CREATE TABLE copytest_{worker_id} (i int, t text)')

-    with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'):
-        with zenbenchmark.record_duration('load'):
-            asyncio.run(parallel_load_different_tables(pg, n_parallel))
+    with env.record_pageserver_writes('pageserver_writes'):
+        with env.record_duration('load'):
+            asyncio.run(parallel_load_different_tables(env.pg, n_parallel))
+            env.flush()

-    # Flush the layers from memory to disk. This is included in the reported
-    # time and I/O
-    pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
-
-    # Record peak memory usage
-    zenbenchmark.record("peak_mem",
-                        zenbenchmark.get_peak_mem(env.pageserver) / 1024,
-                        'MB',
-                        report=MetricReport.LOWER_IS_BETTER)
-
-    # Report disk space used by the repository
-    timeline_size = zenbenchmark.get_timeline_size(env.repo_dir, env.initial_tenant, timeline)
-    zenbenchmark.record('size',
-                        timeline_size / (1024 * 1024),
-                        'MB',
-                        report=MetricReport.LOWER_IS_BETTER)
+    env.report_peak_memory_use()
+    env.report_size()


-async def parallel_load_same_table(pg: Postgres, n_parallel: int):
+async def parallel_load_same_table(pg: PgProtocol, n_parallel: int):
     workers = []
     for worker_id in range(n_parallel):
         worker = copy_test_data_to_table(pg, worker_id, f'copytest')
@@ -95,46 +71,17 @@ async def parallel_load_same_table(pg: Postgres, n_parallel: int):


 # Load data into one table with COPY TO from 5 parallel connections
-def test_parallel_copy_same_table(zenith_simple_env: ZenithEnv,
-                                  zenbenchmark: ZenithBenchmarker,
-                                  n_parallel=5):
-    env = zenith_simple_env
-    # Create a branch for us
-    env.zenith_cli(["branch", "test_parallel_copy_same_table", "empty"])
-
-    pg = env.postgres.create_start('test_parallel_copy_same_table')
-    log.info("postgres is running on 'test_parallel_copy_same_table' branch")
-
-    # Open a connection directly to the page server that we'll use to force
-    # flushing the layers to disk
-    psconn = env.pageserver.connect()
-    pscur = psconn.cursor()
-
-    # Get the timeline ID of our branch. We need it for the 'do_gc' command
-    conn = pg.connect()
+def test_parallel_copy_same_table(zenith_with_baseline: PgCompare, n_parallel=5):
+    env = zenith_with_baseline
+    conn = env.pg.connect()
     cur = conn.cursor()
-    cur.execute("SHOW zenith.zenith_timeline")
-    timeline = cur.fetchone()[0]

     cur.execute(f'CREATE TABLE copytest (i int, t text)')

-    with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'):
-        with zenbenchmark.record_duration('load'):
-            asyncio.run(parallel_load_same_table(pg, n_parallel))
+    with env.record_pageserver_writes('pageserver_writes'):
+        with env.record_duration('load'):
+            asyncio.run(parallel_load_same_table(env.pg, n_parallel))
+            env.flush()

-    # Flush the layers from memory to disk. This is included in the reported
-    # time and I/O
-    pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
-
-    # Record peak memory usage
-    zenbenchmark.record("peak_mem",
-                        zenbenchmark.get_peak_mem(env.pageserver) / 1024,
-                        'MB',
-                        report=MetricReport.LOWER_IS_BETTER)
-
-    # Report disk space used by the repository
-    timeline_size = zenbenchmark.get_timeline_size(env.repo_dir, env.initial_tenant, timeline)
-    zenbenchmark.record('size',
-                        timeline_size / (1024 * 1024),
-                        'MB',
-                        report=MetricReport.LOWER_IS_BETTER)
+    env.report_peak_memory_use()
+    env.report_size()
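The hunks above cut off before the bodies of the parallel_load_* helpers. The fan-out they perform is the standard asyncio pattern, roughly as in this simplified sketch (the worker here is a stub; the real copy_test_data_to_table streams COPY data over asyncpg):

import asyncio

async def copy_test_data_to_table(pg, worker_id: int, table_name: str):
    # Stand-in for the real helper in this file.
    await asyncio.sleep(0)

async def parallel_load_different_tables(pg, n_parallel: int):
    # Build one coroutine per worker, then run them all concurrently
    # and wait for every COPY to finish.
    workers = [
        copy_test_data_to_table(pg, worker_id, f'copytest_{worker_id}')
        for worker_id in range(n_parallel)
    ]
    await asyncio.gather(*workers)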
@@ -1,18 +1,15 @@
 from contextlib import closing
 from fixtures.zenith_fixtures import PgBin, VanillaPostgres, ZenithEnv
+from fixtures.compare_fixtures import PgCompare, VanillaCompare, ZenithCompare

 from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
 from fixtures.log_helper import log

-pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
-
-
-def pgbench_init(pg_bin: PgBin, connstr: str):
-    pg_bin.run_capture(['pgbench', '-s5', '-i', connstr])
-
-
-def pgbench_run_5000_transactions(pg_bin: PgBin, connstr: str):
-    pg_bin.run_capture(['pgbench', '-c1', '-t5000', connstr])
+pytest_plugins = (
+    "fixtures.zenith_fixtures",
+    "fixtures.benchmark_fixture",
+    "fixtures.compare_fixtures",
+)


 #
@@ -24,82 +21,16 @@ def pgbench_run_5000_transactions(pg_bin: PgBin, connstr: str):
 # 2. Time to run 5000 pgbench transactions
 # 3. Disk space used
 #
-def test_pgbench(zenith_simple_env: ZenithEnv, pg_bin: PgBin, zenbenchmark: ZenithBenchmarker):
-    env = zenith_simple_env
-    # Create a branch for us
-    env.zenith_cli(["branch", "test_pgbench_perf", "empty"])
+def test_pgbench(zenith_with_baseline: PgCompare):
+    env = zenith_with_baseline

-    pg = env.postgres.create_start('test_pgbench_perf')
-    log.info("postgres is running on 'test_pgbench_perf' branch")
+    with env.record_pageserver_writes('pageserver_writes'):
+        with env.record_duration('init'):
+            env.pg_bin.run_capture(['pgbench', '-s5', '-i', env.pg.connstr()])
+            env.flush()

-    # Open a connection directly to the page server that we'll use to force
-    # flushing the layers to disk
-    psconn = env.pageserver.connect()
-    pscur = psconn.cursor()
+    with env.record_duration('5000_xacts'):
+        env.pg_bin.run_capture(['pgbench', '-c1', '-t5000', env.pg.connstr()])
+        env.flush()

-    # Get the timeline ID of our branch. We need it for the 'do_gc' command
-    with closing(pg.connect()) as conn:
-        with conn.cursor() as cur:
-            cur.execute("SHOW zenith.zenith_timeline")
-            timeline = cur.fetchone()[0]
-
-    connstr = pg.connstr()
-
-    # Initialize pgbench database, recording the time and I/O it takes
-    with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'):
-        with zenbenchmark.record_duration('init'):
-            pgbench_init(pg_bin, connstr)
-
-    # Flush the layers from memory to disk. This is included in the reported
-    # time and I/O
-    pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
-
-    # Run pgbench for 5000 transactions
-    with zenbenchmark.record_duration('5000_xacts'):
-        pgbench_run_5000_transactions(pg_bin, connstr)
-
-    # Flush the layers to disk again. This is *not* included in the reported time,
-    # though.
-    pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
-
-    # Report disk space used by the repository
-    timeline_size = zenbenchmark.get_timeline_size(env.repo_dir, env.initial_tenant, timeline)
-    zenbenchmark.record('size',
-                        timeline_size / (1024 * 1024),
-                        'MB',
-                        report=MetricReport.LOWER_IS_BETTER)
-
-
-def test_pgbench_baseline(vanilla_pg: VanillaPostgres, zenbenchmark: ZenithBenchmarker):
-    vanilla_pg.configure(['shared_buffers=1MB'])
-    vanilla_pg.start()
-
-    pg_bin = vanilla_pg.pg_bin
-    connstr = vanilla_pg.connstr()
-    conn = vanilla_pg.connect()
-    cur = conn.cursor()
-
-    with zenbenchmark.record_duration('init'):
-        pgbench_init(pg_bin, connstr)
-
-    # This is roughly equivalent to flushing the layers from memory to disk with Zenith.
-    cur.execute(f"checkpoint")
-
-    # Run pgbench for 5000 transactions
-    with zenbenchmark.record_duration('5000_xacts'):
-        pgbench_run_5000_transactions(pg_bin, connstr)
-
-    # This is roughly equivalent to flushing the layers from memory to disk with Zenith.
-    cur.execute(f"checkpoint")
-
-    # Report disk space used by the repository
-    data_size = vanilla_pg.get_subdir_size('base')
-    zenbenchmark.record('data_size',
-                        data_size / (1024 * 1024),
-                        'MB',
-                        report=MetricReport.LOWER_IS_BETTER)
-    wal_size = vanilla_pg.get_subdir_size('pg_wal')
-    zenbenchmark.record('wal_size',
-                        wal_size / (1024 * 1024),
-                        'MB',
-                        report=MetricReport.LOWER_IS_BETTER)
+    env.report_size()
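For reference, the pgbench flags used above have fixed meanings. This sketch shows the plain-subprocess equivalent of the two run_capture calls (an illustration, not the real PgBin helper; it assumes pgbench is on PATH and connstr is a libpq connection string):

import subprocess

def run_pgbench_benchmark(connstr: str) -> None:
    # -i initializes the pgbench tables; -s5 is scale factor 5
    # (about 500,000 rows in pgbench_accounts).
    subprocess.run(['pgbench', '-s5', '-i', connstr], check=True)
    # -c1 uses a single client connection; -t5000 runs 5000 transactions on it.
    subprocess.run(['pgbench', '-c1', '-t5000', connstr], check=True)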
@@ -7,24 +7,19 @@ from contextlib import closing
 from fixtures.zenith_fixtures import ZenithEnv
 from fixtures.log_helper import log
 from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
+from fixtures.compare_fixtures import PgCompare

-pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
+pytest_plugins = (
+    "fixtures.zenith_fixtures",
+    "fixtures.benchmark_fixture",
+    "fixtures.compare_fixtures",
+)


-def test_small_seqscans(zenith_simple_env: ZenithEnv, zenbenchmark: ZenithBenchmarker):
-    env = zenith_simple_env
-    # Create a branch for us
-    env.zenith_cli(["branch", "test_small_seqscans", "empty"])
+def test_small_seqscans(zenith_with_baseline: PgCompare):
+    env = zenith_with_baseline

-    pg = env.postgres.create_start('test_small_seqscans')
-    log.info("postgres is running on 'test_small_seqscans' branch")
-
-    # Open a connection directly to the page server that we'll use to force
-    # flushing the layers to disk
-    psconn = env.pageserver.connect()
-    pscur = psconn.cursor()
-
-    with closing(pg.connect()) as conn:
+    with closing(env.pg.connect()) as conn:
         with conn.cursor() as cur:
             cur.execute('create table t (i integer);')
             cur.execute('insert into t values (generate_series(1,100000));')
@@ -38,6 +33,6 @@ def test_small_seqscans(zenith_simple_env: ZenithEnv, zenbenchmark: ZenithBenchm
             log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
             assert int(row[0]) < int(row[1])

-            with zenbenchmark.record_duration('run'):
+            with env.record_duration('run'):
                 for i in range(1000):
                     cur.execute('select count(*) from t;')
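The query that fills `row` above falls outside the hunk; a query of the kind that could produce it (an assumption, shown for context) compares the shared_buffers setting, which pg_settings reports in 8 kB blocks, against the table size in bytes:

def fetch_buffer_vs_table_size(cur):
    # shared_buffers is reported in 8 kB units; convert to bytes so it is
    # directly comparable to pg_relation_size().
    cur.execute("""
        select setting::bigint * 8192 as shared_buffers_bytes,
               pg_relation_size('t') as table_bytes
        from pg_settings
        where name = 'shared_buffers'
    """)
    return cur.fetchone()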
@@ -14,32 +14,23 @@ import os
 from contextlib import closing
 from fixtures.benchmark_fixture import MetricReport
 from fixtures.zenith_fixtures import ZenithEnv
+from fixtures.compare_fixtures import PgCompare, VanillaCompare, ZenithCompare
 from fixtures.log_helper import log

-pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
+pytest_plugins = (
+    "fixtures.zenith_fixtures",
+    "fixtures.benchmark_fixture",
+    "fixtures.compare_fixtures",
+)


-def test_write_amplification(zenith_simple_env: ZenithEnv, zenbenchmark):
-    env = zenith_simple_env
-    # Create a branch for us
-    env.zenith_cli(["branch", "test_write_amplification", "empty"])
+def test_write_amplification(zenith_with_baseline: PgCompare):
+    env = zenith_with_baseline

-    pg = env.postgres.create_start('test_write_amplification')
-    log.info("postgres is running on 'test_write_amplification' branch")
-
-    # Open a connection directly to the page server that we'll use to force
-    # flushing the layers to disk
-    psconn = env.pageserver.connect()
-    pscur = psconn.cursor()
-
-    with closing(pg.connect()) as conn:
+    with closing(env.pg.connect()) as conn:
         with conn.cursor() as cur:
-            # Get the timeline ID of our branch. We need it for the 'do_gc' command
-            cur.execute("SHOW zenith.zenith_timeline")
-            timeline = cur.fetchone()[0]
-
-            with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'):
-                with zenbenchmark.record_duration('run'):
+            with env.record_pageserver_writes('pageserver_writes'):
+                with env.record_duration('run'):

                     # NOTE: Because each iteration updates every table already created,
                     # the runtime and write amplification is O(n^2), where n is the
@@ -71,13 +62,6 @@ def test_write_amplification(zenith_simple_env: ZenithEnv, zenbenchmark):
                     # slower, adding some delays in this loop. But forcing
                     # the checkpointing and GC makes the test go faster,
                     # with the same total I/O effect.
-                    pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
+                    env.flush()

-    # Report disk space used by the repository
-    timeline_size = zenbenchmark.get_timeline_size(env.repo_dir,
-                                                   env.initial_tenant,
-                                                   timeline)
-    zenbenchmark.record('size',
-                        timeline_size / (1024 * 1024),
-                        'MB',
-                        report=MetricReport.LOWER_IS_BETTER)
+    env.report_size()