mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 09:30:37 +00:00
Bandaid fix for "page not found" errors, when a table is loaded.
During parallel load of a table, Postgres sometimes requests a page from the page server for which no WAL has been generated yet. That's normal; Postgres expects the page to be full of zeros. There was a special case for that in LayeredTimeline::materialize_page, but the problem remained when the request crosses a segment boundary, so that there's no layer for the segment at all. It would be nice to have a more robust cross-check for this case. That might need help from the Postgres side. But this extends the bandaid fix we had in materialize_page() to the case where the request crosses a segment boundary. Fixes https://github.com/zenithdb/zenith/issues/841
This commit is contained in:
@@ -596,7 +596,16 @@ impl Timeline for LayeredTimeline {
|
||||
RECONSTRUCT_TIME
|
||||
.observe_closure_duration(|| self.materialize_page(seg, blknum, lsn, &*layer))
|
||||
} else {
|
||||
bail!("relish {} not found at {}", rel, lsn);
|
||||
// FIXME: This can happen if PostgreSQL extends a relation but never writes
|
||||
// the page. See https://github.com/zenithdb/zenith/issues/841
|
||||
//
|
||||
// Would be nice to detect that situation better.
|
||||
if seg.segno > 0 && self.get_rel_exists(rel, lsn)? {
|
||||
warn!("Page {} blk {} at {} not found", seg.rel, blknum, lsn);
|
||||
return Ok(ZERO_PAGE.clone());
|
||||
}
|
||||
|
||||
bail!("segment {} not found at {}", rel, lsn);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
54
test_runner/batch_others/test_parallel_copy.py
Normal file
54
test_runner/batch_others/test_parallel_copy.py
Normal file
@@ -0,0 +1,54 @@
|
||||
from io import BytesIO
|
||||
import asyncio
|
||||
import asyncpg
|
||||
import subprocess
|
||||
from fixtures.zenith_fixtures import ZenithEnv, Postgres
|
||||
from fixtures.log_helper import log
|
||||
|
||||
pytest_plugins = ("fixtures.zenith_fixtures")
|
||||
|
||||
|
||||
async def repeat_bytes(buf, repetitions: int):
    """Asynchronously yield the same ``buf`` object ``repetitions`` times.

    Nothing is yielded when ``repetitions`` is zero or negative.
    """
    for _ in range(repetitions):
        yield buf
|
||||
|
||||
|
||||
async def copy_test_data_to_table(pg: Postgres, worker_id: int, table_name: str):
    """Stream a large tab-separated data set into ``table_name`` via COPY.

    A 1000-row chunk is built once in memory; each row is an integer, a tab,
    and a text value tagged with ``worker_id``. The same chunk is then sent
    5000 times over a connection opened just for this call.
    """
    chunk = b"".join(
        f"{i}\tLoaded by worker {worker_id}. Long string to consume some space.\n".encode()
        for i in range(1000))

    copy_input = repeat_bytes(chunk, 5000)

    pg_conn = await pg.connect_async()
    try:
        await pg_conn.copy_to_table(table_name, source=copy_input)
    finally:
        # Close the connection whether or not the COPY succeeded, so it is
        # not leaked for the rest of the test run.
        # NOTE(review): assumes connect_async() returns an asyncpg connection,
        # whose close() is a coroutine -- confirm against the fixture.
        await pg_conn.close()
|
||||
|
||||
|
||||
async def parallel_load_same_table(pg: Postgres, n_parallel: int):
    """Run ``n_parallel`` concurrent COPY workers that all load 'copytest'.

    Each worker is scheduled as its own asyncio task. This coroutine returns
    once every worker has finished; with gather()'s default settings the
    first exception raised by any worker is propagated to the caller.
    """
    workers = [
        asyncio.create_task(copy_test_data_to_table(pg, worker_id, 'copytest'))
        for worker_id in range(n_parallel)
    ]

    # await all workers
    await asyncio.gather(*workers)
|
||||
|
||||
|
||||
# Load data into one table with COPY over n_parallel (default 5) parallel connections
|
||||
def test_parallel_copy(zenith_simple_env: ZenithEnv, n_parallel=5):
    """Load one table from ``n_parallel`` connections at the same time.

    Creates the 'test_parallel_copy' branch, starts Postgres on it, creates
    the 'copytest' table and then runs the parallel COPY workers to
    completion. The test passes if none of them raises.
    """
    env = zenith_simple_env
    # Create a branch for us
    env.zenith_cli(["branch", "test_parallel_copy", "empty"])

    pg = env.postgres.create_start('test_parallel_copy')
    log.info("postgres is running on 'test_parallel_copy' branch")

    # Create test table
    conn = pg.connect()
    try:
        cur = conn.cursor()
        cur.execute('CREATE TABLE copytest (i int, t text)')

        # Load the table with COPY from several parallel connections.
        asyncio.run(parallel_load_same_table(pg, n_parallel))
    finally:
        # Release the setup connection (and its cursor) even if the load
        # fails, instead of leaving it open until interpreter exit.
        conn.close()
|
||||
Reference in New Issue
Block a user