Compare commits

...

5 Commits

Author                 SHA1        Message                                                                                  Date
Bojan Serafimov        f7efbb2d42  WIP relocation test                                                                      2022-07-05 17:12:16 -04:00
Bojan Serafimov        0aff7c9ee9  Merge branch 'main' into projects-migration-complete                                     2022-07-05 16:24:29 -04:00
Bojan Serafimov        4a2a55d9b2  Add standalone script                                                                    2022-07-05 15:07:39 -04:00
Bojan Serafimov        99a0a5a19b  Add missing empty rels                                                                   2022-07-05 09:16:55 -04:00
Anastasia Lubennikova  263a3ea5e3  Add script export_import_betwen_pageservers.py to migrate projects between pageservers  2022-07-05 15:27:31 +03:00
4 changed files with 869 additions and 13 deletions

438
scripts/add_missing_rels.py Normal file

@@ -0,0 +1,438 @@
import os
import shutil
from pathlib import Path
import tempfile
from contextlib import closing
import psycopg2
import subprocess
import argparse
### utils copied from test fixtures
from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, cast, Union, Tuple
import re
from psycopg2.extensions import connection as PgConnection
from psycopg2.extensions import make_dsn, parse_dsn
import asyncpg
Env = Dict[str, str]
_global_counter = 0
def global_counter() -> int:
""" A really dumb global counter.
This is useful for giving output files a unique number, so if we run the
same command multiple times we can keep their output separate.
"""
global _global_counter
_global_counter += 1
return _global_counter
def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> str:
""" Run a process and capture its output
Output will go to files named "cmd_NNN.stdout" and "cmd_NNN.stderr"
where "cmd" is the name of the program and NNN is an incrementing
counter.
If those files already exist, we will overwrite them.
Returns basepath for files with captured output.
"""
assert type(cmd) is list
base = os.path.basename(cmd[0]) + '_{}'.format(global_counter())
basepath = os.path.join(capture_dir, base)
stdout_filename = basepath + '.stdout'
stderr_filename = basepath + '.stderr'
with open(stdout_filename, 'w') as stdout_f:
with open(stderr_filename, 'w') as stderr_f:
print('(capturing output to "{}.stdout")'.format(base))
subprocess.run(cmd, **kwargs, stdout=stdout_f, stderr=stderr_f)
return basepath
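# For illustration (hypothetical paths): subprocess_capture('/tmp/logs', ['pg_ctl', '--version'])
# runs pg_ctl, leaves /tmp/logs/pg_ctl_1.stdout and /tmp/logs/pg_ctl_1.stderr behind,
# and returns '/tmp/logs/pg_ctl_1'.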
class PgBin:
""" A helper class for executing postgres binaries """
def __init__(self, log_dir: Path, pg_distrib_dir):
self.log_dir = log_dir
self.pg_bin_path = os.path.join(str(pg_distrib_dir), 'bin')
self.env = os.environ.copy()
self.env['LD_LIBRARY_PATH'] = os.path.join(str(pg_distrib_dir), 'lib')
def _fixpath(self, command: List[str]):
if '/' not in command[0]:
command[0] = os.path.join(self.pg_bin_path, command[0])
def _build_env(self, env_add: Optional[Env]) -> Env:
if env_add is None:
return self.env
env = self.env.copy()
env.update(env_add)
return env
def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None):
"""
Run one of the postgres binaries.
The command should be in list form, e.g. ['pgbench', '-p', '55432']
All the necessary environment variables will be set.
If the first argument (the command name) doesn't include a path (no '/'
characters present), then it will be edited to include the correct path.
If you want stdout/stderr captured to files, use `run_capture` instead.
"""
self._fixpath(command)
print('Running command "{}"'.format(' '.join(command)))
env = self._build_env(env)
subprocess.run(command, env=env, cwd=cwd, check=True)
def run_capture(self,
command: List[str],
env: Optional[Env] = None,
cwd: Optional[str] = None,
**kwargs: Any) -> str:
"""
Run one of the postgres binaries, with stderr and stdout redirected to a file.
This is just like `run`, but for chatty programs. Returns basepath for files
with captured output.
"""
self._fixpath(command)
print('Running command "{}"'.format(' '.join(command)))
env = self._build_env(env)
return subprocess_capture(str(self.log_dir),
command,
env=env,
cwd=cwd,
check=True,
**kwargs)
class PgProtocol:
""" Reusable connection logic """
def __init__(self, **kwargs):
self.default_options = kwargs
def connstr(self, **kwargs) -> str:
"""
Build a libpq connection string for the Postgres instance.
"""
return str(make_dsn(**self.conn_options(**kwargs)))
def conn_options(self, **kwargs):
conn_options = self.default_options.copy()
if 'dsn' in kwargs:
conn_options.update(parse_dsn(kwargs['dsn']))
conn_options.update(kwargs)
# Individual statement timeout in seconds. 2 minutes should be
# enough for our tests, but if you need longer, you can
# change it by calling "SET statement_timeout" after
# connecting.
if 'options' in conn_options:
conn_options['options'] = f"-cstatement_timeout=120s " + conn_options['options']
else:
conn_options['options'] = "-cstatement_timeout=120s"
return conn_options
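# For illustration: conn_options(options='-csearch_path=public') yields
# options="-cstatement_timeout=120s -csearch_path=public", while a call without
# an 'options' key yields options="-cstatement_timeout=120s".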
# autocommit=True here by default because that's what we need most of the time
def connect(self, autocommit=True, **kwargs) -> PgConnection:
"""
Connect to the node.
Returns psycopg2's connection object.
This method passes all extra params to connstr.
"""
conn = psycopg2.connect(**self.conn_options(**kwargs))
# WARNING: this setting affects *all* tests!
conn.autocommit = autocommit
return conn
async def connect_async(self, **kwargs) -> asyncpg.Connection:
"""
Connect to the node from async python.
Returns asyncpg's connection object.
"""
# asyncpg takes slightly different options than psycopg2. Try
# to convert the defaults from the psycopg2 format.
# The psycopg2 option 'dbname' is called 'database' in asyncpg
conn_options = self.conn_options(**kwargs)
if 'dbname' in conn_options:
conn_options['database'] = conn_options.pop('dbname')
# Convert options='-c<key>=<val>' to server_settings
if 'options' in conn_options:
options = conn_options.pop('options')
for match in re.finditer(r'-c(\w*)=(\w*)', options):
key = match.group(1)
val = match.group(2)
# Note: checking 'server_settings' here; the original checked the nonexistent
# 'server_options' key, so the update branch was unreachable
if 'server_settings' in conn_options:
conn_options['server_settings'].update({key: val})
else:
conn_options['server_settings'] = {key: val}
return await asyncpg.connect(**conn_options)
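# For illustration: the default options="-cstatement_timeout=120s" set by conn_options
# is converted to server_settings={'statement_timeout': '120s'} before the asyncpg call.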
def safe_psql(self, query: str, **kwargs: Any) -> List[Tuple[Any, ...]]:
"""
Execute query against the node and return all rows.
This method passes all extra params to connstr.
"""
return self.safe_psql_many([query], **kwargs)[0]
def safe_psql_many(self, queries: List[str], **kwargs: Any) -> List[List[Tuple[Any, ...]]]:
"""
Execute queries against the node and return all rows.
This method passes all extra params to connstr.
"""
result: List[List[Any]] = []
with closing(self.connect(**kwargs)) as conn:
with conn.cursor() as cur:
for query in queries:
print(f"Executing query: {query}")
cur.execute(query)
if cur.description is None:
result.append([]) # query didn't return data
else:
result.append(cast(List[Any], cur.fetchall()))
return result
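# For illustration: safe_psql("select 1") returns [(1,)], while a statement that
# returns no rows (e.g. CHECKPOINT) contributes an empty list to the result.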
class VanillaPostgres(PgProtocol):
def __init__(self, pgdatadir: Path, pg_bin: PgBin, port: int, init=True):
super().__init__(host='localhost', port=port, dbname='postgres')
self.pgdatadir = pgdatadir
self.pg_bin = pg_bin
self.running = False
if init:
self.pg_bin.run_capture(['initdb', '-D', str(pgdatadir)])
self.configure([f"port = {port}\n"])
def configure(self, options: List[str]):
"""Append lines into postgresql.conf file."""
assert not self.running
with open(os.path.join(self.pgdatadir, 'postgresql.conf'), 'a') as conf_file:
conf_file.write("\n".join(options))
def start(self, log_path: Optional[str] = None):
assert not self.running
self.running = True
if log_path is None:
log_path = os.path.join(self.pgdatadir, "pg.log")
self.pg_bin.run_capture(
['pg_ctl', '-w', '-D', str(self.pgdatadir), '-l', log_path, 'start'])
def stop(self):
assert self.running
self.running = False
self.pg_bin.run_capture(['pg_ctl', '-w', '-D', str(self.pgdatadir), 'stop'])
def get_subdir_size(self, subdir) -> int:
"""Return size of pgdatadir subdirectory in bytes."""
return get_dir_size(os.path.join(self.pgdatadir, subdir))
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
if self.running:
self.stop()
### actual code
def get_rel_paths(log_dir, pg_bin, base_tar):
"""Yeild list of relation paths"""
with tempfile.TemporaryDirectory() as restored_dir:
# Unpack the base tar
subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])
port = "55439" # Probably free
with VanillaPostgres(restored_dir, pg_bin, port, init=False) as vanilla_pg:
vanilla_pg.configure([f"port={port}"])
vanilla_pg.start()
# Create database based on template0 because we can't connect to template0
query = "create database template0copy template template0"
vanilla_pg.safe_psql(query, user="cloud_admin")
vanilla_pg.safe_psql("CHECKPOINT", user="cloud_admin")
# Get all databases
query = "select oid, datname from pg_database"
oid_dbname_pairs = vanilla_pg.safe_psql(query, user="cloud_admin")
template0_oid = [
oid
for (oid, database) in oid_dbname_pairs
if database == "template0"
][0]
# Get rel paths for each database
for oid, database in oid_dbname_pairs:
if database == "template0":
# We can't connect to template0
continue
query = "select relname, pg_relation_filepath(oid) from pg_class"
result = vanilla_pg.safe_psql(query, user="cloud_admin", dbname=database)
for relname, filepath in result:
if filepath is not None:
if database == "template0copy":
# Add all template0copy paths to template0
prefix = f"base/{oid}/"
if filepath.startswith(prefix):
suffix = filepath[len(prefix):]
yield f"base/{template0_oid}/{suffix}"
elif filepath.startswith("global"):
print(f"skipping {database} global file {filepath}")
else:
raise AssertionError
else:
yield filepath
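# For illustration (hypothetical oids): if template0 has oid 13678 and template0copy
# was created with oid 16384, a template0copy path such as base/16384/1255 is rewritten
# to base/13678/1255, recovering the file paths of the unconnectable template0 database.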
def pack_base(log_dir, restored_dir, output_tar):
tmp_tar_name = "tmp.tar"
tmp_tar_path = os.path.join(restored_dir, tmp_tar_name)
cmd = ["tar", "-cf", tmp_tar_name] + os.listdir(restored_dir)
subprocess_capture(log_dir, cmd, cwd=restored_dir)
shutil.move(tmp_tar_path, output_tar)
def get_files_in_tar(log_dir, tar):
with tempfile.TemporaryDirectory() as restored_dir:
# Unpack the base tar
subprocess_capture(log_dir, ["tar", "-xf", tar, "-C", restored_dir])
# Yield paths of all files, relative to the tar root
for root, dirs, files in os.walk(restored_dir):
for name in files:
file_path = os.path.join(root, name)
yield file_path[len(restored_dir) + 1:]
def corrupt(log_dir, base_tar, output_tar):
"""Remove all empty files and repackage. Return paths of files removed."""
with tempfile.TemporaryDirectory() as restored_dir:
# Unpack the base tar
subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])
# Find empty files
empty_files = []
for root, dirs, files in os.walk(restored_dir):
for name in files:
file_path = os.path.join(root, name)
file_size = os.path.getsize(file_path)
if file_size == 0:
empty_files.append(file_path)
# Delete empty files (just to see if they get recreated)
for empty_file in empty_files:
os.remove(empty_file)
# Repackage
pack_base(log_dir, restored_dir, output_tar)
# Return relative paths
return {
empty_file[len(restored_dir) + 1:]
for empty_file in empty_files
}
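# For illustration (hypothetical layout): if base/13678/2606 was empty and deleted,
# the returned set contains the tar-relative path 'base/13678/2606'.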
def touch_missing_rels(log_dir, corrupt_tar, output_tar, paths):
with tempfile.TemporaryDirectory() as restored_dir:
# Unpack the base tar
subprocess_capture(log_dir, ["tar", "-xf", corrupt_tar, "-C", restored_dir])
# Touch files that don't exist
for path in paths:
absolute_path = os.path.join(restored_dir, path)
exists = os.path.exists(absolute_path)
if not exists:
print("File {absolute_path} didn't exist. Creating..")
Path(absolute_path).touch()
# Repackage
pack_base(log_dir, restored_dir, output_tar)
# TODO this test is not currently called. It needs any ordinary base.tar path as input
def test_add_missing_rels(base_tar):
output_tar = base_tar + ".fixed"
# Create new base tar with missing empty files
corrupt_tar = os.path.join(test_output_dir, "psql_2-corrupted.stdout")
deleted_files = corrupt(test_output_dir, base_tar, corrupt_tar)
assert len(set(get_files_in_tar(test_output_dir, base_tar)) -
set(get_files_in_tar(test_output_dir, corrupt_tar))) > 0
# Reconstruct paths from the corrupted tar, assert it covers everything important
reconstructed_paths = set(get_rel_paths(test_output_dir, pg_bin, corrupt_tar))
paths_missed = deleted_files - reconstructed_paths
assert paths_missed.issubset({
"postgresql.auto.conf",
"pg_ident.conf",
})
# Recreate the correct tar by touching files, compare with original tar
touch_missing_rels(test_output_dir, corrupt_tar, output_tar, reconstructed_paths)
paths_missed = (set(get_files_in_tar(test_output_dir, base_tar)) -
set(get_files_in_tar(test_output_dir, output_tar)))
assert paths_missed.issubset({
"postgresql.auto.conf",
"pg_ident.conf",
})
# Example command:
# poetry run python scripts/add_missing_rels.py \
# --base-tar /home/bojan/src/neondatabase/neon/test_output/test_import_from_pageserver/psql_2.stdout \
# --output-tar output-base.tar \
# --log-dir /home/bojan/tmp \
# --pg-distrib-dir /home/bojan/src/neondatabase/neon/tmp_install/
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'--base-tar',
dest='base_tar',
required=True,
help='base.tar file to add missing rels to (file will not be modified)',
)
parser.add_argument(
'--output-tar',
dest='output_tar',
required=True,
help='path and name for the output base.tar file',
)
parser.add_argument(
'--log-dir',
dest='log_dir',
required=True,
help='directory to save log files in',
)
parser.add_argument(
'--pg-distrib-dir',
dest='pg_distrib_dir',
required=True,
help='directory where postgres is installed',
)
args = parser.parse_args()
base_tar = args.base_tar
output_tar = args.output_tar
log_dir = args.log_dir
pg_bin = PgBin(log_dir, args.pg_distrib_dir)
reconstructed_paths = set(get_rel_paths(log_dir, pg_bin, base_tar))
touch_missing_rels(log_dir, base_tar, output_tar, reconstructed_paths)

232
scripts/export_import_betwen_pageservers.py Normal file

@@ -0,0 +1,232 @@
#
# Simple script to export nodes from one pageserver
# and import them into another pageserver
#
from os import path
import os
import requests
import uuid
import subprocess
import argparse
from pathlib import Path
# directory to save exported tar files to
basepath = path.dirname(path.abspath(__file__))
class NeonPageserverApiException(Exception):
pass
class NeonPageserverHttpClient(requests.Session):
def __init__(self, host, port):
super().__init__()
self.host = host
self.port = port
def verbose_error(self, res: requests.Response):
try:
res.raise_for_status()
except requests.RequestException as e:
try:
msg = res.json()['msg']
except Exception:
msg = ''
raise NeonPageserverApiException(msg) from e
def check_status(self):
self.get(f"http://{self.host}:{self.port}/v1/status").raise_for_status()
def tenant_list(self):
res = self.get(f"http://{self.host}:{self.port}/v1/tenant")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, list)
return res_json
def tenant_create(self, new_tenant_id: uuid.UUID, ok_if_exists):
res = self.post(
f"http://{self.host}:{self.port}/v1/tenant",
json={
'new_tenant_id': new_tenant_id.hex,
},
)
if res.status_code == 409:
if ok_if_exists:
print(f'could not create tenant: already exists for id {new_tenant_id}')
else:
res.raise_for_status()
elif res.status_code == 201:
print(f'created tenant {new_tenant_id}')
else:
self.verbose_error(res)
return new_tenant_id
def timeline_list(self, tenant_id: uuid.UUID):
res = self.get(f"http://{self.host}:{self.port}/v1/tenant/{tenant_id.hex}/timeline")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, list)
return res_json
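# For illustration: main() below assumes each timeline dict returned here carries at
# least 'tenant_id', 'timeline_id' and local['last_record_lsn'], e.g. (hypothetical
# values) {'tenant_id': 'c8cf...', 'timeline_id': 'f15a...', 'local': {'last_record_lsn': '0/169AD58'}}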
import pytest
def add_missing_empty_rels(base_tar, output_tar):
os.environ['INPUT_BASE_TAR'] = base_tar
os.environ['OUTPUT_BASE_TAR'] = output_tar
pytest.main(["-s", "-k", "test_main_hack"])
def main(args: argparse.Namespace):
old_pageserver_host = args.old_pageserver_host
new_pageserver_host = args.new_pageserver_host
tenants = args.tenants
old_http_client = NeonPageserverHttpClient(old_pageserver_host, args.old_pageserver_http_port)
old_http_client.check_status()
old_pageserver_connstr = f"postgresql://{old_pageserver_host}:{args.old_pageserver_pg_port}"
new_http_client = NeonPageserverHttpClient(new_pageserver_host, args.new_pageserver_http_port)
new_http_client.check_status()
new_pageserver_connstr = f"postgresql://{new_pageserver_host}:{args.new_pageserver_pg_port}"
psql_env = {**os.environ, 'LD_LIBRARY_PATH': '/usr/local/lib/'}
for tenant_id in tenants:
print(f"Tenant: {tenant_id}")
timelines = old_http_client.timeline_list(uuid.UUID(tenant_id))
print(f"Timelines: {timelines}")
# Create tenant in new pageserver
if args.only_import is False:
new_http_client.tenant_create(uuid.UUID(tenant_id), args.ok_if_exists)
for timeline in timelines:
# Export timelines from old pageserver
if args.only_import is False:
query = f"fullbackup {timeline['tenant_id']} {timeline['timeline_id']} {timeline['local']['last_record_lsn']}"
cmd = [args.psql_path, "--no-psqlrc", old_pageserver_connstr, "-c", query]
print(f"Running: {cmd}")
tar_filename = path.join(basepath,
f"{timeline['tenant_id']}_{timeline['timeline_id']}.tar")
stderr_filename = path.join(
basepath, f"{timeline['tenant_id']}_{timeline['timeline_id']}.stderr")
with open(tar_filename, 'w') as stdout_f:
with open(stderr_filename, 'w') as stderr_f:
print(f"(capturing output to {tar_filename})")
subprocess.run(cmd, stdout=stdout_f, stderr=stderr_f, env=psql_env)
# add_missing_empty_rels(incomplete_tar_filename, tar_filename)
print(f"Done export: {tar_filename}")
# Import timelines to new pageserver
psql_path = Path(args.psql_path)
import_cmd = f"import basebackup {timeline['tenant_id']} {timeline['timeline_id']} {timeline['local']['last_record_lsn']} {timeline['local']['last_record_lsn']}"
tar_filename = path.join(basepath,
f"{timeline['tenant_id']}_{timeline['timeline_id']}.tar")
full_cmd = rf"""cat {tar_filename} | {psql_path} {new_pageserver_connstr} -c '{import_cmd}' """
stderr_filename2 = path.join(
basepath, f"import_{timeline['tenant_id']}_{timeline['timeline_id']}.stderr")
stdout_filename = path.join(
basepath, f"import_{timeline['tenant_id']}_{timeline['timeline_id']}.stdout")
print(f"Running: {full_cmd}")
with open(stdout_filename, 'w') as stdout_f:
with open(stderr_filename2, 'w') as stderr_f:
print(f"(capturing output to {stdout_filename})")
subprocess.run(full_cmd,
stdout=stdout_f,
stderr=stderr_f,
env=psql_env,
shell=True)
print(f"Done import")
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'--tenant-id',
dest='tenants',
required=True,
nargs='+',
help='Id of the tenant to migrate. Multiple tenant ids can be passed',
)
parser.add_argument(
'--from-host',
dest='old_pageserver_host',
required=True,
help='Host of the pageserver to migrate data from',
)
parser.add_argument(
'--from-http-port',
dest='old_pageserver_http_port',
required=False,
type=int,
default=9898,
help='HTTP port of the pageserver to migrate data from. Default: 9898',
)
parser.add_argument(
'--from-pg-port',
dest='old_pageserver_pg_port',
required=False,
type=int,
default=6400,
help='pg port of the pageserver to migrate data from. Default: 6400',
)
parser.add_argument(
'--to-host',
dest='new_pageserver_host',
required=True,
help='Host of the pageserver to migrate data to',
)
parser.add_argument(
'--to-http-port',
dest='new_pageserver_http_port',
required=False,
default=9898,
type=int,
help='HTTP port of the pageserver to migrate data to. Default: 9898',
)
parser.add_argument(
'--to-pg-port',
dest='new_pageserver_pg_port',
required=False,
default=6400,
type=int,
help='pg port of the pageserver to migrate data to. Default: 6400',
)
parser.add_argument(
'--ignore-tenant-exists',
dest='ok_if_exists',
required=False,
action='store_true',
help='Ignore the error if the tenant we are trying to create already exists. '
'This can be dangerous if the existing tenant already contains data.',
)
parser.add_argument(
'--psql-path',
dest='psql_path',
required=False,
default='/usr/local/bin/psql',
help='Path to the psql binary. Default: /usr/local/bin/psql',
)
parser.add_argument(
'--only-import',
dest='only_import',
required=False,
default=False,
action='store_true',
help='Skip export and tenant creation part',
)
args = parser.parse_args()
main(args)
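# Example command (hypothetical hosts and tenant id), mirroring the arguments above:
# poetry run python scripts/export_import_betwen_pageservers.py \
#     --tenant-id 0cd70c66dc6a13b7dd324538d386fa31 \
#     --from-host old-pageserver.local \
#     --to-host new-pageserver.local \
#     --psql-path /usr/local/bin/psql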


@@ -0,0 +1,167 @@
from fixtures.neon_fixtures import VanillaPostgres
from fixtures.utils import subprocess_capture
import os
import shutil
from pathlib import Path
import tempfile
def get_rel_paths(log_dir, pg_bin, base_tar):
"""Yeild list of relation paths"""
with tempfile.TemporaryDirectory() as restored_dir:
# Unpack the base tar
subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])
port = "55439" # Probably free
with VanillaPostgres(restored_dir, pg_bin, port, init=False) as vanilla_pg:
vanilla_pg.configure([f"port={port}"])
vanilla_pg.start()
# Create database based on template0 because we can't connect to template0
query = "create database template0copy template template0"
vanilla_pg.safe_psql(query, user="cloud_admin")
vanilla_pg.safe_psql("CHECKPOINT", user="cloud_admin")
# Get all databases
query = "select oid, datname from pg_database"
oid_dbname_pairs = vanilla_pg.safe_psql(query, user="cloud_admin")
template0_oid = [
oid
for (oid, database) in oid_dbname_pairs
if database == "template0"
][0]
# Get rel paths for each database
for oid, database in oid_dbname_pairs:
if database == "template0":
# We can't connect to template0
continue
query = "select relname, pg_relation_filepath(oid) from pg_class"
result = vanilla_pg.safe_psql(query, user="cloud_admin", dbname=database)
for relname, filepath in result:
if filepath is not None:
if database == "template0copy":
# Add all template0copy paths to template0
prefix = f"base/{oid}/"
if filepath.startswith(prefix):
suffix = filepath[len(prefix):]
yield f"base/{template0_oid}/{suffix}"
elif filepath.startswith("global"):
print(f"skipping {database} global file {filepath}")
else:
raise AssertionError
else:
yield filepath
def pack_base(log_dir, restored_dir, output_tar):
tmp_tar_name = "tmp.tar"
tmp_tar_path = os.path.join(restored_dir, tmp_tar_name)
cmd = ["tar", "-cf", tmp_tar_name] + os.listdir(restored_dir)
subprocess_capture(log_dir, cmd, cwd=restored_dir)
shutil.move(tmp_tar_path, output_tar)
def get_files_in_tar(log_dir, tar):
with tempfile.TemporaryDirectory() as restored_dir:
# Unpack the base tar
subprocess_capture(log_dir, ["tar", "-xf", tar, "-C", restored_dir])
# Yield paths of all files, relative to the tar root
for root, dirs, files in os.walk(restored_dir):
for name in files:
file_path = os.path.join(root, name)
yield file_path[len(restored_dir) + 1:]
def corrupt(log_dir, base_tar, output_tar):
"""Remove all empty files and repackage. Return paths of files removed."""
with tempfile.TemporaryDirectory() as restored_dir:
# Unpack the base tar
subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])
# Find empty files
empty_files = []
for root, dirs, files in os.walk(restored_dir):
for name in files:
file_path = os.path.join(root, name)
file_size = os.path.getsize(file_path)
if file_size == 0:
empty_files.append(file_path)
# Delete empty files (just to see if they get recreated)
for empty_file in empty_files:
os.remove(empty_file)
# Repackage
pack_base(log_dir, restored_dir, output_tar)
# Return relative paths
return {
empty_file[len(restored_dir) + 1:]
for empty_file in empty_files
}
def touch_missing_rels(log_dir, corrupt_tar, output_tar, paths):
with tempfile.TemporaryDirectory() as restored_dir:
# Unpack the base tar
subprocess_capture(log_dir, ["tar", "-xf", corrupt_tar, "-C", restored_dir])
# Touch files that don't exist
for path in paths:
absolute_path = os.path.join(restored_dir, path)
exists = os.path.exists(absolute_path)
if not exists:
print("File {absolute_path} didn't exist. Creating..")
Path(absolute_path).touch()
# Repackage
pack_base(log_dir, restored_dir, output_tar)
def test_complete(test_output_dir, pg_bin):
# Specify directories
# TODO make a basebackup instead of using one from another test
work_dir = "/home/bojan/src/neondatabase/neon/test_output/test_import_from_pageserver/"
base_tar = os.path.join(work_dir, "psql_2.stdout")
output_tar = os.path.join(work_dir, "psql_2-completed.stdout")
# Create new base tar with missing empty files
corrupt_tar = os.path.join(test_output_dir, "psql_2-corrupted.stdout")
deleted_files = corrupt(test_output_dir, base_tar, corrupt_tar)
assert len(set(get_files_in_tar(test_output_dir, base_tar)) -
set(get_files_in_tar(test_output_dir, corrupt_tar))) > 0
# Reconstruct paths from the corrupted tar, assert it covers everything important
reconstructed_paths = set(get_rel_paths(test_output_dir, pg_bin, corrupt_tar))
paths_missed = deleted_files - reconstructed_paths
assert paths_missed.issubset({
"postgresql.auto.conf",
"pg_ident.conf",
})
# Recreate the correct tar by touching files, compare with original tar
touch_missing_rels(test_output_dir, corrupt_tar, output_tar, reconstructed_paths)
paths_missed = (set(get_files_in_tar(test_output_dir, base_tar)) -
set(get_files_in_tar(test_output_dir, output_tar)))
assert paths_missed.issubset({
"postgresql.auto.conf",
"pg_ident.conf",
})
# HACK this script relies on test fixtures, but you can run it with
# poetry run pytest -k test_main_hack and pass inputs via envvars
#
# The script takes a base tar, infers what empty rel files might be missing
# and creates a new base tar with those files included. It does not modify
# the original file.
def test_main_hack(test_output_dir, pg_bin, pytestconfig):
base_tar = os.environ['INPUT_BASE_TAR']
output_tar = os.environ['OUTPUT_BASE_TAR']
reconstructed_paths = set(get_rel_paths(test_output_dir, pg_bin, base_tar))
touch_missing_rels(test_output_dir, base_tar, output_tar, reconstructed_paths)
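# Example invocation (hypothetical paths), per the HACK note above:
# INPUT_BASE_TAR=/tmp/base.tar OUTPUT_BASE_TAR=/tmp/base-fixed.tar \
#     poetry run pytest -s -k test_main_hack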


@@ -11,7 +11,7 @@ import signal
import pytest
from fixtures.neon_fixtures import PgProtocol, PortDistributor, Postgres, NeonEnvBuilder, Etcd, NeonPageserverHttpClient, assert_local, wait_until, wait_for_last_record_lsn, wait_for_upload, neon_binpath, pg_distrib_dir
-from fixtures.utils import lsn_from_hex
+from fixtures.utils import lsn_from_hex, subprocess_capture
def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):
@@ -184,19 +184,38 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
new_pageserver_http_port,
neon_env_builder.broker):
-# call to attach timeline to new pageserver
-new_pageserver_http.timeline_attach(tenant, timeline)
-# new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
-new_timeline_detail = wait_until(
-number_of_iterations=5,
-interval=1,
-func=lambda: assert_local(new_pageserver_http, tenant, timeline))
+# Migrate either by attaching from s3 or by import/export basebackup
+relocation_method = "import"
+if relocation_method == "import":
+scripts_dir = "/home/bojan/src/neondatabase/neon/scripts/"
+cmd = [
+"python",
+os.path.join(scripts_dir, "export_import_betwen_pageservers.py"),
+"--tenant-id", tenant.hex,
+"--from-host", "localhost",
+"--from-http-port", str(pageserver_http.port),
+"--from-pg-port", str(env.pageserver.service_port.pg),
+"--to-host", "localhost",
+"--to-http-port", str(new_pageserver_http_port),
+"--to-pg-port", str(new_pageserver_pg_port),
+"--psql-path", os.path.join(pg_distrib_dir, "bin", "psql"),
+]
+subprocess_capture(env.repo_dir, cmd, check=True)
+elif relocation_method == "attach":
+# call to attach timeline to new pageserver
+new_pageserver_http.timeline_attach(tenant, timeline)
-# when load is active these checks can break because lsns are not static
-# so lets check with some margin
-assert_abs_margin_ratio(lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
-lsn_from_hex(timeline_detail['local']['disk_consistent_lsn']),
-0.03)
+# new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there were no new writes since the checkpoint
+new_timeline_detail = wait_until(
+number_of_iterations=5,
+interval=1,
+func=lambda: assert_local(new_pageserver_http, tenant, timeline))
+# when load is active these checks can break because lsns are not static
+# so let's check with some margin
+assert_abs_margin_ratio(lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
+lsn_from_hex(timeline_detail['local']['disk_consistent_lsn']),
+0.03)
tenant_pg.stop()