@@ -4,18 +4,12 @@
# Outline of steps:
# 1. Get `(last_lsn, prev_lsn)` from old pageserver
# 2. Get `fullbackup` from old pageserver, which creates a basebackup tar file
# 3. This tar file might be missing relation files for empty relations, if the pageserver
#    is old enough (we didn't always store those). So to recreate them, we start a local
#    vanilla postgres on this basebackup and ask it what relations should exist, then touch
#    any missing files and re-pack the tar.
#    TODO This functionality is no longer needed, so we can delete it later if we don't
#    end up using the same utils for the pg 15 upgrade. Not sure.
# 4. We import the patched basebackup into a new pageserver
# 5. We export again via fullbackup, now from the new pageserver and compare the returned
# 3. We import the basebackup into a new pageserver
# 4. We export again via fullbackup, now from the new pageserver and compare the returned
#    tar file with the one we imported. This confirms that we imported everything that was
#    exported, but doesn't guarantee correctness (what if we didn't **export** everything
#    initially?)
# 6. We wait for the new pageserver's remote_consistent_lsn to catch up
# 5. We wait for the new pageserver's remote_consistent_lsn to catch up
#
# For more context on how to use this, see:
# https://github.com/neondatabase/cloud/wiki/Storage-format-migration
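#
# A rough sketch of that flow in code (illustrative only; `get_rlsn` and
# `export_timeline` appear later in this script, while `import_timeline` and
# `wait_for_upload` stand in for whatever the script or its caller uses for
# those steps; error handling is omitted):
#
#     last_lsn, prev_lsn = get_rlsn(old_pageserver_connstr, tenant_id, timeline_id)  # step 1
#     export_timeline(...)   # step 2: fullbackup from the old pageserver -> tar file
#     import_timeline(...)   # step 3: feed the tar into the new pageserver
#     export_timeline(...)   # step 4: re-export from the new pageserver, compare tars
#     wait_for_upload(...)   # step 5: wait for remote_consistent_lsn to reach last_lsn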
@@ -24,17 +18,13 @@ import argparse
import os
import shutil
import subprocess
import tempfile
import time
import uuid
from contextlib import closing
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, cast
from typing import Any, Dict, List, Optional

import psycopg2
import requests
from psycopg2.extensions import connection as PgConnection
from psycopg2.extensions import parse_dsn

###############################################
### client-side utils copied from test fixtures
@@ -135,105 +125,6 @@ class PgBin:
        )


class PgProtocol:
    """Reusable connection logic"""

    def __init__(self, **kwargs):
        self.default_options = kwargs

    def conn_options(self, **kwargs):
        conn_options = self.default_options.copy()
        if "dsn" in kwargs:
            conn_options.update(parse_dsn(kwargs["dsn"]))
        conn_options.update(kwargs)

        # Individual statement timeout in seconds. 2 minutes should be
        # enough for our tests, but if you need a longer one, you can
        # change it by calling "SET statement_timeout" after
        # connecting.
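        # For example (illustrative only):
        #     cur.execute("SET statement_timeout = '10min'")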
        conn_options["options"] = f"-cstatement_timeout=120s {conn_options.get('options', '')}"

        return conn_options

    # autocommit=True here by default because that's what we need most of the time
    def connect(self, autocommit=True, **kwargs) -> PgConnection:
        """
        Connect to the node.
        Returns psycopg2's connection object.
        This method passes all extra params to connstr.
        """
        conn = psycopg2.connect(**self.conn_options(**kwargs))

        # WARNING: this setting affects *all* tests!
        conn.autocommit = autocommit
        return conn

    def safe_psql(self, query: str, **kwargs: Any) -> List[Tuple[Any, ...]]:
        """
        Execute query against the node and return all rows.
        This method passes all extra params to connstr.
        """
        return self.safe_psql_many([query], **kwargs)[0]

    def safe_psql_many(self, queries: List[str], **kwargs: Any) -> List[List[Tuple[Any, ...]]]:
        """
        Execute queries against the node and return all rows.
        This method passes all extra params to connstr.
        """
        result: List[List[Any]] = []
        with closing(self.connect(**kwargs)) as conn:
            with conn.cursor() as cur:
                for query in queries:
                    print(f"Executing query: {query}")
                    cur.execute(query)

                    if cur.description is None:
                        result.append([])  # query didn't return data
                    else:
                        result.append(cast(List[Any], cur.fetchall()))
        return result


class VanillaPostgres(PgProtocol):
    def __init__(self, pgdatadir: Path, pg_bin: PgBin, port: int, init=True):
        super().__init__(host="localhost", port=port, dbname="postgres")
        self.pgdatadir = pgdatadir
        self.pg_bin = pg_bin
        self.running = False
        if init:
            self.pg_bin.run_capture(["initdb", "-D", str(pgdatadir)])
        self.configure([f"port = {port}\n"])

    def configure(self, options: List[str]):
        """Append lines into postgresql.conf file."""
        assert not self.running
        with open(os.path.join(self.pgdatadir, "postgresql.conf"), "a") as conf_file:
            conf_file.write("\n".join(options))

    def start(self, log_path: Optional[str] = None):
        assert not self.running
        self.running = True

        if log_path is None:
            log_path = os.path.join(self.pgdatadir, "pg.log")

        self.pg_bin.run_capture(
            ["pg_ctl", "-w", "-D", str(self.pgdatadir), "-l", log_path, "start"]
        )

    def stop(self):
        assert self.running
        self.running = False
        self.pg_bin.run_capture(["pg_ctl", "-w", "-D", str(self.pgdatadir), "stop"])

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if self.running:
            self.stop()
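
# Illustrative usage of the VanillaPostgres helper above (hypothetical datadir path and
# port, not part of the original script; PgBin arguments mirror how it is constructed in
# export_timeline below):
#
#     pg_bin = PgBin(work_dir, pg_distrib_dir, pg_version)
#     with VanillaPostgres(Path("/tmp/restored_datadir"), pg_bin, 55432, init=False) as vanilla_pg:
#         vanilla_pg.start()
#         print(vanilla_pg.safe_psql("select count(*) from pg_database"))
#     # __exit__ stops the server if it is still running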


class NeonPageserverApiException(Exception):
    pass

@@ -370,83 +261,6 @@ def pack_base(log_dir, restored_dir, output_tar):
    shutil.move(tmp_tar_path, output_tar)


def reconstruct_paths(log_dir, pg_bin, base_tar, port: int):
    """Reconstruct what relation files should exist in the datadir by querying postgres."""
    with tempfile.TemporaryDirectory() as restored_dir:
        # Unpack the base tar
        subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])

        # Start a vanilla postgres from the given datadir and query it to find
        # what relfiles should exist, but possibly don't.
        with VanillaPostgres(Path(restored_dir), pg_bin, port, init=False) as vanilla_pg:
            vanilla_pg.configure([f"port={port}"])
            vanilla_pg.start(log_path=os.path.join(log_dir, "tmp_pg.log"))

            # Create database based on template0 because we can't connect to template0
            query = "create database template0copy template template0"
            vanilla_pg.safe_psql(query, user="cloud_admin")
            vanilla_pg.safe_psql("CHECKPOINT", user="cloud_admin")

            # Get all databases
            query = "select oid, datname from pg_database"
            oid_dbname_pairs = vanilla_pg.safe_psql(query, user="cloud_admin")
            template0_oid = [
                oid for (oid, database) in oid_dbname_pairs if database == "template0"
            ][0]

            # Get rel paths for each database
            for oid, database in oid_dbname_pairs:
                if database == "template0":
                    # We can't connect to template0
                    continue

                query = "select relname, pg_relation_filepath(oid) from pg_class"
                result = vanilla_pg.safe_psql(query, user="cloud_admin", dbname=database)
                for relname, filepath in result:
                    if filepath is not None:

                        if database == "template0copy":
                            # Add all template0copy paths to template0
                            prefix = f"base/{oid}/"
                            if filepath.startswith(prefix):
                                suffix = filepath[len(prefix) :]
                                yield f"base/{template0_oid}/{suffix}"
                            elif filepath.startswith("global"):
                                print(f"skipping {database} global file {filepath}")
                            else:
                                raise AssertionError
                        else:
                            yield filepath
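
# For example (hypothetical OIDs, added for illustration): if template0copy got OID 16390
# and template0 has OID 13723, a pg_class row with file path "base/16390/1259" yields
# "base/13723/1259" above, i.e. the path the corresponding file should have inside
# template0, which we cannot query directly.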


def touch_missing_rels(log_dir, corrupt_tar, output_tar, paths):
    """Add the appropriate empty files to a basebackup tar."""
    with tempfile.TemporaryDirectory() as restored_dir:
        # Unpack the base tar
        subprocess_capture(log_dir, ["tar", "-xf", corrupt_tar, "-C", restored_dir])

        # Touch files that don't exist
        for path in paths:
            absolute_path = os.path.join(restored_dir, path)
            exists = os.path.exists(absolute_path)
            if not exists:
                print(f"File {absolute_path} didn't exist. Creating..")
                Path(absolute_path).touch()

        # Repackage
        pack_base(log_dir, restored_dir, output_tar)


# HACK This is a workaround for exporting from old pageservers that
# can't export empty relations. In this case we need to start
# a vanilla postgres from the exported datadir, and query it
# to see what empty relations are missing, and then create
# those empty files before importing.
def add_missing_rels(base_tar, output_tar, log_dir, pg_bin, tmp_pg_port: int):
    reconstructed_paths = set(reconstruct_paths(log_dir, pg_bin, base_tar, tmp_pg_port))
    touch_missing_rels(log_dir, base_tar, output_tar, reconstructed_paths)
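
# Putting the three helpers together (sketch; "incomplete.tar"/"patched.tar" are
# hypothetical filenames, and PgBin is constructed the same way export_timeline below
# constructs it):
#
#     pg_bin = PgBin(work_dir, pg_distrib_dir, pg_version)
#     add_missing_rels("incomplete.tar", "patched.tar", work_dir, pg_bin, tmp_pg_port)
#     # "patched.tar" now also contains zero-length files for any relations that
#     # postgres says should exist but were missing from the exported basebackup.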


def get_rlsn(pageserver_connstr, tenant_id, timeline_id):
    conn = psycopg2.connect(pageserver_connstr)
    conn.autocommit = True
@@ -515,7 +329,6 @@ def export_timeline(
    pg_version,
):
    # Choose filenames
    incomplete_filename = tar_filename + ".incomplete"
    stderr_filename = os.path.join(args.work_dir, f"{tenant_id}_{timeline_id}.stderr")

    # Construct export command
@@ -524,18 +337,14 @@ def export_timeline(

    # Run export command
    print(f"Running: {cmd}")
    with open(incomplete_filename, "w") as stdout_f:
    with open(tar_filename, "w") as stdout_f:
        with open(stderr_filename, "w") as stderr_f:
            print(f"(capturing output to {incomplete_filename})")
            print(f"(capturing output to {tar_filename})")
            pg_bin = PgBin(args.work_dir, args.pg_distrib_dir, pg_version)
            subprocess.run(
                cmd, stdout=stdout_f, stderr=stderr_f, env=pg_bin._build_env(None), check=True
            )

    # Add missing rels
    pg_bin = PgBin(args.work_dir, args.pg_distrib_dir, pg_version)
    add_missing_rels(incomplete_filename, tar_filename, args.work_dir, pg_bin, args.tmp_pg_port)

    # Log more info
    file_size = os.path.getsize(tar_filename)
    print(f"Done export: {tar_filename}, size {file_size}")
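
# Step 4 of the outline (comparing the re-exported tar against the imported one) can be
# done in several ways; a minimal sketch using only the standard library (the function
# name and the idea of comparing member names/sizes are illustrative, not necessarily
# what this script does):
#
#     import tarfile
#
#     def tars_match(a: str, b: str) -> bool:
#         def listing(path):
#             with tarfile.open(path) as tar:
#                 return sorted((m.name, m.size) for m in tar.getmembers())
#         return listing(a) == listing(b)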