mirror of https://github.com/neondatabase/neon.git
Fix bug when importing large (>1GB) relations (#2172)
Resolves #2097

- Use the timeline modification's `lsn` and the timeline's `last_record_lsn` to determine the corresponding LSN to query data in `DatadirModification::get`.
- Update `test_import_from_pageserver`: split the test into 2 variants, `small` and `multisegment`.
  + `small` is the old test.
  + `multisegment` simulates #2097 by using a larger number of inserted rows to create multiple segment files of a relation. It is configured to run only with a `release` build.
@@ -83,6 +83,7 @@ runs:
       # this variable will be embedded in perf test report
       # and is needed to distinguish different environments
       PLATFORM: github-actions-selfhosted
+      BUILD_TYPE: ${{ inputs.build_type }}
       AWS_ACCESS_KEY_ID: ${{ inputs.real_s3_access_key_id }}
       AWS_SECRET_ACCESS_KEY: ${{ inputs.real_s3_secret_access_key }}
     shell: bash -euxo pipefail {0}
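
The `BUILD_TYPE` value exported here is what the new test below reads to decide whether the expensive multisegment variant runs. A minimal sketch of that gating pattern, using the same environment variable and skip condition as the test (`test_expensive_variant` is a placeholder name):

import os

import pytest


# Skip the slow variant unless CI exported a release BUILD_TYPE.
@pytest.mark.skipif(os.environ.get('BUILD_TYPE') == "debug",
                    reason="only run with release build")
def test_expensive_variant():
    ...
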
@@ -966,8 +966,8 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
                 bail!("unexpected pending WAL record");
             }
         } else {
-            let last_lsn = self.tline.get_last_record_lsn();
-            self.tline.get(key, last_lsn)
+            let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
+            self.tline.get(key, lsn)
         }
     }
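
The one-line Rust change above carries the whole fix: during a bulk import, the modification's `lsn` can run ahead of the timeline's `last_record_lsn` (no WAL has been digested for the imported pages yet), so reading at `last_record_lsn` alone could miss data staged by the import itself, which is presumably why the bug only surfaced with multi-segment (>1GB) relations. A minimal Python sketch of the selection rule; the real code is Rust, and the LSN values are made up for illustration:

# Illustrative sketch of the read-LSN choice in DatadirModification::get.
def read_lsn(last_record_lsn: int, modification_lsn: int) -> int:
    # The old code used last_record_lsn alone; the fix takes the max so a
    # modification whose LSN is ahead of the digested WAL reads its own writes.
    return max(last_record_lsn, modification_lsn)

assert read_lsn(0x169F1A8, 0x2000000) == 0x2000000  # import ahead of WAL
assert read_lsn(0x2000000, 0x169F1A8) == 0x2000000  # normal case: WAL ahead
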
@@ -1,9 +1,10 @@
+import re
 import pytest
-from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_upload, wait_for_last_record_lsn
-from fixtures.utils import lsn_from_hex, lsn_to_hex
+from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin, Postgres, wait_for_upload, wait_for_last_record_lsn
+from fixtures.utils import lsn_from_hex
 from uuid import UUID, uuid4
+import tarfile
 import os
-import tarfile
 import shutil
 from pathlib import Path
 import json
@@ -105,20 +106,60 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
 
 
 @pytest.mark.timeout(600)
-def test_import_from_pageserver(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
-
-    num_rows = 3000
+def test_import_from_pageserver_small(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder):
     neon_env_builder.num_safekeepers = 1
     neon_env_builder.enable_local_fs_remote_storage()
     env = neon_env_builder.init_start()
 
-    env.neon_cli.create_branch('test_import_from_pageserver')
-    pgmain = env.postgres.create_start('test_import_from_pageserver')
-    log.info("postgres is running on 'test_import_from_pageserver' branch")
+    timeline = env.neon_cli.create_branch('test_import_from_pageserver_small')
+    pg = env.postgres.create_start('test_import_from_pageserver_small')
 
-    timeline = pgmain.safe_psql("SHOW neon.timeline_id")[0][0]
+    num_rows = 3000
+    lsn = _generate_data(num_rows, pg)
+    _import(num_rows, lsn, env, pg_bin, timeline)
 
-    with closing(pgmain.connect()) as conn:
+
+@pytest.mark.timeout(1800)
+@pytest.mark.skipif(os.environ.get('BUILD_TYPE') == "debug", reason="only run with release build")
+def test_import_from_pageserver_multisegment(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder):
+    neon_env_builder.num_safekeepers = 1
+    neon_env_builder.enable_local_fs_remote_storage()
+    env = neon_env_builder.init_start()
+
+    timeline = env.neon_cli.create_branch('test_import_from_pageserver_multisegment')
+    pg = env.postgres.create_start('test_import_from_pageserver_multisegment')
+
+    # For `test_import_from_pageserver_multisegment`, we want to make sure that the data
+    # is large enough to create multi-segment files. Typically, a segment file's size is
+    # at most 1GB. A large number of inserted rows (`30000000`) is used to increase the
+    # DB size to above 1GB. Related: https://github.com/neondatabase/neon/issues/2097.
+    num_rows = 30000000
+    lsn = _generate_data(num_rows, pg)
+
+    logical_size = env.pageserver.http_client().timeline_detail(
+        env.initial_tenant, timeline)['local']['current_logical_size']
+    log.info(f"timeline logical size = {logical_size / (1024 ** 2)}MB")
+    assert logical_size > 1024**3  # = 1GB
+
+    tar_output_file = _import(num_rows, lsn, env, pg_bin, timeline)
+
+    # Check if the backup data contains multiple segment files
+    cnt_seg_files = 0
+    segfile_re = re.compile('[0-9]+\\.[0-9]+')
+    with tarfile.open(tar_output_file, "r") as tar_f:
+        for f in tar_f.getnames():
+            if segfile_re.search(f) is not None:
+                cnt_seg_files += 1
+                log.info(f"Found a segment file: {f} in the backup archive file")
+    assert cnt_seg_files > 0
+
+
+def _generate_data(num_rows: int, pg: Postgres) -> str:
+    """Generate a table with `num_rows` rows.
+
+    Returns:
+        the latest insert WAL's LSN"""
+    with closing(pg.connect()) as conn:
         with conn.cursor() as cur:
             # data loading may take a while, so increase statement timeout
             cur.execute("SET statement_timeout='300s'")
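
The multisegment check above leans on PostgreSQL's segment naming: a relation larger than 1GB is split into files named `<relfilenode>.1`, `<relfilenode>.2`, and so on, so any tar member matching `digits.digits` signals a second or later segment. A small self-contained illustration of the regex; the paths are made-up examples, not output from the test:

import re

# The first segment has a bare relfilenode name; overflow segments get ".<n>".
segfile_re = re.compile('[0-9]+\\.[0-9]+')
names = ['base/13008/16396', 'base/13008/16396.1', 'base/13008/16396.2']
assert [n for n in names if segfile_re.search(n)] == names[1:]
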
@@ -127,15 +168,28 @@ def test_import_from_pageserver(test_output_dir, pg_bin, vanilla_pg, neon_env_bu
             cur.execute("CHECKPOINT")
 
             cur.execute('SELECT pg_current_wal_insert_lsn()')
-            lsn = cur.fetchone()[0]
-            log.info(f"start_backup_lsn = {lsn}")
+            res = cur.fetchone()
+            assert res is not None and isinstance(res[0], str)
+            return res[0]
+
+
+def _import(expected_num_rows: int, lsn: str, env: NeonEnv, pg_bin: PgBin, timeline: UUID) -> str:
+    """Test importing backup data to the pageserver.
+
+    Args:
+        expected_num_rows: the expected number of rows of the test table in the backup data
+        lsn: the backup's base LSN
+
+    Returns:
+        path to the backup archive file"""
+    log.info(f"start_backup_lsn = {lsn}")
 
     # Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
     # PgBin sets it automatically, but here we need to pipe psql output to the tar command.
     psql_env = {'LD_LIBRARY_PATH': os.path.join(str(pg_distrib_dir), 'lib')}
 
     # Get a fullbackup from pageserver
-    query = f"fullbackup { env.initial_tenant.hex} {timeline} {lsn}"
+    query = f"fullbackup { env.initial_tenant.hex} {timeline.hex} {lsn}"
     cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query]
     result_basepath = pg_bin.run_capture(cmd, env=psql_env)
     tar_output_file = result_basepath + ".stdout"
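
As the hunk above shows, `fullbackup` is issued over the pageserver's libpq endpoint with plain `psql`, and the tar stream arrives on stdout (`run_capture` writes it to a `.stdout` file). A rough standalone equivalent of that step; the connection string, IDs, and function name are placeholders, not values from the test:

import subprocess

def fullbackup_to_tar(connstr: str, tenant_hex: str, timeline_hex: str,
                      lsn: str, out_path: str) -> None:
    # psql prints the server's tar stream on stdout; redirect it to a file.
    query = f"fullbackup {tenant_hex} {timeline_hex} {lsn}"
    with open(out_path, "wb") as out:
        subprocess.run(["psql", "--no-psqlrc", connstr, "-c", query],
                       stdout=out, check=True)
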
@@ -152,7 +206,7 @@ def test_import_from_pageserver(test_output_dir, pg_bin, vanilla_pg, neon_env_bu
     env.pageserver.start()
 
     # Import using another tenantid, because we use the same pageserver.
-    # TODO Create another pageserver to maeke test more realistic.
+    # TODO Create another pageserver to make test more realistic.
     tenant = uuid4()
 
     # Import to pageserver
@@ -165,7 +219,7 @@ def test_import_from_pageserver(test_output_dir, pg_bin, vanilla_pg, neon_env_bu
         "--tenant-id",
         tenant.hex,
         "--timeline-id",
-        timeline,
+        timeline.hex,
         "--node-name",
         node_name,
         "--base-lsn",
@@ -175,15 +229,15 @@ def test_import_from_pageserver(test_output_dir, pg_bin, vanilla_pg, neon_env_bu
     ])
 
     # Wait for data to land in s3
-    wait_for_last_record_lsn(client, tenant, UUID(timeline), lsn_from_hex(lsn))
-    wait_for_upload(client, tenant, UUID(timeline), lsn_from_hex(lsn))
+    wait_for_last_record_lsn(client, tenant, timeline, lsn_from_hex(lsn))
+    wait_for_upload(client, tenant, timeline, lsn_from_hex(lsn))
 
     # Check it worked
     pg = env.postgres.create_start(node_name, tenant_id=tenant)
-    assert pg.safe_psql('select count(*) from tbl') == [(num_rows, )]
+    assert pg.safe_psql('select count(*) from tbl') == [(expected_num_rows, )]
 
     # Take another fullbackup
-    query = f"fullbackup { tenant.hex} {timeline} {lsn}"
+    query = f"fullbackup { tenant.hex} {timeline.hex} {lsn}"
     cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query]
     result_basepath = pg_bin.run_capture(cmd, env=psql_env)
     new_tar_output_file = result_basepath + ".stdout"
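
`wait_for_last_record_lsn` and `wait_for_upload` are polling helpers from the test fixtures; the generic shape of such a wait loop looks roughly like this (a sketch of the pattern, not the fixtures' actual implementation):

import time

def poll_until(predicate, timeout_s: float = 60.0, interval_s: float = 0.5) -> None:
    # Re-check the condition until it holds or the deadline passes.
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if predicate():
            return
        time.sleep(interval_s)
    raise TimeoutError("condition not met within timeout")
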
@@ -195,4 +249,6 @@ def test_import_from_pageserver(test_output_dir, pg_bin, vanilla_pg, neon_env_bu
     # Check that gc works
     psconn = env.pageserver.connect()
     pscur = psconn.cursor()
-    pscur.execute(f"do_gc {tenant.hex} {timeline} 0")
+    pscur.execute(f"do_gc {tenant.hex} {timeline.hex} 0")
+
+    return tar_output_file