Files
neon/test_runner/batch_others/test_tenant_relocation.py
2022-02-17 13:41:19 +02:00

268 lines
11 KiB
Python

from contextlib import closing, contextmanager
import os
import pathlib
import subprocess
import threading
from uuid import UUID
from fixtures.log_helper import log
import time
import signal
import pytest
from fixtures.zenith_fixtures import PgProtocol, PortDistributor, Postgres, ZenithEnvBuilder, ZenithPageserverHttpClient, zenith_binpath, pg_distrib_dir
def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float) -> None:
    """
    Assert that b lies within a relative margin of the reference value a.

    The deviation |a - b| is measured relative to the magnitude of a and has
    to be strictly smaller than margin_ratio.

    A reference of zero has no meaningful relative margin, so in that case
    the check passes only when b is zero as well.

    Raises AssertionError carrying (a, b, margin_ratio) when the check fails.
    """
    if a == 0:
        # avoid ZeroDivisionError so that a failure reports the compared values
        assert b == 0, (a, b, margin_ratio)
        return
    # divide by the magnitude of a: dividing by a negative a would make the
    # ratio non-positive and the check would pass no matter how far off b is
    assert abs(a - b) / abs(a) < margin_ratio, (a, b, margin_ratio)
@contextmanager
def new_pageserver_helper(new_pageserver_dir: pathlib.Path,
                          pageserver_bin: pathlib.Path,
                          remote_storage_mock_path: pathlib.Path,
                          pg_port: int,
                          http_port: int):
    """
    Initialize and start an additional pageserver for the duration of the
    with block.

    cannot use ZenithPageserver yet because it depends on zenith cli
    which currently lacks support for multiple pageservers

    The binary is invoked twice: once with --init to set up the workdir with
    the listen addresses, pg_distrib_dir and local remote storage path, and
    once with --daemonize to actually run it. On exit SIGQUIT is sent to the
    pid read from pageserver.pid in the workdir.
    """
    binary = str(pageserver_bin)
    workdir_args = ['--workdir', str(new_pageserver_dir)]
    config_overrides = [
        f"-c listen_pg_addr='localhost:{pg_port}'",
        f"-c listen_http_addr='localhost:{http_port}'",
        f"-c pg_distrib_dir='{pg_distrib_dir}'",
        f"-c remote_storage={{local_path='{remote_storage_mock_path}'}}",
    ]

    # set up the working directory first
    init_cmd = [binary, '--init', *workdir_args, *config_overrides]
    subprocess.check_output(init_cmd, text=True)

    # actually run new pageserver
    run_cmd = [binary, *workdir_args, '--daemonize']
    log.info("starting new pageserver %s", run_cmd)
    startup_output = subprocess.check_output(run_cmd, text=True)
    log.info("started new pageserver %s", startup_output)

    try:
        yield
    finally:
        log.info("stopping new pageserver")
        pid_file = new_pageserver_dir / 'pageserver.pid'
        os.kill(int(pid_file.read_text()), signal.SIGQUIT)
def wait_for(number_of_iterations: int, interval: int, func):
    """
    Call func repeatedly until it completes without raising and return its
    result.

    At most number_of_iterations attempts are made, sleeping interval seconds
    between consecutive attempts. Every failed attempt is logged.

    Raises Exception when no attempt succeeds; the exception raised by the
    last attempt (if any) is chained as its cause.
    """
    last_exception = None
    for attempt in range(1, number_of_iterations + 1):
        try:
            return func()
        except Exception as e:
            log.info("waiting for %s iteration %s failed", func, attempt)
            last_exception = e
        # there is nothing left to wait for once the final attempt has failed,
        # so do not delay the timeout error by one more interval
        if attempt < number_of_iterations:
            time.sleep(interval)
    raise Exception("timed out while waiting for %s" % func) from last_exception
@contextmanager
def pg_cur(pg):
    """
    Yield a cursor on a fresh connection obtained from pg.connect().

    On exit the cursor context is left first, then the connection is closed.
    """
    connection = pg.connect()
    with closing(connection), connection.cursor() as cursor:
        yield cursor
