Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-16 18:02:56 +00:00
Add export/import test (#2036)
@@ -10,8 +10,8 @@ from typing import Optional
 import signal
 import pytest
 
-from fixtures.neon_fixtures import PgProtocol, PortDistributor, Postgres, NeonEnvBuilder, Etcd, NeonPageserverHttpClient, assert_local, wait_until, wait_for_last_record_lsn, wait_for_upload, neon_binpath, pg_distrib_dir
-from fixtures.utils import lsn_from_hex
+from fixtures.neon_fixtures import PgProtocol, PortDistributor, Postgres, NeonEnvBuilder, Etcd, NeonPageserverHttpClient, assert_local, wait_until, wait_for_last_record_lsn, wait_for_upload, neon_binpath, pg_distrib_dir, base_dir
+from fixtures.utils import lsn_from_hex, subprocess_capture
 
 
 def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):
@@ -101,9 +101,23 @@ def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Eve
     log.info('load thread stopped')
 
 
+@pytest.mark.parametrize(
+    'method',
+    [
+        # A minor migration involves no storage breaking changes.
+        # It is done by attaching the tenant to a new pageserver.
+        'minor',
+        # A major migration involves exporting a postgres datadir
+        # basebackup and importing it into the new pageserver.
+        # This kind of migration can tolerate breaking changes
+        # to storage format
+        pytest.param('major', marks=pytest.mark.xfail(reason="Not implemented")),
+    ])
 @pytest.mark.parametrize('with_load', ['with_load', 'without_load'])
 def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
                            port_distributor: PortDistributor,
+                           test_output_dir,
+                           method: str,
                            with_load: str):
     neon_env_builder.enable_local_fs_remote_storage()
 
@@ -153,8 +167,11 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
 
         load_stop_event = threading.Event()
         load_ok_event = threading.Event()
-        load_thread = threading.Thread(target=load,
-                                       args=(tenant_pg, load_stop_event, load_ok_event))
+        load_thread = threading.Thread(
+            target=load,
+            args=(tenant_pg, load_stop_event, load_ok_event),
+            daemon=True,  # To make sure the child dies when the parent errors
+        )
         load_thread.start()
 
     # run checkpoint manually to be sure that data landed in remote storage
@@ -184,19 +201,47 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
                                new_pageserver_http_port,
                                neon_env_builder.broker):
 
-        # call to attach timeline to new pageserver
-        new_pageserver_http.timeline_attach(tenant, timeline)
-        # new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
-        new_timeline_detail = wait_until(
-            number_of_iterations=5,
-            interval=1,
-            func=lambda: assert_local(new_pageserver_http, tenant, timeline))
+        # Migrate either by attaching from s3 or import/export basebackup
+        if method == "major":
+            cmd = [
+                "python",
+                os.path.join(base_dir, "scripts/export_import_between_pageservers.py"),
+                "--tenant-id",
+                tenant.hex,
+                "--from-host",
+                "localhost",
+                "--from-http-port",
+                str(pageserver_http.port),
+                "--from-pg-port",
+                str(env.pageserver.service_port.pg),
+                "--to-host",
+                "localhost",
+                "--to-http-port",
+                str(new_pageserver_http_port),
+                "--to-pg-port",
+                str(new_pageserver_pg_port),
+                "--psql-path",
+                os.path.join(pg_distrib_dir, "bin", "psql"),
+                "--work-dir",
+                os.path.join(test_output_dir),
+            ]
+            subprocess_capture(str(env.repo_dir), cmd, check=True)
+        elif method == "minor":
+            # call to attach timeline to new pageserver
+            new_pageserver_http.timeline_attach(tenant, timeline)
 
-        # when load is active these checks can break because lsns are not static
-        # so lets check with some margin
-        assert_abs_margin_ratio(lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
-                                lsn_from_hex(timeline_detail['local']['disk_consistent_lsn']),
-                                0.03)
+        # new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
+        new_timeline_detail = wait_until(
+            number_of_iterations=5,
+            interval=1,
+            func=lambda: assert_local(new_pageserver_http, tenant, timeline))
+
+        # when load is active these checks can break because lsns are not static
+        # so lets check with some margin
+        assert_abs_margin_ratio(
+            lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
+            lsn_from_hex(timeline_detail['local']['disk_consistent_lsn']),
+            0.03)
 
         tenant_pg.stop()
 
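
A note on the "major" path added above: the test drives scripts/export_import_between_pageservers.py through the subprocess_capture fixture, which, per its usage here, runs the command and captures its output under the given directory. The same invocation can be reproduced outside the test harness with only the standard library. The sketch below is illustrative and not part of the commit: the tenant id, ports, and filesystem paths are placeholders, and it assumes the script accepts exactly the flags shown in the diff.

# Minimal sketch: drive the export/import migration script by hand.
# Tenant id, ports, and paths are placeholders; the flags mirror the test above.
import subprocess

cmd = [
    "python",
    "scripts/export_import_between_pageservers.py",
    "--tenant-id", "0123456789abcdef0123456789abcdef",  # placeholder tenant id
    "--from-host", "localhost",
    "--from-http-port", "9898",   # source pageserver HTTP port (placeholder)
    "--from-pg-port", "6400",     # source pageserver libpq port (placeholder)
    "--to-host", "localhost",
    "--to-http-port", "9899",     # destination pageserver HTTP port (placeholder)
    "--to-pg-port", "6401",       # destination pageserver libpq port (placeholder)
    "--psql-path", "/usr/local/bin/psql",      # any psql binary (placeholder path)
    "--work-dir", "/tmp/relocation-workdir",   # scratch directory (placeholder)
]
# check=True raises CalledProcessError on a non-zero exit, mirroring check=True in the test.
subprocess.run(cmd, check=True)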
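The relocation check at the end compares disk_consistent_lsn on the old and new pageservers with a 3% margin via assert_abs_margin_ratio(a, b, margin_ratio). The helper's body is outside this diff; a plausible reading of its signature and of the "check with some margin" comment is a relative-difference assertion along these lines (sketch only, the real fixture may be written differently):

def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):
    # Sketch: assert that a and b differ by at most margin_ratio of b.
    assert abs(a - b) <= abs(b) * margin_ratio, \
        f"{a} and {b} differ by more than {margin_ratio:.0%}"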
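Likewise, wait_until(number_of_iterations=..., interval=..., func=...) is used as a retry loop around assert_local until the new pageserver reports the timeline locally. Its implementation is not shown in this diff; a generic helper with the same call shape might look roughly like the sketch below (names, typing, and error handling are assumptions, not the fixture's actual code):

import time
from typing import Any, Callable

def wait_until(number_of_iterations: int, interval: float, func: Callable[[], Any]) -> Any:
    # Sketch of a retry helper with the call shape used in the test:
    # call func up to number_of_iterations times, sleeping `interval` seconds
    # between attempts, and return its result once it stops raising.
    last_exc = None
    for _ in range(number_of_iterations):
        try:
            return func()
        except Exception as e:  # the real fixture may narrow this
            last_exc = e
            time.sleep(interval)
    raise Exception(f"timed out while waiting for {func}") from last_exc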