mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
Compare commits
5 Commits
layer_map_...projects-m
| Author | SHA1 | Date |
|---|---|---|
| | f7efbb2d42 | |
| | 0aff7c9ee9 | |
| | 4a2a55d9b2 | |
| | 99a0a5a19b | |
| | 263a3ea5e3 | |
scripts/add_missing_rels.py (new file, 438 lines)
@@ -0,0 +1,438 @@
import os
import re
import shutil
from pathlib import Path
import tempfile
from contextlib import closing
import psycopg2
import subprocess
import argparse

### utils copied from test fixtures
from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, cast, Union, Tuple
from psycopg2.extensions import connection as PgConnection
from psycopg2.extensions import make_dsn, parse_dsn
import asyncpg

Env = Dict[str, str]

_global_counter = 0


def global_counter() -> int:
    """ A really dumb global counter.

    This is useful for giving output files a unique number, so if we run the
    same command multiple times we can keep their output separate.
    """
    global _global_counter
    _global_counter += 1
    return _global_counter


def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> str:
    """ Run a process and capture its output

    Output will go to files named "cmd_NNN.stdout" and "cmd_NNN.stderr"
    where "cmd" is the name of the program and NNN is an incrementing
    counter.

    If those files already exist, we will overwrite them.
    Returns basepath for files with captured output.
    """
    assert type(cmd) is list
    base = os.path.basename(cmd[0]) + '_{}'.format(global_counter())
    basepath = os.path.join(capture_dir, base)
    stdout_filename = basepath + '.stdout'
    stderr_filename = basepath + '.stderr'

    with open(stdout_filename, 'w') as stdout_f:
        with open(stderr_filename, 'w') as stderr_f:
            print('(capturing output to "{}.stdout")'.format(base))
            subprocess.run(cmd, **kwargs, stdout=stdout_f, stderr=stderr_f)

    return basepath
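
# Usage sketch (illustrative, assuming /tmp/logs exists): the first call below
# would write /tmp/logs/ls_1.stdout and /tmp/logs/ls_1.stderr and return
# "/tmp/logs/ls_1".
#
#   base = subprocess_capture('/tmp/logs', ['ls', '-l'])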


class PgBin:
    """ A helper class for executing postgres binaries """
    def __init__(self, log_dir: Path, pg_distrib_dir):
        self.log_dir = log_dir
        self.pg_bin_path = os.path.join(str(pg_distrib_dir), 'bin')
        self.env = os.environ.copy()
        self.env['LD_LIBRARY_PATH'] = os.path.join(str(pg_distrib_dir), 'lib')

    def _fixpath(self, command: List[str]):
        if '/' not in command[0]:
            command[0] = os.path.join(self.pg_bin_path, command[0])

    def _build_env(self, env_add: Optional[Env]) -> Env:
        if env_add is None:
            return self.env
        env = self.env.copy()
        env.update(env_add)
        return env

    def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None):
        """
        Run one of the postgres binaries.

        The command should be in list form, e.g. ['pgbench', '-p', '55432']

        All the necessary environment variables will be set.

        If the first argument (the command name) doesn't include a path (no '/'
        characters present), then it will be edited to include the correct path.

        If you want stdout/stderr captured to files, use `run_capture` instead.
        """
        self._fixpath(command)
        print('Running command "{}"'.format(' '.join(command)))
        env = self._build_env(env)
        subprocess.run(command, env=env, cwd=cwd, check=True)

    def run_capture(self,
                    command: List[str],
                    env: Optional[Env] = None,
                    cwd: Optional[str] = None,
                    **kwargs: Any) -> str:
        """
        Run one of the postgres binaries, with stderr and stdout redirected to a file.

        This is just like `run`, but for chatty programs. Returns basepath for files
        with captured output.
        """
        self._fixpath(command)
        print('Running command "{}"'.format(' '.join(command)))
        env = self._build_env(env)
        return subprocess_capture(str(self.log_dir),
                                  command,
                                  env=env,
                                  cwd=cwd,
                                  check=True,
                                  **kwargs)
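
# Usage sketch (illustrative paths; pg_distrib_dir is wherever postgres is
# installed): bare command names are resolved against <pg_distrib_dir>/bin and
# run with LD_LIBRARY_PATH pointing at <pg_distrib_dir>/lib.
#
#   pg_bin = PgBin(Path('/tmp/logs'), '/usr/local/pgsql')
#   pg_bin.run(['psql', '--version'])                    # output to terminal
#   pg_bin.run_capture(['initdb', '-D', '/tmp/pgdata'])  # output to /tmp/logs/initdb_N.*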


class PgProtocol:
    """ Reusable connection logic """
    def __init__(self, **kwargs):
        self.default_options = kwargs

    def connstr(self, **kwargs) -> str:
        """
        Build a libpq connection string for the Postgres instance.
        """
        return str(make_dsn(**self.conn_options(**kwargs)))

    def conn_options(self, **kwargs):
        conn_options = self.default_options.copy()
        if 'dsn' in kwargs:
            conn_options.update(parse_dsn(kwargs['dsn']))
        conn_options.update(kwargs)

        # Individual statement timeout in seconds. 2 minutes should be
        # enough for our tests, but if you need a longer one, you can
        # change it by calling "SET statement_timeout" after
        # connecting.
        if 'options' in conn_options:
            conn_options['options'] = "-cstatement_timeout=120s " + conn_options['options']
        else:
            conn_options['options'] = "-cstatement_timeout=120s"
        return conn_options

    # autocommit=True here by default because that's what we need most of the time
    def connect(self, autocommit=True, **kwargs) -> PgConnection:
        """
        Connect to the node.
        Returns psycopg2's connection object.
        This method passes all extra params to connstr.
        """
        conn = psycopg2.connect(**self.conn_options(**kwargs))

        # WARNING: this setting affects *all* tests!
        conn.autocommit = autocommit
        return conn

    async def connect_async(self, **kwargs) -> asyncpg.Connection:
        """
        Connect to the node from async python.
        Returns asyncpg's connection object.
        """

        # asyncpg takes slightly different options than psycopg2. Try
        # to convert the defaults from the psycopg2 format.

        # The psycopg2 option 'dbname' is called 'database' in asyncpg
        conn_options = self.conn_options(**kwargs)
        if 'dbname' in conn_options:
            conn_options['database'] = conn_options.pop('dbname')

        # Convert options='-c<key>=<val>' to server_settings
        if 'options' in conn_options:
            options = conn_options.pop('options')
            for match in re.finditer(r'-c(\w*)=(\w*)', options):
                key = match.group(1)
                val = match.group(2)
                if 'server_settings' in conn_options:
                    conn_options['server_settings'].update({key: val})
                else:
                    conn_options['server_settings'] = {key: val}
        return await asyncpg.connect(**conn_options)

    def safe_psql(self, query: str, **kwargs: Any) -> List[Tuple[Any, ...]]:
        """
        Execute query against the node and return all rows.
        This method passes all extra params to connstr.
        """
        return self.safe_psql_many([query], **kwargs)[0]

    def safe_psql_many(self, queries: List[str], **kwargs: Any) -> List[List[Tuple[Any, ...]]]:
        """
        Execute queries against the node and return all rows.
        This method passes all extra params to connstr.
        """
        result: List[List[Any]] = []
        with closing(self.connect(**kwargs)) as conn:
            with conn.cursor() as cur:
                for query in queries:
                    print(f"Executing query: {query}")
                    cur.execute(query)

                    if cur.description is None:
                        result.append([])  # query didn't return data
                    else:
                        result.append(cast(List[Any], cur.fetchall()))
        return result
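
# Usage sketch (illustrative options): conn_options() always injects a
# two-minute statement timeout, so connstr() returns something like
# "host=localhost port=55432 dbname=postgres options=-cstatement_timeout=120s".
#
#   proto = PgProtocol(host='localhost', port=55432, dbname='postgres')
#   print(proto.connstr())
#   print(proto.safe_psql('select 1'))  # -> [(1,)]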


# Minimal implementation of the fixtures' helper used by get_subdir_size below;
# it was missing from this copy of the utils.
def get_dir_size(path: str) -> int:
    """Return total size of the files under path, in bytes."""
    totalbytes = 0
    for root, dirs, files in os.walk(path):
        for name in files:
            totalbytes += os.path.getsize(os.path.join(root, name))
    return totalbytes


class VanillaPostgres(PgProtocol):
    def __init__(self, pgdatadir: Path, pg_bin: PgBin, port: int, init=True):
        super().__init__(host='localhost', port=port, dbname='postgres')
        self.pgdatadir = pgdatadir
        self.pg_bin = pg_bin
        self.running = False
        if init:
            self.pg_bin.run_capture(['initdb', '-D', str(pgdatadir)])
            self.configure([f"port = {port}\n"])

    def configure(self, options: List[str]):
        """Append lines into postgresql.conf file."""
        assert not self.running
        with open(os.path.join(self.pgdatadir, 'postgresql.conf'), 'a') as conf_file:
            conf_file.write("\n".join(options))

    def start(self, log_path: Optional[str] = None):
        assert not self.running
        self.running = True

        if log_path is None:
            log_path = os.path.join(self.pgdatadir, "pg.log")

        self.pg_bin.run_capture(
            ['pg_ctl', '-w', '-D', str(self.pgdatadir), '-l', log_path, 'start'])

    def stop(self):
        assert self.running
        self.running = False
        self.pg_bin.run_capture(['pg_ctl', '-w', '-D', str(self.pgdatadir), 'stop'])

    def get_subdir_size(self, subdir) -> int:
        """Return size of pgdatadir subdirectory in bytes."""
        return get_dir_size(os.path.join(self.pgdatadir, subdir))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if self.running:
            self.stop()
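
# Usage sketch (illustrative paths): spin up a throwaway cluster and query it;
# the context manager stops the server on exit if it is still running.
#
#   pg_bin = PgBin(Path('/tmp/logs'), '/usr/local/pgsql')
#   with VanillaPostgres(Path('/tmp/pgdata'), pg_bin, 55432) as vanilla_pg:
#       vanilla_pg.start()
#       print(vanilla_pg.safe_psql('select version()'))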


### actual code


def get_rel_paths(log_dir, pg_bin, base_tar):
    """Yield relation paths"""
    with tempfile.TemporaryDirectory() as restored_dir:
        # Unpack the base tar
        subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])

        port = 55439  # Probably free
        with VanillaPostgres(restored_dir, pg_bin, port, init=False) as vanilla_pg:
            vanilla_pg.configure([f"port={port}"])
            vanilla_pg.start()

            # Create database based on template0 because we can't connect to template0
            query = "create database template0copy template template0"
            vanilla_pg.safe_psql(query, user="cloud_admin")
            vanilla_pg.safe_psql("CHECKPOINT", user="cloud_admin")

            # Get all databases
            query = "select oid, datname from pg_database"
            oid_dbname_pairs = vanilla_pg.safe_psql(query, user="cloud_admin")
            template0_oid = [
                oid
                for (oid, database) in oid_dbname_pairs
                if database == "template0"
            ][0]

            # Get rel paths for each database
            for oid, database in oid_dbname_pairs:
                if database == "template0":
                    # We can't connect to template0
                    continue

                query = "select relname, pg_relation_filepath(oid) from pg_class"
                result = vanilla_pg.safe_psql(query, user="cloud_admin", dbname=database)
                for relname, filepath in result:
                    if filepath is not None:
                        if database == "template0copy":
                            # Add all template0copy paths to template0
                            prefix = f"base/{oid}/"
                            if filepath.startswith(prefix):
                                suffix = filepath[len(prefix):]
                                yield f"base/{template0_oid}/{suffix}"
                            elif filepath.startswith("global"):
                                print(f"skipping {database} global file {filepath}")
                            else:
                                raise AssertionError
                        else:
                            yield filepath
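
# Note: pg_relation_filepath() returns paths relative to the data directory,
# e.g. "base/13395/16384" (database OID, then relfilenode), which is why the
# yielded values can be used directly as member paths inside the base tar.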


def pack_base(log_dir, restored_dir, output_tar):
    """Tar up restored_dir and move the result to output_tar."""
    tmp_tar_name = "tmp.tar"
    tmp_tar_path = os.path.join(restored_dir, tmp_tar_name)
    cmd = ["tar", "-cf", tmp_tar_name] + os.listdir(restored_dir)
    subprocess_capture(log_dir, cmd, cwd=restored_dir)
    shutil.move(tmp_tar_path, output_tar)


def get_files_in_tar(log_dir, tar):
    """Yield relative paths of all files in the tar."""
    with tempfile.TemporaryDirectory() as restored_dir:
        # Unpack the tar
        subprocess_capture(log_dir, ["tar", "-xf", tar, "-C", restored_dir])

        # Yield the relative path of every unpacked file
        for root, dirs, files in os.walk(restored_dir):
            for name in files:
                file_path = os.path.join(root, name)
                yield file_path[len(restored_dir) + 1:]


def corrupt(log_dir, base_tar, output_tar):
    """Remove all empty files and repackage. Return paths of files removed."""
    with tempfile.TemporaryDirectory() as restored_dir:
        # Unpack the base tar
        subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])

        # Find empty files
        empty_files = []
        for root, dirs, files in os.walk(restored_dir):
            for name in files:
                file_path = os.path.join(root, name)
                file_size = os.path.getsize(file_path)
                if file_size == 0:
                    empty_files.append(file_path)

        # Delete empty files (just to see if they get recreated)
        for empty_file in empty_files:
            os.remove(empty_file)

        # Repackage
        pack_base(log_dir, restored_dir, output_tar)

        # Return relative paths
        return {
            empty_file[len(restored_dir) + 1:]
            for empty_file in empty_files
        }


def touch_missing_rels(log_dir, corrupt_tar, output_tar, paths):
    with tempfile.TemporaryDirectory() as restored_dir:
        # Unpack the base tar
        subprocess_capture(log_dir, ["tar", "-xf", corrupt_tar, "-C", restored_dir])

        # Touch files that don't exist
        for path in paths:
            absolute_path = os.path.join(restored_dir, path)
            exists = os.path.exists(absolute_path)
            if not exists:
                print(f"File {absolute_path} didn't exist. Creating..")
                Path(absolute_path).touch()

        # Repackage
        pack_base(log_dir, restored_dir, output_tar)


# TODO this test is not currently called. It needs any ordinary base.tar path
# as input, plus the test_output_dir and pg_bin fixtures.
def test_add_missing_rels(base_tar, test_output_dir, pg_bin):
    output_tar = base_tar + ".fixed"

    # Create new base tar with missing empty files
    corrupt_tar = os.path.join(test_output_dir, "psql_2-corrupted.stdout")
    deleted_files = corrupt(test_output_dir, base_tar, corrupt_tar)
    assert len(set(get_files_in_tar(test_output_dir, base_tar)) -
               set(get_files_in_tar(test_output_dir, corrupt_tar))) > 0

    # Reconstruct paths from the corrupted tar, assert it covers everything important
    reconstructed_paths = set(get_rel_paths(test_output_dir, pg_bin, corrupt_tar))
    paths_missed = deleted_files - reconstructed_paths
    assert paths_missed.issubset({
        "postgresql.auto.conf",
        "pg_ident.conf",
    })

    # Recreate the correct tar by touching files, compare with original tar
    touch_missing_rels(test_output_dir, corrupt_tar, output_tar, reconstructed_paths)
    paths_missed = (set(get_files_in_tar(test_output_dir, base_tar)) -
                    set(get_files_in_tar(test_output_dir, output_tar)))
    assert paths_missed.issubset({
        "postgresql.auto.conf",
        "pg_ident.conf",
    })


# Example command:
# poetry run python scripts/add_missing_rels.py \
#     --base-tar /home/bojan/src/neondatabase/neon/test_output/test_import_from_pageserver/psql_2.stdout \
#     --output-tar output-base.tar \
#     --log-dir /home/bojan/tmp \
#     --pg-distrib-dir /home/bojan/src/neondatabase/neon/tmp_install/
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--base-tar',
        dest='base_tar',
        required=True,
        help='base.tar file to add missing rels to (file will not be modified)',
    )
    parser.add_argument(
        '--output-tar',
        dest='output_tar',
        required=True,
        help='path and name for the output base.tar file',
    )
    parser.add_argument(
        '--log-dir',
        dest='log_dir',
        required=True,
        help='directory to save log files in',
    )
    parser.add_argument(
        '--pg-distrib-dir',
        dest='pg_distrib_dir',
        required=True,
        help='directory where postgres is installed',
    )
    args = parser.parse_args()
    base_tar = args.base_tar
    output_tar = args.output_tar
    log_dir = args.log_dir
    pg_bin = PgBin(log_dir, args.pg_distrib_dir)

    reconstructed_paths = set(get_rel_paths(log_dir, pg_bin, base_tar))
    touch_missing_rels(log_dir, base_tar, output_tar, reconstructed_paths)
scripts/export_import_betwen_pageservers.py (new executable file, 232 lines)
@@ -0,0 +1,232 @@
#
# Simple script to export nodes from one pageserver
# and import them into another pageserver
#
from os import path
import os
import requests
import uuid
import subprocess
import argparse
import pytest
from pathlib import Path

# directory to save exported tar files to
basepath = path.dirname(path.abspath(__file__))


class NeonPageserverApiException(Exception):
    pass


class NeonPageserverHttpClient(requests.Session):
    def __init__(self, host, port):
        super().__init__()
        self.host = host
        self.port = port

    def verbose_error(self, res: requests.Response):
        try:
            res.raise_for_status()
        except requests.RequestException as e:
            try:
                msg = res.json()['msg']
            except Exception:
                msg = ''
            raise NeonPageserverApiException(msg) from e

    def check_status(self):
        self.get(f"http://{self.host}:{self.port}/v1/status").raise_for_status()

    def tenant_list(self):
        res = self.get(f"http://{self.host}:{self.port}/v1/tenant")
        self.verbose_error(res)
        res_json = res.json()
        assert isinstance(res_json, list)
        return res_json

    def tenant_create(self, new_tenant_id: uuid.UUID, ok_if_exists):
        res = self.post(
            f"http://{self.host}:{self.port}/v1/tenant",
            json={
                'new_tenant_id': new_tenant_id.hex,
            },
        )

        if res.status_code == 409:
            if ok_if_exists:
                print(f'could not create tenant: already exists for id {new_tenant_id}')
            else:
                res.raise_for_status()
        elif res.status_code == 201:
            print(f'created tenant {new_tenant_id}')
        else:
            self.verbose_error(res)

        return new_tenant_id

    def timeline_list(self, tenant_id: uuid.UUID):
        res = self.get(f"http://{self.host}:{self.port}/v1/tenant/{tenant_id.hex}/timeline")
        self.verbose_error(res)
        res_json = res.json()
        assert isinstance(res_json, list)
        return res_json
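
# Usage sketch (illustrative host/port): talk to a pageserver's HTTP
# management API.
#
#   client = NeonPageserverHttpClient('localhost', 9898)
#   client.check_status()
#   for tenant in client.tenant_list():
#       print(tenant)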


def add_missing_empty_rels(base_tar, output_tar):
    """Invoke the test_main_hack pytest entry point, passing inputs via env vars."""
    os.environ['INPUT_BASE_TAR'] = base_tar
    os.environ['OUTPUT_BASE_TAR'] = output_tar
    pytest.main(["-s", "-k", "test_main_hack"])
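
# For reference (sketch with placeholder ids and LSNs): main() below drives the
# pageservers over libpq. The export side runs roughly
#
#   psql --no-psqlrc postgresql://old-host:6400 \
#       -c 'fullbackup <tenant_id> <timeline_id> <last_record_lsn>'
#
# with stdout redirected into a tar file, and the import side pipes that tar
# back in via
#
#   cat <tar> | psql postgresql://new-host:6400 \
#       -c 'import basebackup <tenant_id> <timeline_id> <start_lsn> <end_lsn>'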


def main(args: argparse.Namespace):
    old_pageserver_host = args.old_pageserver_host
    new_pageserver_host = args.new_pageserver_host
    tenants = args.tenants

    old_http_client = NeonPageserverHttpClient(old_pageserver_host, args.old_pageserver_http_port)
    old_http_client.check_status()
    old_pageserver_connstr = f"postgresql://{old_pageserver_host}:{args.old_pageserver_pg_port}"

    new_http_client = NeonPageserverHttpClient(new_pageserver_host, args.new_pageserver_http_port)
    new_http_client.check_status()
    new_pageserver_connstr = f"postgresql://{new_pageserver_host}:{args.new_pageserver_pg_port}"

    psql_env = {**os.environ, 'LD_LIBRARY_PATH': '/usr/local/lib/'}

    for tenant_id in tenants:
        print(f"Tenant: {tenant_id}")
        timelines = old_http_client.timeline_list(uuid.UUID(tenant_id))
        print(f"Timelines: {timelines}")

        # Create tenant in new pageserver
        if args.only_import is False:
            new_http_client.tenant_create(uuid.UUID(tenant_id), args.ok_if_exists)

        for timeline in timelines:

            # Export timelines from old pageserver
            if args.only_import is False:
                query = f"fullbackup {timeline['tenant_id']} {timeline['timeline_id']} {timeline['local']['last_record_lsn']}"

                cmd = [args.psql_path, "--no-psqlrc", old_pageserver_connstr, "-c", query]
                print(f"Running: {cmd}")

                tar_filename = path.join(basepath,
                                         f"{timeline['tenant_id']}_{timeline['timeline_id']}.tar")
                stderr_filename = path.join(
                    basepath, f"{timeline['tenant_id']}_{timeline['timeline_id']}.stderr")

                with open(tar_filename, 'w') as stdout_f:
                    with open(stderr_filename, 'w') as stderr_f:
                        print(f"(capturing output to {tar_filename})")
                        subprocess.run(cmd, stdout=stdout_f, stderr=stderr_f, env=psql_env)

                # add_missing_empty_rels(incomplete_tar_filename, tar_filename)

                print(f"Done export: {tar_filename}")

            # Import timelines to new pageserver
            psql_path = Path(args.psql_path)
            import_cmd = f"import basebackup {timeline['tenant_id']} {timeline['timeline_id']} {timeline['local']['last_record_lsn']} {timeline['local']['last_record_lsn']}"
            tar_filename = path.join(basepath,
                                     f"{timeline['tenant_id']}_{timeline['timeline_id']}.tar")
            full_cmd = rf"""cat {tar_filename} | {psql_path} {new_pageserver_connstr} -c '{import_cmd}' """

            stderr_filename2 = path.join(
                basepath, f"import_{timeline['tenant_id']}_{timeline['timeline_id']}.stderr")
            stdout_filename = path.join(
                basepath, f"import_{timeline['tenant_id']}_{timeline['timeline_id']}.stdout")

            print(f"Running: {full_cmd}")

            with open(stdout_filename, 'w') as stdout_f:
                with open(stderr_filename2, 'w') as stderr_f:
                    print(f"(capturing output to {stdout_filename})")
                    subprocess.run(full_cmd,
                                   stdout=stdout_f,
                                   stderr=stderr_f,
                                   env=psql_env,
                                   shell=True)

            print("Done import")


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--tenant-id',
        dest='tenants',
        required=True,
        nargs='+',
        help='Id of the tenant to migrate. You can pass multiple ids',
    )
    parser.add_argument(
        '--from-host',
        dest='old_pageserver_host',
        required=True,
        help='Host of the pageserver to migrate data from',
    )
    parser.add_argument(
        '--from-http-port',
        dest='old_pageserver_http_port',
        required=False,
        type=int,
        default=9898,
        help='HTTP port of the pageserver to migrate data from. Default: 9898',
    )
    parser.add_argument(
        '--from-pg-port',
        dest='old_pageserver_pg_port',
        required=False,
        type=int,
        default=6400,
        help='pg port of the pageserver to migrate data from. Default: 6400',
    )
    parser.add_argument(
        '--to-host',
        dest='new_pageserver_host',
        required=True,
        help='Host of the pageserver to migrate data to',
    )
    parser.add_argument(
        '--to-http-port',
        dest='new_pageserver_http_port',
        required=False,
        default=9898,
        type=int,
        help='HTTP port of the pageserver to migrate data to. Default: 9898',
    )
    parser.add_argument(
        '--to-pg-port',
        dest='new_pageserver_pg_port',
        required=False,
        default=6400,
        type=int,
        help='pg port of the pageserver to migrate data to. Default: 6400',
    )
    parser.add_argument(
        '--ignore-tenant-exists',
        dest='ok_if_exists',
        required=False,
        action='store_true',
        help='Ignore the error if the tenant we are trying to create already exists. '
        'This can be dangerous if the existing tenant already contains some data.',
    )
    parser.add_argument(
        '--psql-path',
        dest='psql_path',
        required=False,
        default='/usr/local/bin/psql',
        help='Path to the psql binary. Default: /usr/local/bin/psql',
    )
    parser.add_argument(
        '--only-import',
        dest='only_import',
        required=False,
        default=False,
        action='store_true',
        help='Skip the export and tenant creation steps',
    )
    args = parser.parse_args()
    main(args)
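
# Example invocation (sketch; hosts and tenant id are placeholders):
#
#   python scripts/export_import_betwen_pageservers.py \
#       --tenant-id 9e4b3d4b7c8e4f0a9b1c2d3e4f5a6b7c \
#       --from-host old-pageserver.local \
#       --to-host new-pageserver.local \
#       --psql-path /usr/local/bin/psql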
test_runner/batch_others/test_complete_basebackup.py (new file, 167 lines)
@@ -0,0 +1,167 @@
from fixtures.neon_fixtures import VanillaPostgres
from fixtures.utils import subprocess_capture
import os
import shutil
from pathlib import Path
import tempfile


def get_rel_paths(log_dir, pg_bin, base_tar):
    """Yield relation paths"""
    with tempfile.TemporaryDirectory() as restored_dir:
        # Unpack the base tar
        subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])

        port = 55439  # Probably free
        with VanillaPostgres(restored_dir, pg_bin, port, init=False) as vanilla_pg:
            vanilla_pg.configure([f"port={port}"])
            vanilla_pg.start()

            # Create database based on template0 because we can't connect to template0
            query = "create database template0copy template template0"
            vanilla_pg.safe_psql(query, user="cloud_admin")
            vanilla_pg.safe_psql("CHECKPOINT", user="cloud_admin")

            # Get all databases
            query = "select oid, datname from pg_database"
            oid_dbname_pairs = vanilla_pg.safe_psql(query, user="cloud_admin")
            template0_oid = [
                oid
                for (oid, database) in oid_dbname_pairs
                if database == "template0"
            ][0]

            # Get rel paths for each database
            for oid, database in oid_dbname_pairs:
                if database == "template0":
                    # We can't connect to template0
                    continue

                query = "select relname, pg_relation_filepath(oid) from pg_class"
                result = vanilla_pg.safe_psql(query, user="cloud_admin", dbname=database)
                for relname, filepath in result:
                    if filepath is not None:
                        if database == "template0copy":
                            # Add all template0copy paths to template0
                            prefix = f"base/{oid}/"
                            if filepath.startswith(prefix):
                                suffix = filepath[len(prefix):]
                                yield f"base/{template0_oid}/{suffix}"
                            elif filepath.startswith("global"):
                                print(f"skipping {database} global file {filepath}")
                            else:
                                raise AssertionError
                        else:
                            yield filepath


def pack_base(log_dir, restored_dir, output_tar):
    """Tar up restored_dir and move the result to output_tar."""
    tmp_tar_name = "tmp.tar"
    tmp_tar_path = os.path.join(restored_dir, tmp_tar_name)
    cmd = ["tar", "-cf", tmp_tar_name] + os.listdir(restored_dir)
    subprocess_capture(log_dir, cmd, cwd=restored_dir)
    shutil.move(tmp_tar_path, output_tar)


def get_files_in_tar(log_dir, tar):
    """Yield relative paths of all files in the tar."""
    with tempfile.TemporaryDirectory() as restored_dir:
        # Unpack the tar
        subprocess_capture(log_dir, ["tar", "-xf", tar, "-C", restored_dir])

        # Yield the relative path of every unpacked file
        for root, dirs, files in os.walk(restored_dir):
            for name in files:
                file_path = os.path.join(root, name)
                yield file_path[len(restored_dir) + 1:]


def corrupt(log_dir, base_tar, output_tar):
    """Remove all empty files and repackage. Return paths of files removed."""
    with tempfile.TemporaryDirectory() as restored_dir:
        # Unpack the base tar
        subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])

        # Find empty files
        empty_files = []
        for root, dirs, files in os.walk(restored_dir):
            for name in files:
                file_path = os.path.join(root, name)
                file_size = os.path.getsize(file_path)
                if file_size == 0:
                    empty_files.append(file_path)

        # Delete empty files (just to see if they get recreated)
        for empty_file in empty_files:
            os.remove(empty_file)

        # Repackage
        pack_base(log_dir, restored_dir, output_tar)

        # Return relative paths
        return {
            empty_file[len(restored_dir) + 1:]
            for empty_file in empty_files
        }


def touch_missing_rels(log_dir, corrupt_tar, output_tar, paths):
    with tempfile.TemporaryDirectory() as restored_dir:
        # Unpack the base tar
        subprocess_capture(log_dir, ["tar", "-xf", corrupt_tar, "-C", restored_dir])

        # Touch files that don't exist
        for path in paths:
            absolute_path = os.path.join(restored_dir, path)
            exists = os.path.exists(absolute_path)
            if not exists:
                print(f"File {absolute_path} didn't exist. Creating..")
                Path(absolute_path).touch()

        # Repackage
        pack_base(log_dir, restored_dir, output_tar)


def test_complete(test_output_dir, pg_bin):
    # Specify directories
    # TODO make a basebackup instead of using one from another test
    work_dir = "/home/bojan/src/neondatabase/neon/test_output/test_import_from_pageserver/"
    base_tar = os.path.join(work_dir, "psql_2.stdout")
    output_tar = os.path.join(work_dir, "psql_2-completed.stdout")

    # Create new base tar with missing empty files
    corrupt_tar = os.path.join(test_output_dir, "psql_2-corrupted.stdout")
    deleted_files = corrupt(test_output_dir, base_tar, corrupt_tar)
    assert len(set(get_files_in_tar(test_output_dir, base_tar)) -
               set(get_files_in_tar(test_output_dir, corrupt_tar))) > 0

    # Reconstruct paths from the corrupted tar, assert it covers everything important
    reconstructed_paths = set(get_rel_paths(test_output_dir, pg_bin, corrupt_tar))
    paths_missed = deleted_files - reconstructed_paths
    assert paths_missed.issubset({
        "postgresql.auto.conf",
        "pg_ident.conf",
    })

    # Recreate the correct tar by touching files, compare with original tar
    touch_missing_rels(test_output_dir, corrupt_tar, output_tar, reconstructed_paths)
    paths_missed = (set(get_files_in_tar(test_output_dir, base_tar)) -
                    set(get_files_in_tar(test_output_dir, output_tar)))
    assert paths_missed.issubset({
        "postgresql.auto.conf",
        "pg_ident.conf",
    })


# HACK: this test relies on the test fixtures, but you can run it with
# `poetry run pytest -k test_main_hack` and pass inputs via env vars.
#
# It takes a base tar, infers what empty rel files might be missing,
# and creates a new base tar with those files included. It does not modify
# the original file.
def test_main_hack(test_output_dir, pg_bin, pytestconfig):
    base_tar = os.environ['INPUT_BASE_TAR']
    output_tar = os.environ['OUTPUT_BASE_TAR']

    reconstructed_paths = set(get_rel_paths(test_output_dir, pg_bin, base_tar))
    touch_missing_rels(test_output_dir, base_tar, output_tar, reconstructed_paths)
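
# Example (sketch; paths are placeholders):
#
#   INPUT_BASE_TAR=/tmp/base.tar OUTPUT_BASE_TAR=/tmp/base-fixed.tar \
#       poetry run pytest -s -k test_main_hack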
@@ -11,7 +11,7 @@ import signal
 import pytest

 from fixtures.neon_fixtures import PgProtocol, PortDistributor, Postgres, NeonEnvBuilder, Etcd, NeonPageserverHttpClient, assert_local, wait_until, wait_for_last_record_lsn, wait_for_upload, neon_binpath, pg_distrib_dir
-from fixtures.utils import lsn_from_hex
+from fixtures.utils import lsn_from_hex, subprocess_capture


 def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):
@@ -184,19 +184,38 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
                                     new_pageserver_http_port,
                                     neon_env_builder.broker):

-        # call to attach timeline to new pageserver
-        new_pageserver_http.timeline_attach(tenant, timeline)
-        # new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there were no new writes since checkpoint
-        new_timeline_detail = wait_until(
-            number_of_iterations=5,
-            interval=1,
-            func=lambda: assert_local(new_pageserver_http, tenant, timeline))
+        # Migrate either by attaching from s3 or import/export basebackup
+        relocation_method = "import"
+        if relocation_method == "import":
+            scripts_dir = "/home/bojan/src/neondatabase/neon/scripts/"
+            cmd = [
+                "python",
+                os.path.join(scripts_dir, "export_import_betwen_pageservers.py"),
+                "--tenant-id", tenant.hex,
+                "--from-host", "localhost",
+                "--from-http-port", str(pageserver_http.port),
+                "--from-pg-port", str(env.pageserver.service_port.pg),
+                "--to-host", "localhost",
+                "--to-http-port", str(new_pageserver_http_port),
+                "--to-pg-port", str(new_pageserver_pg_port),
+                "--psql-path", os.path.join(pg_distrib_dir, "bin", "psql"),
+            ]
+            subprocess_capture(env.repo_dir, cmd, check=True)
+        elif relocation_method == "attach":
+            # call to attach timeline to new pageserver
+            new_pageserver_http.timeline_attach(tenant, timeline)

-        # when load is active these checks can break because lsns are not static
-        # so lets check with some margin
-        assert_abs_margin_ratio(lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
-                                lsn_from_hex(timeline_detail['local']['disk_consistent_lsn']),
-                                0.03)
+        # new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there were no new writes since checkpoint
+        new_timeline_detail = wait_until(
+            number_of_iterations=5,
+            interval=1,
+            func=lambda: assert_local(new_pageserver_http, tenant, timeline))
+
+        # when load is active these checks can break because lsns are not static
+        # so lets check with some margin
+        assert_abs_margin_ratio(lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
+                                lsn_from_hex(timeline_detail['local']['disk_consistent_lsn']),
+                                0.03)

         tenant_pg.stop()