test_runner: add from_repo_dir method (#6087)
## Problem

We need a reliable way to restore a project's state (here meaning the data on pageservers, safekeepers, and remote storage) from a snapshot. The existing method (used in `test_compatibility`) relies heavily on config files, which makes it harder to add or change fields in the config. The proposed solution uses the config file only to get `default_tenant_id` and `branch_name_mappings`.

## Summary of changes

- Add a `NeonEnvBuilder#from_repo_dir` method, which allows using the `neon_env_builder` fixture with data from a snapshot.
- Use `NeonEnvBuilder#from_repo_dir` in compatibility tests

Required for https://github.com/neondatabase/neon/issues/6033
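For a concrete picture of the intended usage, here is a minimal sketch of a test built on the new method. It mirrors the calls introduced in the diff below; the snapshot path and the `SELECT 1` check are hypothetical, and `safe_psql` is assumed from the existing endpoint fixture API:

```python
from pathlib import Path


def test_restore_from_snapshot(neon_env_builder):
    # Hypothetical snapshot location; real tests derive it from
    # COMPATIBILITY_SNAPSHOT_DIR / the test_create_snapshot output.
    snapshot_repo = Path("test_output/compatibility_snapshot/repo")

    neon_env_builder.num_safekeepers = 3
    # from_repo_dir copies pageserver, safekeeper, and remote-storage state
    # from the snapshot into a freshly initialized env; only default_tenant_id
    # and branch_name_mappings are read from the snapshot's config file.
    env = neon_env_builder.from_repo_dir(snapshot_repo)
    neon_env_builder.start()

    # Sanity check: the restored project accepts connections.
    ep = env.endpoints.create_start("main")
    assert ep.safe_psql("SELECT 1")[0][0] == 1
```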
parent aec1acdbac · commit 6acbee2368 (committed via GitHub)
@@ -507,6 +507,66 @@ class NeonEnvBuilder:
         return env
 
+    def from_repo_dir(
+        self,
+        repo_dir: Path,
+        neon_binpath: Optional[Path] = None,
+        pg_distrib_dir: Optional[Path] = None,
+    ) -> NeonEnv:
+        """
+        A simple method to import data into the current NeonEnvBuilder from a snapshot of a repo dir.
+        """
+
+        # Setting custom `neon_binpath` and `pg_distrib_dir` is useful for compatibility tests
+        self.neon_binpath = neon_binpath or self.neon_binpath
+        self.pg_distrib_dir = pg_distrib_dir or self.pg_distrib_dir
+
+        # Get the initial tenant and timeline from the snapshot config
+        snapshot_config_toml = repo_dir / "config"
+        with snapshot_config_toml.open("r") as f:
+            snapshot_config = toml.load(f)
+
+        self.initial_tenant = TenantId(snapshot_config["default_tenant_id"])
+        self.initial_timeline = TimelineId(
+            dict(snapshot_config["branch_name_mappings"][DEFAULT_BRANCH_NAME])[
+                str(self.initial_tenant)
+            ]
+        )
+        self.env = self.init_configs()
+
+        for ps_dir in repo_dir.glob("pageserver_*"):
+            tenants_from_dir = ps_dir / "tenants"
+            tenants_to_dir = self.repo_dir / ps_dir.name / "tenants"
+
+            log.info(f"Copying pageserver tenants directory {tenants_from_dir} to {tenants_to_dir}")
+            shutil.copytree(tenants_from_dir, tenants_to_dir)
+
+        for sk_from_dir in (repo_dir / "safekeepers").glob("sk*"):
+            sk_to_dir = self.repo_dir / "safekeepers" / sk_from_dir.name
+            log.info(f"Copying safekeeper directory {sk_from_dir} to {sk_to_dir}")
+            sk_to_dir.rmdir()
+            shutil.copytree(sk_from_dir, sk_to_dir, ignore=shutil.ignore_patterns("*.log", "*.pid"))
+
+        shutil.rmtree(self.repo_dir / "local_fs_remote_storage", ignore_errors=True)
+        shutil.copytree(
+            repo_dir / "local_fs_remote_storage", self.repo_dir / "local_fs_remote_storage"
+        )
+
+        if (attachments_json := Path(repo_dir / "attachments.json")).exists():
+            shutil.copyfile(attachments_json, self.repo_dir / attachments_json.name)
+
+        # Update the config with info about tenants and timelines
+        with (self.repo_dir / "config").open("r") as f:
+            config = toml.load(f)
+
+        config["default_tenant_id"] = snapshot_config["default_tenant_id"]
+        config["branch_name_mappings"] = snapshot_config["branch_name_mappings"]
+
+        with (self.repo_dir / "config").open("w") as f:
+            toml.dump(config, f)
+
+        return self.env
+
     def enable_scrub_on_exit(self):
         """
         Call this if you would like the fixture to automatically run
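The timeline lookup in `from_repo_dir` relies on the shape of `branch_name_mappings` in the `neon_local` config: each branch name maps to a list of `(tenant_id, timeline_id)` pairs, so wrapping it in `dict()` yields a tenant-to-timeline lookup. A self-contained sketch with made-up IDs (the values are illustrative, not real ones):

```python
import toml

# Made-up snapshot config fragment in the shape from_repo_dir expects.
snapshot_config = toml.loads(
    """
default_tenant_id = "9e4b64304dde6e15b70f45fbe23aff0d"

[branch_name_mappings]
main = [["9e4b64304dde6e15b70f45fbe23aff0d", "2296e5f5e0f0b2b1b9c67178f4a2ba0f"]]
"""
)

tenant_id = snapshot_config["default_tenant_id"]
# The pair list becomes a tenant_id -> timeline_id mapping.
timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
assert timeline_id == "2296e5f5e0f0b2b1b9c67178f4a2ba0f"
```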
@@ -1,30 +1,25 @@
-import copy
 import os
 import shutil
 import subprocess
-import tempfile
 from pathlib import Path
-from typing import Any, List, Optional
+from typing import List, Optional
 
 import pytest
 import toml
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
-    NeonCli,
+    NeonEnv,
     NeonEnvBuilder,
     PgBin,
 )
-from fixtures.pageserver.http import PageserverHttpClient
 from fixtures.pageserver.utils import (
     timeline_delete_wait_completed,
     wait_for_last_record_lsn,
     wait_for_upload,
 )
 from fixtures.pg_version import PgVersion
-from fixtures.port_distributor import PortDistributor
-from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, RemoteStorageUser
+from fixtures.remote_storage import RemoteStorageKind
 from fixtures.types import Lsn
 from pytest import FixtureRequest
 
 #
 # A test suite that help to prevent unintentionally breaking backward or forward compatibility between Neon releases.
@@ -37,8 +32,8 @@ from pytest import FixtureRequest
 # If the breakage is intentional, the test can be xfaild with setting ALLOW_FORWARD_COMPATIBILITY_BREAKAGE=true.
 #
 # The file contains a couple of helper functions:
-# - prepare_snapshot copies the snapshot, cleans it up and makes it ready for the current version of Neon (replaces paths and ports in config files).
 # - check_neon_works performs the test itself, feel free to add more checks there.
 # - dump_differs compares two SQL dumps and writes the diff to a file.
 #
+#
 # How to run `test_backward_compatibility` locally:
@@ -46,6 +41,7 @@ from pytest import FixtureRequest
 # export DEFAULT_PG_VERSION=15
 # export BUILD_TYPE=release
 # export CHECK_ONDISK_DATA_COMPATIBILITY=true
+# export COMPATIBILITY_SNAPSHOT_DIR=test_output/compatibility_snapshot_pgv${DEFAULT_PG_VERSION}
 #
 # # Build previous version of binaries and create a data snapshot:
 # rm -rf pg_install target
@@ -59,8 +55,7 @@ from pytest import FixtureRequest
 # CARGO_BUILD_FLAGS="--features=testing" make -s -j`nproc`
 #
 # # Run backward compatibility test
-# COMPATIBILITY_SNAPSHOT_DIR=test_output/compatibility_snapshot_pgv${DEFAULT_PG_VERSION} \
-# ./scripts/pytest -k test_backward_compatibility
+# ./scripts/pytest -k test_backward_compatibility
 #
 #
 # How to run `test_forward_compatibility` locally:
@@ -68,6 +63,8 @@ from pytest import FixtureRequest
 # export DEFAULT_PG_VERSION=15
 # export BUILD_TYPE=release
 # export CHECK_ONDISK_DATA_COMPATIBILITY=true
+# export COMPATIBILITY_NEON_BIN=neon_previous/target/${BUILD_TYPE}
+# export COMPATIBILITY_POSTGRES_DISTRIB_DIR=neon_previous/pg_install
 #
 # # Build previous version of binaries and store them somewhere:
 # rm -rf pg_install target
@@ -84,9 +81,7 @@ from pytest import FixtureRequest
 # ./scripts/pytest -k test_create_snapshot
 #
 # # Run forward compatibility test
-# COMPATIBILITY_NEON_BIN=neon_previous/target/${BUILD_TYPE} \
-# COMPATIBILITY_POSTGRES_DISTRIB_DIR=neon_previous/pg_install \
-# ./scripts/pytest -k test_forward_compatibility
+# ./scripts/pytest -k test_forward_compatibility
 #
 
 check_ondisk_data_compatibility_if_enabled = pytest.mark.skipif(
@@ -155,13 +150,9 @@ def test_create_snapshot(
 @pytest.mark.xdist_group("compatibility")
 @pytest.mark.order(after="test_create_snapshot")
 def test_backward_compatibility(
-    pg_bin: PgBin,
-    port_distributor: PortDistributor,
     neon_env_builder: NeonEnvBuilder,
     test_output_dir: Path,
-    neon_binpath: Path,
-    pg_distrib_dir: Path,
     pg_version: PgVersion,
     request: FixtureRequest,
 ):
     """
     Test that the new binaries can read old data
@@ -177,23 +168,15 @@ def test_backward_compatibility(
     )
 
     try:
-        # Copy the snapshot to current directory, and prepare for the test
-        prepare_snapshot(
-            from_dir=compatibility_snapshot_dir,
-            to_dir=test_output_dir / "compatibility_snapshot",
-            port_distributor=port_distributor,
-        )
+        neon_env_builder.num_safekeepers = 3
+        env = neon_env_builder.from_repo_dir(compatibility_snapshot_dir / "repo")
+        neon_env_builder.start()
 
         check_neon_works(
-            test_output_dir / "compatibility_snapshot" / "repo",
-            neon_binpath,
-            neon_binpath,
-            pg_distrib_dir,
-            pg_version,
-            port_distributor,
-            test_output_dir,
-            pg_bin,
-            request,
+            env,
+            test_output_dir=test_output_dir,
+            sql_dump_path=compatibility_snapshot_dir / "dump.sql",
+            repo_dir=env.repo_dir,
         )
     except Exception:
         if breaking_changes_allowed:
@@ -212,12 +195,10 @@ def test_backward_compatibility(
 @pytest.mark.xdist_group("compatibility")
 @pytest.mark.order(after="test_create_snapshot")
 def test_forward_compatibility(
     neon_env_builder: NeonEnvBuilder,
     test_output_dir: Path,
     top_output_dir: Path,
-    port_distributor: PortDistributor,
     pg_version: PgVersion,
     request: FixtureRequest,
-    neon_binpath: Path,
 ):
     """
     Test that the old binaries can read new data
@@ -244,24 +225,19 @@ def test_forward_compatibility(
     )
 
     try:
-        # Copy the snapshot to current directory, and prepare for the test
-        prepare_snapshot(
-            from_dir=compatibility_snapshot_dir,
-            to_dir=test_output_dir / "compatibility_snapshot",
-            port_distributor=port_distributor,
-            pg_distrib_dir=compatibility_postgres_distrib_dir,
-        )
+        neon_env_builder.num_safekeepers = 3
+        env = neon_env_builder.from_repo_dir(
+            compatibility_snapshot_dir / "repo",
+            neon_binpath=compatibility_neon_bin,
+            pg_distrib_dir=compatibility_postgres_distrib_dir,
+        )
+        neon_env_builder.start()
 
         check_neon_works(
-            test_output_dir / "compatibility_snapshot" / "repo",
-            compatibility_neon_bin,
-            neon_binpath,
-            compatibility_postgres_distrib_dir,
-            pg_version,
-            port_distributor,
-            test_output_dir,
-            PgBin(test_output_dir, compatibility_postgres_distrib_dir, pg_version),
-            request,
+            env,
+            test_output_dir=test_output_dir,
+            sql_dump_path=compatibility_snapshot_dir / "dump.sql",
+            repo_dir=env.repo_dir,
         )
     except Exception:
         if breaking_changes_allowed:
@@ -276,189 +252,26 @@ def test_forward_compatibility(
     ), "Breaking changes are allowed by ALLOW_FORWARD_COMPATIBILITY_BREAKAGE, but the test has passed without any breakage"
 
 
-def prepare_snapshot(
-    from_dir: Path,
-    to_dir: Path,
-    port_distributor: PortDistributor,
-    pg_distrib_dir: Optional[Path] = None,
-):
-    assert from_dir.exists(), f"Snapshot '{from_dir}' doesn't exist"
-    assert (from_dir / "repo").exists(), f"Snapshot '{from_dir}' doesn't contain a repo directory"
-    assert (from_dir / "dump.sql").exists(), f"Snapshot '{from_dir}' doesn't contain a dump.sql"
+def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, repo_dir: Path):
+    ep = env.endpoints.create_start("main")
+    pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
 
-    log.info(f"Copying snapshot from {from_dir} to {to_dir}")
-    shutil.copytree(from_dir, to_dir)
-
-    repo_dir = to_dir / "repo"
-
-    snapshot_config_toml = repo_dir / "config"
-    snapshot_config = toml.load(snapshot_config_toml)
-
-    # Remove old logs to avoid confusion in test artifacts
-    for logfile in repo_dir.glob("**/*.log"):
-        logfile.unlink()
-
-    # Remove old computes in 'endpoints'. Old versions of the control plane used a directory
-    # called "pgdatadirs". Delete it, too.
-    if (repo_dir / "endpoints").exists():
-        shutil.rmtree(repo_dir / "endpoints")
-    if (repo_dir / "pgdatadirs").exists():
-        shutil.rmtree(repo_dir / "pgdatadirs")
-    os.mkdir(repo_dir / "endpoints")
-
-    # Update paths and ports in config files
-    legacy_pageserver_toml = repo_dir / "pageserver.toml"
-    legacy_bundle = os.path.exists(legacy_pageserver_toml)
-
-    path_to_config: dict[Path, dict[Any, Any]] = {}
-    if legacy_bundle:
-        os.mkdir(repo_dir / "pageserver_1")
-        path_to_config[repo_dir / "pageserver_1" / "pageserver.toml"] = toml.load(
-            legacy_pageserver_toml
-        )
-        os.remove(legacy_pageserver_toml)
-        os.rename(repo_dir / "tenants", repo_dir / "pageserver_1" / "tenants")
-    else:
-        for ps_conf in snapshot_config["pageservers"]:
-            config_path = repo_dir / f"pageserver_{ps_conf['id']}" / "pageserver.toml"
-            path_to_config[config_path] = toml.load(config_path)
-
-    # For each pageserver config, edit it and rewrite
-    for config_path, pageserver_config in path_to_config.items():
-        pageserver_config["remote_storage"]["local_path"] = str(
-            LocalFsStorage.component_path(repo_dir, RemoteStorageUser.PAGESERVER)
-        )
-
-        for param in ("listen_http_addr", "listen_pg_addr", "broker_endpoint"):
-            pageserver_config[param] = port_distributor.replace_with_new_port(
-                pageserver_config[param]
-            )
-
-        # We don't use authentication in compatibility tests
-        # so just remove authentication related settings.
-        pageserver_config.pop("pg_auth_type", None)
-        pageserver_config.pop("http_auth_type", None)
-
-        if pg_distrib_dir:
-            pageserver_config["pg_distrib_dir"] = str(pg_distrib_dir)
-
-        with config_path.open("w") as f:
-            toml.dump(pageserver_config, f)
-
-    # neon_local config doesn't have to be backward compatible. If we're using a dump from before
-    # it supported multiple pageservers, fix it up.
-    if "pageservers" not in snapshot_config:
-        snapshot_config["pageservers"] = [snapshot_config["pageserver"]]
-        del snapshot_config["pageserver"]
-
-    for param in ("listen_http_addr", "listen_pg_addr"):
-        for pageserver in snapshot_config["pageservers"]:
-            pageserver[param] = port_distributor.replace_with_new_port(pageserver[param])
-    snapshot_config["broker"]["listen_addr"] = port_distributor.replace_with_new_port(
-        snapshot_config["broker"]["listen_addr"]
-    )
-    for sk in snapshot_config["safekeepers"]:
-        for param in ("http_port", "pg_port", "pg_tenant_only_port"):
-            sk[param] = port_distributor.replace_with_new_port(sk[param])
-
-    if pg_distrib_dir:
-        snapshot_config["pg_distrib_dir"] = str(pg_distrib_dir)
-
-    with snapshot_config_toml.open("w") as f:
-        toml.dump(snapshot_config, f)
-
-    # Ensure that snapshot doesn't contain references to the original path
-    rv = subprocess.run(
-        [
-            "grep",
-            "--recursive",
-            "--binary-file=without-match",
-            "--files-with-matches",
-            "test_create_snapshot/repo",
-            str(repo_dir),
-        ],
-        capture_output=True,
-        text=True,
-    )
-    assert (
-        rv.returncode != 0
-    ), f"there're files referencing `test_create_snapshot/repo`, this path should be replaced with {repo_dir}:\n{rv.stdout}"
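For context on the helper deleted above: most of `prepare_snapshot`'s body existed to rewrite every address in the snapshot's config files through `PortDistributor.replace_with_new_port`, which `from_repo_dir` makes unnecessary by generating fresh configs. A rough stand-in for that port rewriting, assuming the contract is "swap the trailing port of a `host:port` (or bare integer) value for a fresh one":

```python
import itertools


class FakePortDistributor:
    """Simplified stand-in for the PortDistributor fixture; the real one
    also tracks issued ports so they are never handed out twice."""

    def __init__(self, start: int):
        self._ports = itertools.count(start)

    def replace_with_new_port(self, value):
        if isinstance(value, int):  # e.g. safekeeper http_port / pg_port
            return next(self._ports)
        # e.g. "127.0.0.1:64000" or "http://127.0.0.1:50051"
        host, _, _old_port = value.rpartition(":")
        return f"{host}:{next(self._ports)}"


pd = FakePortDistributor(20000)
assert pd.replace_with_new_port("127.0.0.1:64000") == "127.0.0.1:20000"
assert pd.replace_with_new_port(15432) == 20001
```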
-def check_neon_works(
-    repo_dir: Path,
-    neon_target_binpath: Path,
-    neon_current_binpath: Path,
-    pg_distrib_dir: Path,
-    pg_version: PgVersion,
-    port_distributor: PortDistributor,
-    test_output_dir: Path,
-    pg_bin: PgBin,
-    request: FixtureRequest,
-):
-    snapshot_config_toml = repo_dir / "config"
-    snapshot_config = toml.load(snapshot_config_toml)
-    snapshot_config["neon_distrib_dir"] = str(neon_target_binpath)
-    snapshot_config["postgres_distrib_dir"] = str(pg_distrib_dir)
-    with (snapshot_config_toml).open("w") as f:
-        toml.dump(snapshot_config, f)
-
-    # TODO: replace with NeonEnvBuilder / NeonEnv
-    config: Any = type("NeonEnvStub", (object,), {})
-    config.rust_log_override = None
-    config.repo_dir = repo_dir
-    config.pg_version = pg_version
-    config.initial_tenant = snapshot_config["default_tenant_id"]
-    config.pg_distrib_dir = pg_distrib_dir
-    config.remote_storage = None
-    config.safekeepers_remote_storage = None
-
-    # Use the "target" binaries to launch the storage nodes
-    config_target = config
-    config_target.neon_binpath = neon_target_binpath
-    # We are using maybe-old binaries for neon services, but want to use current
-    # binaries for test utilities like neon_local
-    config_target.neon_local_binpath = neon_current_binpath
-    cli_target = NeonCli(config_target)
-
-    # And the current binaries to launch computes
-    snapshot_config["neon_distrib_dir"] = str(neon_current_binpath)
-    with (snapshot_config_toml).open("w") as f:
-        toml.dump(snapshot_config, f)
-    config_current = copy.copy(config)
-    config_current.neon_binpath = neon_current_binpath
-    cli_current = NeonCli(config_current)
-
-    cli_target.raw_cli(["start"])
-    request.addfinalizer(lambda: cli_target.raw_cli(["stop"]))
-
-    pg_port = port_distributor.get_port()
-    http_port = port_distributor.get_port()
-    cli_current.endpoint_create(
-        branch_name="main", pg_port=pg_port, http_port=http_port, endpoint_id="ep-main"
-    )
-    cli_current.endpoint_start("ep-main")
-    request.addfinalizer(lambda: cli_current.endpoint_stop("ep-main"))
-
-    connstr = f"host=127.0.0.1 port={pg_port} user=cloud_admin dbname=postgres"
+    connstr = ep.connstr()
     pg_bin.run_capture(
         ["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump.sql'}"]
     )
     initial_dump_differs = dump_differs(
-        repo_dir.parent / "dump.sql",
+        sql_dump_path,
         test_output_dir / "dump.sql",
         test_output_dir / "dump.filediff",
     )
 
     # Check that project can be recovered from WAL
     # loosely based on https://www.notion.so/neondatabase/Storage-Recovery-from-WAL-d92c0aac0ebf40df892b938045d7d720
-    tenant_id = snapshot_config["default_tenant_id"]
-    timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
-    pageserver_port = snapshot_config["pageservers"][0]["listen_http_addr"].split(":")[-1]
-    pageserver_http = PageserverHttpClient(
-        port=pageserver_port,
-        is_testing_enabled_or_skip=lambda: True,  # TODO: check if testing really enabled
-    )
+    pageserver_http = env.pageserver.http_client()
+    tenant_id = env.initial_tenant
+    timeline_id = env.initial_timeline
+    pg_version = env.pg_version
 
     shutil.rmtree(repo_dir / "local_fs_remote_storage")
     timeline_delete_wait_completed(pageserver_http, tenant_id, timeline_id)
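`check_neon_works` then compares a fresh `pg_dumpall` of the restored project against the dump taken when the snapshot was created; `dump_differs` (final hunk below) shells out via `subprocess` to produce the diff. A simplified self-contained version of that comparison, using `difflib` instead of an external tool:

```python
import difflib
from pathlib import Path


def dumps_differ(first: Path, second: Path, output: Path) -> bool:
    """Write a unified diff of two SQL dumps to `output` and report whether
    they differ at all. Unlike the real dump_differs, this sketch has no
    allow-list for known-acceptable differences."""
    a = first.read_text().splitlines(keepends=True)
    b = second.read_text().splitlines(keepends=True)
    diff = list(difflib.unified_diff(a, b, fromfile=str(first), tofile=str(second)))
    output.write_text("".join(diff))
    return bool(diff)
```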
@@ -494,6 +307,11 @@ def dump_differs(
     Returns True if the dumps differ and produced diff is not allowed, False otherwise (in most cases we want it to return False).
     """
 
+    if not first.exists():
+        raise FileNotFoundError(f"{first} doesn't exist")
+    if not second.exists():
+        raise FileNotFoundError(f"{second} doesn't exist")
+
     with output.open("w") as stdout:
         res = subprocess.run(
             [