def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Event):
    """
    Insert rows into the "load" table one at a time until stop_event is set.

    Every insert uses its own connection obtained through pg_cur. A failed
    insert clears load_ok_event. After the first successful insert that
    follows a failure the row count is verified against the number of
    successful inserts, and only then load_ok_event is set.
    """
    log.info("load started")

    inserted_ctr = 0
    failed = False
    while not stop_event.is_set():
        try:
            with pg_cur(pg) as cur:
                cur.execute("INSERT INTO load VALUES ('some payload')")
                inserted_ctr += 1
        except Exception:
            # Exception instead of a bare except, so that SystemExit and
            # KeyboardInterrupt are not swallowed by the retry loop
            if not failed:
                # log only the first failure of a streak
                log.info("load failed")
            failed = True
            load_ok_event.clear()
        else:
            if failed:
                with pg_cur(pg) as cur:
                    # if we recovered after failure verify that we have correct number of rows
                    log.info("recovering at %s", inserted_ctr)
                    cur.execute("SELECT count(*) FROM load")
                    # it seems that sometimes transaction gets committed before we can acknowledge
                    # the result, so sometimes selected value is larger by one than we expect
                    assert cur.fetchone()[0] - inserted_ctr <= 1
                    log.info("successfully recovered %s", inserted_ctr)
                    failed = False
                    load_ok_event.set()
    log.info('load thread stopped')
def assert_local(pageserver_http_client: ZenithPageserverHttpClient, tenant: UUID, timeline: str):
    """
    Fetch the timeline detail from the pageserver and assert that its 'type'
    is "Local".

    timeline is the timeline id as a string; it is converted to UUID for the
    request. Returns the fetched detail. The detail is also used as the
    assertion message when the check fails.
    """
    detail = pageserver_http_client.timeline_detail(tenant, UUID(timeline))
    reported_type = detail.get('type')
    assert reported_type == "Local", detail
    return detail
def _point_cli_config_at_pageserver(config_path: pathlib.Path, http_port: int, pg_port: int) -> None:
    """
    Overwrite the last two lines of the zenith cli config with the given
    pageserver http and pg listen addresses.

    NOTE(review): relies on listen_http_addr / listen_pg_addr being the final
    two lines of the file, in that order - confirm against the config writer.
    """
    cli_config_lines = config_path.read_text().splitlines()
    cli_config_lines[-2] = f"listen_http_addr = 'localhost:{http_port}'"
    cli_config_lines[-1] = f"listen_pg_addr = 'localhost:{pg_port}'"
    config_path.write_text('\n'.join(cli_config_lines))


