Files
neon/test_runner/batch_others/test_parallel_copy.py
Heikki Linnakangas 849ac791a6 Bandaid fix for "page not found" errors, when a table is loaded.
During parallel load of a table, Postgres sometimes requests a page from
the page server for which no WAL has been generated yet. That's normal;
Postgres expects the page to be full of zeros. There was a special case
for that in LayeredTimeline::materialize_page, but the problem remained
when you're crossing a segment boundary, so that there's no layer for
the segment at all.

It would be nice to have a more robust cross-check for this case. That
might need help from the Postgres side. But this extends the bandaid fix
we had in materialize_page() to the case where cross segment boundary.

Fixes https://github.com/zenithdb/zenith/issues/841
2021-11-12 18:47:39 +02:00

55 lines
1.6 KiB
Python

from io import BytesIO
import asyncio
import asyncpg
import subprocess
from fixtures.zenith_fixtures import ZenithEnv, Postgres
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures")
async def repeat_bytes(buf, repetitions: int):
for i in range(repetitions):
yield buf
async def copy_test_data_to_table(pg: Postgres, worker_id: int, table_name: str):
buf = BytesIO()
for i in range(1000):
buf.write(
f"{i}\tLoaded by worker {worker_id}. Long string to consume some space.\n".encode())
buf.seek(0)
copy_input = repeat_bytes(buf.read(), 5000)
pg_conn = await pg.connect_async()
await pg_conn.copy_to_table(table_name, source=copy_input)
async def parallel_load_same_table(pg: Postgres, n_parallel: int):
workers = []
for worker_id in range(n_parallel):
worker = copy_test_data_to_table(pg, worker_id, f'copytest')
workers.append(asyncio.create_task(worker))
# await all workers
await asyncio.gather(*workers)
# Load data into one table with COPY TO from 5 parallel connections
def test_parallel_copy(zenith_simple_env: ZenithEnv, n_parallel=5):
env = zenith_simple_env
# Create a branch for us
env.zenith_cli(["branch", "test_parallel_copy", "empty"])
pg = env.postgres.create_start('test_parallel_copy')
log.info("postgres is running on 'test_parallel_copy' branch")
# Create test table
conn = pg.connect()
cur = conn.cursor()
cur.execute(f'CREATE TABLE copytest (i int, t text)')
# Run COPY TO to load the table with parallel connections.
asyncio.run(parallel_load_same_table(pg, n_parallel))