mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-26 07:39:58 +00:00
Add performance test case for parallel COPY TO
This commit is contained in:
140
test_runner/performance/test_parallel_copy_to.py
Normal file
140
test_runner/performance/test_parallel_copy_to.py
Normal file
@@ -0,0 +1,140 @@
|
||||
from io import BytesIO
|
||||
import asyncio
|
||||
import asyncpg
|
||||
from fixtures.zenith_fixtures import ZenithEnv, Postgres
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
|
||||
|
||||
pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
|
||||
|
||||
|
||||
async def repeat_bytes(buf, repetitions: int):
|
||||
for i in range(repetitions):
|
||||
yield buf
|
||||
|
||||
|
||||
async def copy_test_data_to_table(pg: Postgres, worker_id: int, table_name: str):
|
||||
buf = BytesIO()
|
||||
for i in range(1000):
|
||||
buf.write(
|
||||
f"{i}\tLoaded by worker {worker_id}. Long string to consume some space.\n".encode())
|
||||
buf.seek(0)
|
||||
|
||||
copy_input = repeat_bytes(buf.read(), 5000)
|
||||
|
||||
pg_conn = await pg.connect_async()
|
||||
await pg_conn.copy_to_table(table_name, source=copy_input)
|
||||
|
||||
|
||||
async def parallel_load_different_tables(pg: Postgres, n_parallel: int):
|
||||
workers = []
|
||||
for worker_id in range(n_parallel):
|
||||
worker = copy_test_data_to_table(pg, worker_id, f'copytest_{worker_id}')
|
||||
workers.append(asyncio.create_task(worker))
|
||||
|
||||
# await all workers
|
||||
await asyncio.gather(*workers)
|
||||
|
||||
|
||||
# Load 5 different tables in parallel with COPY TO
|
||||
def test_parallel_copy_different_tables(zenith_simple_env: ZenithEnv,
|
||||
zenbenchmark: ZenithBenchmarker,
|
||||
n_parallel=5):
|
||||
|
||||
env = zenith_simple_env
|
||||
# Create a branch for us
|
||||
env.zenith_cli(["branch", "test_parallel_copy_different_tables", "empty"])
|
||||
|
||||
pg = env.postgres.create_start('test_parallel_copy_different_tables')
|
||||
log.info("postgres is running on 'test_parallel_copy_different_tables' branch")
|
||||
|
||||
# Open a connection directly to the page server that we'll use to force
|
||||
# flushing the layers to disk
|
||||
psconn = env.pageserver.connect()
|
||||
pscur = psconn.cursor()
|
||||
|
||||
# Get the timeline ID of our branch. We need it for the 'do_gc' command
|
||||
conn = pg.connect()
|
||||
cur = conn.cursor()
|
||||
cur.execute("SHOW zenith.zenith_timeline")
|
||||
timeline = cur.fetchone()[0]
|
||||
|
||||
for worker_id in range(n_parallel):
|
||||
cur.execute(f'CREATE TABLE copytest_{worker_id} (i int, t text)')
|
||||
|
||||
with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'):
|
||||
with zenbenchmark.record_duration('load'):
|
||||
asyncio.run(parallel_load_different_tables(pg, n_parallel))
|
||||
|
||||
# Flush the layers from memory to disk. This is included in the reported
|
||||
# time and I/O
|
||||
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
|
||||
|
||||
# Record peak memory usage
|
||||
zenbenchmark.record("peak_mem",
|
||||
zenbenchmark.get_peak_mem(env.pageserver) / 1024,
|
||||
'MB',
|
||||
report=MetricReport.LOWER_IS_BETTER)
|
||||
|
||||
# Report disk space used by the repository
|
||||
timeline_size = zenbenchmark.get_timeline_size(env.repo_dir, env.initial_tenant, timeline)
|
||||
zenbenchmark.record('size',
|
||||
timeline_size / (1024 * 1024),
|
||||
'MB',
|
||||
report=MetricReport.LOWER_IS_BETTER)
|
||||
|
||||
|
||||
async def parallel_load_same_table(pg: Postgres, n_parallel: int):
|
||||
workers = []
|
||||
for worker_id in range(n_parallel):
|
||||
worker = copy_test_data_to_table(pg, worker_id, f'copytest')
|
||||
workers.append(asyncio.create_task(worker))
|
||||
|
||||
# await all workers
|
||||
await asyncio.gather(*workers)
|
||||
|
||||
|
||||
# Load data into one table with COPY TO from 5 parallel connections
|
||||
def test_parallel_copy_same_table(zenith_simple_env: ZenithEnv,
|
||||
zenbenchmark: ZenithBenchmarker,
|
||||
n_parallel=5):
|
||||
env = zenith_simple_env
|
||||
# Create a branch for us
|
||||
env.zenith_cli(["branch", "test_parallel_copy_same_table", "empty"])
|
||||
|
||||
pg = env.postgres.create_start('test_parallel_copy_same_table')
|
||||
log.info("postgres is running on 'test_parallel_copy_same_table' branch")
|
||||
|
||||
# Open a connection directly to the page server that we'll use to force
|
||||
# flushing the layers to disk
|
||||
psconn = env.pageserver.connect()
|
||||
pscur = psconn.cursor()
|
||||
|
||||
# Get the timeline ID of our branch. We need it for the 'do_gc' command
|
||||
conn = pg.connect()
|
||||
cur = conn.cursor()
|
||||
cur.execute("SHOW zenith.zenith_timeline")
|
||||
timeline = cur.fetchone()[0]
|
||||
|
||||
cur.execute(f'CREATE TABLE copytest (i int, t text)')
|
||||
|
||||
with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'):
|
||||
with zenbenchmark.record_duration('load'):
|
||||
asyncio.run(parallel_load_same_table(pg, n_parallel))
|
||||
|
||||
# Flush the layers from memory to disk. This is included in the reported
|
||||
# time and I/O
|
||||
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
|
||||
|
||||
# Record peak memory usage
|
||||
zenbenchmark.record("peak_mem",
|
||||
zenbenchmark.get_peak_mem(env.pageserver) / 1024,
|
||||
'MB',
|
||||
report=MetricReport.LOWER_IS_BETTER)
|
||||
|
||||
# Report disk space used by the repository
|
||||
timeline_size = zenbenchmark.get_timeline_size(env.repo_dir, env.initial_tenant, timeline)
|
||||
zenbenchmark.record('size',
|
||||
timeline_size / (1024 * 1024),
|
||||
'MB',
|
||||
report=MetricReport.LOWER_IS_BETTER)
|
||||
Reference in New Issue
Block a user