@pytest.mark.skip(reason="will be fixed with https://github.com/zenithdb/zenith/issues/1193")
@pytest.mark.parametrize('with_load', ['with_load', 'without_load'])
def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
                           port_distributor: PortDistributor,
                           with_load: str):
    """
    Relocate a tenant's timeline from the environment's pageserver to a
    second, manually started pageserver and check that the data survives.

    Steps: create a tenant with some data, run do_gc on the old pageserver,
    start a new pageserver, attach the timeline to it, issue callmemaybe,
    restart the compute against the new pageserver, detach the timeline from
    the old pageserver and verify reads and writes.

    With with_load == 'with_load' a background thread keeps inserting rows
    during the whole procedure and has to report recovery at the end.
    """
    zenith_env_builder.num_safekeepers = 1
    zenith_env_builder.enable_local_fs_remote_storage()

    env = zenith_env_builder.init()
    cli_config_path = env.repo_dir / 'config'

    # create folder for remote storage mock
    remote_storage_mock_path = env.repo_dir / 'local_fs_remote_storage'

    tenant = env.create_tenant(UUID("74ee8b079a0e437eb0afea7d26a07209"))
    log.info("tenant to relocate %s", tenant)

    env.zenith_cli.create_branch("test_tenant_relocation", "main", tenant_id=tenant)

    tenant_pg = env.postgres.create_start(
        "test_tenant_relocation",
        "main",  # branch name, None means same as node name
        tenant_id=tenant,
    )

    # insert some data
    with closing(tenant_pg.connect()) as conn:
        with conn.cursor() as cur:
            # save timeline for later gc call
            cur.execute("SHOW zenith.zenith_timeline")
            timeline = cur.fetchone()[0]
            log.info("timeline to relocate %s", timeline)

            # we rely upon autocommit after each statement
            # as waiting for acceptors happens there
            cur.execute("CREATE TABLE t(key int primary key, value text)")
            cur.execute("INSERT INTO t SELECT generate_series(1,1000), 'some payload'")
            cur.execute("SELECT sum(key) FROM t")
            assert cur.fetchone() == (500500, )

    if with_load == 'with_load':
        # create load table
        with pg_cur(tenant_pg) as cur:
            cur.execute("CREATE TABLE load(value text)")

        load_stop_event = threading.Event()
        load_ok_event = threading.Event()
        # NOTE(review): nothing stops this thread if a check below fails
        # before load_stop_event is set - consider guarding with try/finally
        load_thread = threading.Thread(target=load,
                                       args=(tenant_pg, load_stop_event, load_ok_event))
        load_thread.start()

    # run checkpoint manually to be sure that data landed in remote storage
    with closing(env.pageserver.connect()) as psconn:
        with psconn.cursor() as pscur:
            pscur.execute(f"do_gc {tenant.hex} {timeline}")

    # ensure upload is completed
    pageserver_http_client = env.pageserver.http_client()
    timeline_detail = pageserver_http_client.timeline_detail(tenant, UUID(timeline))
    assert timeline_detail['disk_consistent_lsn'] == timeline_detail['timeline_state']['Ready']

    log.info("inititalizing new pageserver")
    # bootstrap second pageserver
    new_pageserver_dir = env.repo_dir / 'new_pageserver'
    new_pageserver_dir.mkdir()

    new_pageserver_pg_port = port_distributor.get_port()
    new_pageserver_http_port = port_distributor.get_port()
    log.info("new pageserver ports pg %s http %s", new_pageserver_pg_port, new_pageserver_http_port)
    pageserver_bin = pathlib.Path(zenith_binpath) / 'pageserver'

    new_pageserver_http_client = ZenithPageserverHttpClient(port=new_pageserver_http_port,
                                                            auth_token=None)

    with new_pageserver_helper(new_pageserver_dir,
                               pageserver_bin,
                               remote_storage_mock_path,
                               new_pageserver_pg_port,
                               new_pageserver_http_port):

        # call to attach timeline to new pageserver
        new_pageserver_http_client.timeline_attach(tenant, UUID(timeline))
        # FIXME cannot handle duplicate download requests, subject to fix in https://github.com/zenithdb/zenith/issues/997
        time.sleep(5)
        # new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one
        # because there were no new writes since checkpoint
        new_timeline_detail = wait_for(
            number_of_iterations=5,
            interval=1,
            func=lambda: assert_local(new_pageserver_http_client, tenant, timeline))
        assert new_timeline_detail['timeline_state'].get('Ready'), new_timeline_detail
        # when load is active these checks can break because lsns are not static
        # so lets check with some margin
        if with_load == 'without_load':
            # TODO revisit this once https://github.com/zenithdb/zenith/issues/1049 is fixed
            assert_abs_margin_ratio(new_timeline_detail['disk_consistent_lsn'],
                                    timeline_detail['disk_consistent_lsn'],
                                    0.01)
            assert_abs_margin_ratio(new_timeline_detail['timeline_state']['Ready'],
                                    timeline_detail['timeline_state']['Ready'],
                                    0.01)

        # callmemaybe to start replication from safekeeper to the new pageserver
        # when there is no load there is a clean checkpoint and no wal delta
        # needs to be streamed to the new pageserver
        # TODO (rodionov) use attach to start replication
        with pg_cur(PgProtocol(host='localhost', port=new_pageserver_pg_port)) as cur:
            # "callmemaybe {} {} host={} port={} options='-c ztimelineid={} ztenantid={}'"
            safekeeper_connstring = f"host=localhost port={env.safekeepers[0].port.pg} options='-c ztimelineid={timeline} ztenantid={tenant} pageserver_connstr=postgresql://no_user:@localhost:{new_pageserver_pg_port}'"
            cur.execute("callmemaybe {} {} {}".format(tenant, timeline, safekeeper_connstring))

        tenant_pg.stop()

        # rewrite zenith cli config to use new pageserver for basebackup to start new compute
        _point_cli_config_at_pageserver(cli_config_path,
                                        new_pageserver_http_port,
                                        new_pageserver_pg_port)

        tenant_pg_config_file_path = pathlib.Path(tenant_pg.config_file_path())
        # close the file explicitly so that the appended setting is flushed
        # to disk before the compute is started again
        with tenant_pg_config_file_path.open('a') as tenant_pg_config_file:
            tenant_pg_config_file.write(
                f"\nzenith.page_server_connstring = 'postgresql://no_user:@localhost:{new_pageserver_pg_port}'"
            )

        tenant_pg.start()

        # detach tenant from old pageserver before we check
        # that all the data is there to be sure that old pageserver
        # is no longer involved, and if it is, we will see the errors
        pageserver_http_client.timeline_detach(tenant, UUID(timeline))

        with pg_cur(tenant_pg) as cur:
            # check that data is still there
            cur.execute("SELECT sum(key) FROM t")
            assert cur.fetchone() == (500500, )
            # check that we can write new data
            cur.execute("INSERT INTO t SELECT generate_series(1001,2000), 'some payload'")
            cur.execute("SELECT sum(key) FROM t")
            assert cur.fetchone() == (2001000, )

        if with_load == 'with_load':
            # the load thread sets this event only after it recovered from a failure
            assert load_ok_event.wait(1)
            log.info('stopping load thread')
            load_stop_event.set()
            load_thread.join()
            log.info('load thread stopped')

        # bring old pageserver back for clean shutdown via zenith cli
        # new pageserver will be shut down by the context manager
        _point_cli_config_at_pageserver(cli_config_path,
                                        env.pageserver.service_port.http,
                                        env.pageserver.service_port.pg)