test_runner: add from_repo_dir method (#6087)

## Problem

We need a reliable way to restore project state (by which I mean data on
pageservers, safekeepers, and remote storage) from a snapshot. The
existing method (used in `test_compatibility`) relies heavily on config
files, which makes it harder to add or change config fields.
The proposed solution uses the config file only to get `default_tenant_id`
and `branch_name_mappings`.
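
For illustration, this is a sketch of the only two fields the method reads from the snapshot `config` and how it looks them up (the tenant/timeline IDs below are made up, not taken from a real snapshot):

```python
import toml

# Hypothetical snapshot `config` contents; only these two fields are consumed.
snapshot_config = toml.loads(
    """
    default_tenant_id = "aaaabbbbccccddddeeeeffff00001111"

    [branch_name_mappings]
    main = [["aaaabbbbccccddddeeeeffff00001111", "22223333444455556666777788889999"]]
    """
)

tenant_id = snapshot_config["default_tenant_id"]
# `branch_name_mappings` maps a branch name to (tenant_id, timeline_id) pairs.
timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
```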

## Summary of changes
- Add a `NeonEnvBuilder#from_repo_dir` method, which allows using the
`neon_env_builder` fixture with data from a snapshot (see the usage sketch below)
- Use `NeonEnvBuilder#from_repo_dir` in the compatibility tests
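
A minimal usage sketch (a hypothetical test; the snapshot path is assumed, the rest mirrors how the compatibility tests below use the method):

```python
from pathlib import Path

from fixtures.neon_fixtures import NeonEnvBuilder


def test_restore_from_snapshot(neon_env_builder: NeonEnvBuilder):
    # Hypothetical snapshot location; the compatibility tests derive this
    # from the COMPATIBILITY_SNAPSHOT_DIR environment variable instead.
    snapshot_dir = Path("test_output/compatibility_snapshot_pgv15")

    # Import pageserver, safekeeper, and remote storage state from the
    # snapshot, then start the services against the restored data.
    neon_env_builder.num_safekeepers = 3
    env = neon_env_builder.from_repo_dir(snapshot_dir / "repo")
    neon_env_builder.start()

    # The initial tenant/timeline were picked up from the snapshot's config.
    ep = env.endpoints.create_start("main")
    assert ep.safe_psql("SELECT 1")[0][0] == 1
```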

Required for https://github.com/neondatabase/neon/issues/6033
Alexander Bayandin, 2023-12-12 16:24:13 +00:00 (committed by GitHub)
parent aec1acdbac
commit 6acbee2368
2 changed files with 101 additions and 223 deletions

test_runner/fixtures/neon_fixtures.py

@@ -507,6 +507,66 @@ class NeonEnvBuilder:

        return env

    def from_repo_dir(
        self,
        repo_dir: Path,
        neon_binpath: Optional[Path] = None,
        pg_distrib_dir: Optional[Path] = None,
    ) -> NeonEnv:
        """
        A simple method to import data into the current NeonEnvBuilder from a snapshot of a repo dir.
        """

        # Setting custom `neon_binpath` and `pg_distrib_dir` is useful for compatibility tests
        self.neon_binpath = neon_binpath or self.neon_binpath
        self.pg_distrib_dir = pg_distrib_dir or self.pg_distrib_dir

        # Get the initial tenant and timeline from the snapshot config
        snapshot_config_toml = repo_dir / "config"
        with snapshot_config_toml.open("r") as f:
            snapshot_config = toml.load(f)

        self.initial_tenant = TenantId(snapshot_config["default_tenant_id"])
        self.initial_timeline = TimelineId(
            dict(snapshot_config["branch_name_mappings"][DEFAULT_BRANCH_NAME])[
                str(self.initial_tenant)
            ]
        )

        self.env = self.init_configs()

        for ps_dir in repo_dir.glob("pageserver_*"):
            tenants_from_dir = ps_dir / "tenants"
            tenants_to_dir = self.repo_dir / ps_dir.name / "tenants"

            log.info(f"Copying pageserver tenants directory {tenants_from_dir} to {tenants_to_dir}")
            shutil.copytree(tenants_from_dir, tenants_to_dir)

        for sk_from_dir in (repo_dir / "safekeepers").glob("sk*"):
            sk_to_dir = self.repo_dir / "safekeepers" / sk_from_dir.name
            log.info(f"Copying safekeeper directory {sk_from_dir} to {sk_to_dir}")
            sk_to_dir.rmdir()
            shutil.copytree(sk_from_dir, sk_to_dir, ignore=shutil.ignore_patterns("*.log", "*.pid"))

        shutil.rmtree(self.repo_dir / "local_fs_remote_storage", ignore_errors=True)
        shutil.copytree(
            repo_dir / "local_fs_remote_storage", self.repo_dir / "local_fs_remote_storage"
        )

        if (attachments_json := Path(repo_dir / "attachments.json")).exists():
            shutil.copyfile(attachments_json, self.repo_dir / attachments_json.name)

        # Update the config with info about tenants and timelines
        with (self.repo_dir / "config").open("r") as f:
            config = toml.load(f)

        config["default_tenant_id"] = snapshot_config["default_tenant_id"]
        config["branch_name_mappings"] = snapshot_config["branch_name_mappings"]

        with (self.repo_dir / "config").open("w") as f:
            toml.dump(config, f)

        return self.env

    def enable_scrub_on_exit(self):
        """
        Call this if you would like the fixture to automatically run

test_runner/regress/test_compatibility.py

@@ -1,30 +1,25 @@
import copy
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Any, List, Optional
from typing import List, Optional

import pytest
import toml
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    NeonCli,
    NeonEnv,
    NeonEnvBuilder,
    PgBin,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import (
    timeline_delete_wait_completed,
    wait_for_last_record_lsn,
    wait_for_upload,
)
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, RemoteStorageUser
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import Lsn
from pytest import FixtureRequest
#
# A test suite that helps prevent unintentionally breaking backward or forward compatibility between Neon releases.
@@ -37,8 +32,8 @@ from pytest import FixtureRequest
# If the breakage is intentional, the test can be xfailed by setting ALLOW_FORWARD_COMPATIBILITY_BREAKAGE=true.
#
# The file contains a couple of helper functions:
# - prepare_snapshot copies the snapshot, cleans it up, and makes it ready for the current version of Neon (replaces paths and ports in config files).
# - check_neon_works performs the test itself; feel free to add more checks there.
# - dump_differs compares two SQL dumps and writes the diff to a file.
#
#
# How to run `test_backward_compatibility` locally:
@@ -46,6 +41,7 @@ from pytest import FixtureRequest
# export DEFAULT_PG_VERSION=15
# export BUILD_TYPE=release
# export CHECK_ONDISK_DATA_COMPATIBILITY=true
# export COMPATIBILITY_SNAPSHOT_DIR=test_output/compatibility_snapshot_pgv${DEFAULT_PG_VERSION}
#
# # Build previous version of binaries and create a data snapshot:
# rm -rf pg_install target
@@ -59,8 +55,7 @@ from pytest import FixtureRequest
# CARGO_BUILD_FLAGS="--features=testing" make -s -j`nproc`
#
# # Run backward compatibility test
# COMPATIBILITY_SNAPSHOT_DIR=test_output/compatibility_snapshot_pgv${DEFAULT_PG_VERSION} \
# ./scripts/pytest -k test_backward_compatibility
# ./scripts/pytest -k test_backward_compatibility
#
#
# How to run `test_forward_compatibility` locally:
@@ -68,6 +63,8 @@ from pytest import FixtureRequest
# export DEFAULT_PG_VERSION=15
# export BUILD_TYPE=release
# export CHECK_ONDISK_DATA_COMPATIBILITY=true
# export COMPATIBILITY_NEON_BIN=neon_previous/target/${BUILD_TYPE}
# export COMPATIBILITY_POSTGRES_DISTRIB_DIR=neon_previous/pg_install
#
# # Build previous version of binaries and store them somewhere:
# rm -rf pg_install target
@@ -84,9 +81,7 @@ from pytest import FixtureRequest
# ./scripts/pytest -k test_create_snapshot
#
# # Run forward compatibility test
# COMPATIBILITY_NEON_BIN=neon_previous/target/${BUILD_TYPE} \
# COMPATIBILITY_POSTGRES_DISTRIB_DIR=neon_previous/pg_install \
# ./scripts/pytest -k test_forward_compatibility
# ./scripts/pytest -k test_forward_compatibility
#
check_ondisk_data_compatibility_if_enabled = pytest.mark.skipif(
@@ -155,13 +150,9 @@ def test_create_snapshot(
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(after="test_create_snapshot")
def test_backward_compatibility(
    pg_bin: PgBin,
    port_distributor: PortDistributor,
    neon_env_builder: NeonEnvBuilder,
    test_output_dir: Path,
    neon_binpath: Path,
    pg_distrib_dir: Path,
    pg_version: PgVersion,
    request: FixtureRequest,
):
    """
    Test that the new binaries can read old data
@@ -177,23 +168,15 @@ def test_backward_compatibility(
    )

    try:
        # Copy the snapshot to current directory, and prepare for the test
        prepare_snapshot(
            from_dir=compatibility_snapshot_dir,
            to_dir=test_output_dir / "compatibility_snapshot",
            port_distributor=port_distributor,
        )
        neon_env_builder.num_safekeepers = 3
        env = neon_env_builder.from_repo_dir(compatibility_snapshot_dir / "repo")
        neon_env_builder.start()

        check_neon_works(
            test_output_dir / "compatibility_snapshot" / "repo",
            neon_binpath,
            neon_binpath,
            pg_distrib_dir,
            pg_version,
            port_distributor,
            test_output_dir,
            pg_bin,
            request,
            env,
            test_output_dir=test_output_dir,
            sql_dump_path=compatibility_snapshot_dir / "dump.sql",
            repo_dir=env.repo_dir,
        )
    except Exception:
        if breaking_changes_allowed:
@@ -212,12 +195,10 @@ def test_backward_compatibility(
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(after="test_create_snapshot")
def test_forward_compatibility(
    neon_env_builder: NeonEnvBuilder,
    test_output_dir: Path,
    top_output_dir: Path,
    port_distributor: PortDistributor,
    pg_version: PgVersion,
    request: FixtureRequest,
    neon_binpath: Path,
):
    """
    Test that the old binaries can read new data
@@ -244,24 +225,19 @@ def test_forward_compatibility(
    )

    try:
        # Copy the snapshot to current directory, and prepare for the test
        prepare_snapshot(
            from_dir=compatibility_snapshot_dir,
            to_dir=test_output_dir / "compatibility_snapshot",
            port_distributor=port_distributor,
        neon_env_builder.num_safekeepers = 3
        env = neon_env_builder.from_repo_dir(
            compatibility_snapshot_dir / "repo",
            neon_binpath=compatibility_neon_bin,
            pg_distrib_dir=compatibility_postgres_distrib_dir,
        )
        neon_env_builder.start()

        check_neon_works(
            test_output_dir / "compatibility_snapshot" / "repo",
            compatibility_neon_bin,
            neon_binpath,
            compatibility_postgres_distrib_dir,
            pg_version,
            port_distributor,
            test_output_dir,
            PgBin(test_output_dir, compatibility_postgres_distrib_dir, pg_version),
            request,
            env,
            test_output_dir=test_output_dir,
            sql_dump_path=compatibility_snapshot_dir / "dump.sql",
            repo_dir=env.repo_dir,
        )
    except Exception:
        if breaking_changes_allowed:
@@ -276,189 +252,26 @@ def test_forward_compatibility(
), "Breaking changes are allowed by ALLOW_FORWARD_COMPATIBILITY_BREAKAGE, but the test has passed without any breakage"
def prepare_snapshot(
from_dir: Path,
to_dir: Path,
port_distributor: PortDistributor,
pg_distrib_dir: Optional[Path] = None,
):
assert from_dir.exists(), f"Snapshot '{from_dir}' doesn't exist"
assert (from_dir / "repo").exists(), f"Snapshot '{from_dir}' doesn't contain a repo directory"
assert (from_dir / "dump.sql").exists(), f"Snapshot '{from_dir}' doesn't contain a dump.sql"
def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, repo_dir: Path):
ep = env.endpoints.create_start("main")
pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
log.info(f"Copying snapshot from {from_dir} to {to_dir}")
shutil.copytree(from_dir, to_dir)
repo_dir = to_dir / "repo"
snapshot_config_toml = repo_dir / "config"
snapshot_config = toml.load(snapshot_config_toml)
# Remove old logs to avoid confusion in test artifacts
for logfile in repo_dir.glob("**/*.log"):
logfile.unlink()
# Remove old computes in 'endpoints'. Old versions of the control plane used a directory
# called "pgdatadirs". Delete it, too.
if (repo_dir / "endpoints").exists():
shutil.rmtree(repo_dir / "endpoints")
if (repo_dir / "pgdatadirs").exists():
shutil.rmtree(repo_dir / "pgdatadirs")
os.mkdir(repo_dir / "endpoints")
# Update paths and ports in config files
legacy_pageserver_toml = repo_dir / "pageserver.toml"
legacy_bundle = os.path.exists(legacy_pageserver_toml)
path_to_config: dict[Path, dict[Any, Any]] = {}
if legacy_bundle:
os.mkdir(repo_dir / "pageserver_1")
path_to_config[repo_dir / "pageserver_1" / "pageserver.toml"] = toml.load(
legacy_pageserver_toml
)
os.remove(legacy_pageserver_toml)
os.rename(repo_dir / "tenants", repo_dir / "pageserver_1" / "tenants")
else:
for ps_conf in snapshot_config["pageservers"]:
config_path = repo_dir / f"pageserver_{ps_conf['id']}" / "pageserver.toml"
path_to_config[config_path] = toml.load(config_path)
# For each pageserver config, edit it and rewrite
for config_path, pageserver_config in path_to_config.items():
pageserver_config["remote_storage"]["local_path"] = str(
LocalFsStorage.component_path(repo_dir, RemoteStorageUser.PAGESERVER)
)
for param in ("listen_http_addr", "listen_pg_addr", "broker_endpoint"):
pageserver_config[param] = port_distributor.replace_with_new_port(
pageserver_config[param]
)
# We don't use authentication in compatibility tests
# so just remove authentication related settings.
pageserver_config.pop("pg_auth_type", None)
pageserver_config.pop("http_auth_type", None)
if pg_distrib_dir:
pageserver_config["pg_distrib_dir"] = str(pg_distrib_dir)
with config_path.open("w") as f:
toml.dump(pageserver_config, f)
# neon_local config doesn't have to be backward compatible. If we're using a dump from before
# it supported multiple pageservers, fix it up.
if "pageservers" not in snapshot_config:
snapshot_config["pageservers"] = [snapshot_config["pageserver"]]
del snapshot_config["pageserver"]
for param in ("listen_http_addr", "listen_pg_addr"):
for pageserver in snapshot_config["pageservers"]:
pageserver[param] = port_distributor.replace_with_new_port(pageserver[param])
snapshot_config["broker"]["listen_addr"] = port_distributor.replace_with_new_port(
snapshot_config["broker"]["listen_addr"]
)
for sk in snapshot_config["safekeepers"]:
for param in ("http_port", "pg_port", "pg_tenant_only_port"):
sk[param] = port_distributor.replace_with_new_port(sk[param])
if pg_distrib_dir:
snapshot_config["pg_distrib_dir"] = str(pg_distrib_dir)
with snapshot_config_toml.open("w") as f:
toml.dump(snapshot_config, f)
# Ensure that snapshot doesn't contain references to the original path
rv = subprocess.run(
[
"grep",
"--recursive",
"--binary-file=without-match",
"--files-with-matches",
"test_create_snapshot/repo",
str(repo_dir),
],
capture_output=True,
text=True,
)
assert (
rv.returncode != 0
), f"there're files referencing `test_create_snapshot/repo`, this path should be replaced with {repo_dir}:\n{rv.stdout}"

def check_neon_works(
    repo_dir: Path,
    neon_target_binpath: Path,
    neon_current_binpath: Path,
    pg_distrib_dir: Path,
    pg_version: PgVersion,
    port_distributor: PortDistributor,
    test_output_dir: Path,
    pg_bin: PgBin,
    request: FixtureRequest,
):
    snapshot_config_toml = repo_dir / "config"
    snapshot_config = toml.load(snapshot_config_toml)
    snapshot_config["neon_distrib_dir"] = str(neon_target_binpath)
    snapshot_config["postgres_distrib_dir"] = str(pg_distrib_dir)
    with (snapshot_config_toml).open("w") as f:
        toml.dump(snapshot_config, f)

    # TODO: replace with NeonEnvBuilder / NeonEnv
    config: Any = type("NeonEnvStub", (object,), {})
    config.rust_log_override = None
    config.repo_dir = repo_dir
    config.pg_version = pg_version
    config.initial_tenant = snapshot_config["default_tenant_id"]
    config.pg_distrib_dir = pg_distrib_dir
    config.remote_storage = None
    config.safekeepers_remote_storage = None

    # Use the "target" binaries to launch the storage nodes
    config_target = config
    config_target.neon_binpath = neon_target_binpath
    # We are using maybe-old binaries for neon services, but want to use current
    # binaries for test utilities like neon_local
    config_target.neon_local_binpath = neon_current_binpath
    cli_target = NeonCli(config_target)

    # And the current binaries to launch computes
    snapshot_config["neon_distrib_dir"] = str(neon_current_binpath)
    with (snapshot_config_toml).open("w") as f:
        toml.dump(snapshot_config, f)
    config_current = copy.copy(config)
    config_current.neon_binpath = neon_current_binpath
    cli_current = NeonCli(config_current)

    cli_target.raw_cli(["start"])
    request.addfinalizer(lambda: cli_target.raw_cli(["stop"]))

    pg_port = port_distributor.get_port()
    http_port = port_distributor.get_port()
    cli_current.endpoint_create(
        branch_name="main", pg_port=pg_port, http_port=http_port, endpoint_id="ep-main"
    )
    cli_current.endpoint_start("ep-main")
    request.addfinalizer(lambda: cli_current.endpoint_stop("ep-main"))

    connstr = f"host=127.0.0.1 port={pg_port} user=cloud_admin dbname=postgres"
    connstr = ep.connstr()
    pg_bin.run_capture(
        ["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump.sql'}"]
    )
    initial_dump_differs = dump_differs(
        repo_dir.parent / "dump.sql",
        sql_dump_path,
        test_output_dir / "dump.sql",
        test_output_dir / "dump.filediff",
    )

    # Check that project can be recovered from WAL
    # loosely based on https://www.notion.so/neondatabase/Storage-Recovery-from-WAL-d92c0aac0ebf40df892b938045d7d720
    tenant_id = snapshot_config["default_tenant_id"]
    timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
    pageserver_port = snapshot_config["pageservers"][0]["listen_http_addr"].split(":")[-1]
    pageserver_http = PageserverHttpClient(
        port=pageserver_port,
        is_testing_enabled_or_skip=lambda: True,  # TODO: check if testing really enabled
    )
    pageserver_http = env.pageserver.http_client()
    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline
    pg_version = env.pg_version
    shutil.rmtree(repo_dir / "local_fs_remote_storage")
    timeline_delete_wait_completed(pageserver_http, tenant_id, timeline_id)
@@ -494,6 +307,11 @@ def dump_differs(
    Returns True if the dumps differ and the produced diff is not allowed, False otherwise (in most cases we want it to return False).
    """

    if not first.exists():
        raise FileNotFoundError(f"{first} doesn't exist")

    if not second.exists():
        raise FileNotFoundError(f"{second} doesn't exist")

    with output.open("w") as stdout:
        res = subprocess.run(
            [