diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index fb6cea5713..4b23650960 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -507,6 +507,66 @@ class NeonEnvBuilder: return env + def from_repo_dir( + self, + repo_dir: Path, + neon_binpath: Optional[Path] = None, + pg_distrib_dir: Optional[Path] = None, + ) -> NeonEnv: + """ + A simple method to import data into the current NeonEnvBuilder from a snapshot of a repo dir. + """ + + # Setting custom `neon_binpath` and `pg_distrib_dir` is useful for compatibility tests + self.neon_binpath = neon_binpath or self.neon_binpath + self.pg_distrib_dir = pg_distrib_dir or self.pg_distrib_dir + + # Get the initial tenant and timeline from the snapshot config + snapshot_config_toml = repo_dir / "config" + with snapshot_config_toml.open("r") as f: + snapshot_config = toml.load(f) + + self.initial_tenant = TenantId(snapshot_config["default_tenant_id"]) + self.initial_timeline = TimelineId( + dict(snapshot_config["branch_name_mappings"][DEFAULT_BRANCH_NAME])[ + str(self.initial_tenant) + ] + ) + self.env = self.init_configs() + + for ps_dir in repo_dir.glob("pageserver_*"): + tenants_from_dir = ps_dir / "tenants" + tenants_to_dir = self.repo_dir / ps_dir.name / "tenants" + + log.info(f"Copying pageserver tenants directory {tenants_from_dir} to {tenants_to_dir}") + shutil.copytree(tenants_from_dir, tenants_to_dir) + + for sk_from_dir in (repo_dir / "safekeepers").glob("sk*"): + sk_to_dir = self.repo_dir / "safekeepers" / sk_from_dir.name + log.info(f"Copying safekeeper directory {sk_from_dir} to {sk_to_dir}") + sk_to_dir.rmdir() + shutil.copytree(sk_from_dir, sk_to_dir, ignore=shutil.ignore_patterns("*.log", "*.pid")) + + shutil.rmtree(self.repo_dir / "local_fs_remote_storage", ignore_errors=True) + shutil.copytree( + repo_dir / "local_fs_remote_storage", self.repo_dir / "local_fs_remote_storage" + ) + + if (attachments_json := Path(repo_dir / "attachments.json")).exists(): + shutil.copyfile(attachments_json, self.repo_dir / attachments_json.name) + + # Update the config with info about tenants and timelines + with (self.repo_dir / "config").open("r") as f: + config = toml.load(f) + + config["default_tenant_id"] = snapshot_config["default_tenant_id"] + config["branch_name_mappings"] = snapshot_config["branch_name_mappings"] + + with (self.repo_dir / "config").open("w") as f: + toml.dump(config, f) + + return self.env + def enable_scrub_on_exit(self): """ Call this if you would like the fixture to automatically run diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py index 35963c0d41..3f5de100fd 100644 --- a/test_runner/regress/test_compatibility.py +++ b/test_runner/regress/test_compatibility.py @@ -1,30 +1,25 @@ -import copy import os import shutil import subprocess import tempfile from pathlib import Path -from typing import Any, List, Optional +from typing import List, Optional import pytest import toml -from fixtures.log_helper import log from fixtures.neon_fixtures import ( - NeonCli, + NeonEnv, NeonEnvBuilder, PgBin, ) -from fixtures.pageserver.http import PageserverHttpClient from fixtures.pageserver.utils import ( timeline_delete_wait_completed, wait_for_last_record_lsn, wait_for_upload, ) from fixtures.pg_version import PgVersion -from fixtures.port_distributor import PortDistributor -from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, RemoteStorageUser +from fixtures.remote_storage import RemoteStorageKind from fixtures.types import Lsn -from pytest import FixtureRequest # # A test suite that help to prevent unintentionally breaking backward or forward compatibility between Neon releases. @@ -37,8 +32,8 @@ from pytest import FixtureRequest # If the breakage is intentional, the test can be xfaild with setting ALLOW_FORWARD_COMPATIBILITY_BREAKAGE=true. # # The file contains a couple of helper functions: -# - prepare_snapshot copies the snapshot, cleans it up and makes it ready for the current version of Neon (replaces paths and ports in config files). # - check_neon_works performs the test itself, feel free to add more checks there. +# - dump_differs compares two SQL dumps and writes the diff to a file. # # # How to run `test_backward_compatibility` locally: @@ -46,6 +41,7 @@ from pytest import FixtureRequest # export DEFAULT_PG_VERSION=15 # export BUILD_TYPE=release # export CHECK_ONDISK_DATA_COMPATIBILITY=true +# export COMPATIBILITY_SNAPSHOT_DIR=test_output/compatibility_snapshot_pgv${DEFAULT_PG_VERSION} # # # Build previous version of binaries and create a data snapshot: # rm -rf pg_install target @@ -59,8 +55,7 @@ from pytest import FixtureRequest # CARGO_BUILD_FLAGS="--features=testing" make -s -j`nproc` # # # Run backward compatibility test -# COMPATIBILITY_SNAPSHOT_DIR=test_output/compatibility_snapshot_pgv${DEFAULT_PG_VERSION} \ -# ./scripts/pytest -k test_backward_compatibility +# ./scripts/pytest -k test_backward_compatibility # # # How to run `test_forward_compatibility` locally: @@ -68,6 +63,8 @@ from pytest import FixtureRequest # export DEFAULT_PG_VERSION=15 # export BUILD_TYPE=release # export CHECK_ONDISK_DATA_COMPATIBILITY=true +# export COMPATIBILITY_NEON_BIN=neon_previous/target/${BUILD_TYPE} +# export COMPATIBILITY_POSTGRES_DISTRIB_DIR=neon_previous/pg_install # # # Build previous version of binaries and store them somewhere: # rm -rf pg_install target @@ -84,9 +81,7 @@ from pytest import FixtureRequest # ./scripts/pytest -k test_create_snapshot # # # Run forward compatibility test -# COMPATIBILITY_NEON_BIN=neon_previous/target/${BUILD_TYPE} \ -# COMPATIBILITY_POSTGRES_DISTRIB_DIR=neon_previous/pg_install \ -# ./scripts/pytest -k test_forward_compatibility +# ./scripts/pytest -k test_forward_compatibility # check_ondisk_data_compatibility_if_enabled = pytest.mark.skipif( @@ -155,13 +150,9 @@ def test_create_snapshot( @pytest.mark.xdist_group("compatibility") @pytest.mark.order(after="test_create_snapshot") def test_backward_compatibility( - pg_bin: PgBin, - port_distributor: PortDistributor, + neon_env_builder: NeonEnvBuilder, test_output_dir: Path, - neon_binpath: Path, - pg_distrib_dir: Path, pg_version: PgVersion, - request: FixtureRequest, ): """ Test that the new binaries can read old data @@ -177,23 +168,15 @@ def test_backward_compatibility( ) try: - # Copy the snapshot to current directory, and prepare for the test - prepare_snapshot( - from_dir=compatibility_snapshot_dir, - to_dir=test_output_dir / "compatibility_snapshot", - port_distributor=port_distributor, - ) + neon_env_builder.num_safekeepers = 3 + env = neon_env_builder.from_repo_dir(compatibility_snapshot_dir / "repo") + neon_env_builder.start() check_neon_works( - test_output_dir / "compatibility_snapshot" / "repo", - neon_binpath, - neon_binpath, - pg_distrib_dir, - pg_version, - port_distributor, - test_output_dir, - pg_bin, - request, + env, + test_output_dir=test_output_dir, + sql_dump_path=compatibility_snapshot_dir / "dump.sql", + repo_dir=env.repo_dir, ) except Exception: if breaking_changes_allowed: @@ -212,12 +195,10 @@ def test_backward_compatibility( @pytest.mark.xdist_group("compatibility") @pytest.mark.order(after="test_create_snapshot") def test_forward_compatibility( + neon_env_builder: NeonEnvBuilder, test_output_dir: Path, top_output_dir: Path, - port_distributor: PortDistributor, pg_version: PgVersion, - request: FixtureRequest, - neon_binpath: Path, ): """ Test that the old binaries can read new data @@ -244,24 +225,19 @@ def test_forward_compatibility( ) try: - # Copy the snapshot to current directory, and prepare for the test - prepare_snapshot( - from_dir=compatibility_snapshot_dir, - to_dir=test_output_dir / "compatibility_snapshot", - port_distributor=port_distributor, + neon_env_builder.num_safekeepers = 3 + env = neon_env_builder.from_repo_dir( + compatibility_snapshot_dir / "repo", + neon_binpath=compatibility_neon_bin, pg_distrib_dir=compatibility_postgres_distrib_dir, ) + neon_env_builder.start() check_neon_works( - test_output_dir / "compatibility_snapshot" / "repo", - compatibility_neon_bin, - neon_binpath, - compatibility_postgres_distrib_dir, - pg_version, - port_distributor, - test_output_dir, - PgBin(test_output_dir, compatibility_postgres_distrib_dir, pg_version), - request, + env, + test_output_dir=test_output_dir, + sql_dump_path=compatibility_snapshot_dir / "dump.sql", + repo_dir=env.repo_dir, ) except Exception: if breaking_changes_allowed: @@ -276,189 +252,26 @@ def test_forward_compatibility( ), "Breaking changes are allowed by ALLOW_FORWARD_COMPATIBILITY_BREAKAGE, but the test has passed without any breakage" -def prepare_snapshot( - from_dir: Path, - to_dir: Path, - port_distributor: PortDistributor, - pg_distrib_dir: Optional[Path] = None, -): - assert from_dir.exists(), f"Snapshot '{from_dir}' doesn't exist" - assert (from_dir / "repo").exists(), f"Snapshot '{from_dir}' doesn't contain a repo directory" - assert (from_dir / "dump.sql").exists(), f"Snapshot '{from_dir}' doesn't contain a dump.sql" +def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, repo_dir: Path): + ep = env.endpoints.create_start("main") + pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version) - log.info(f"Copying snapshot from {from_dir} to {to_dir}") - shutil.copytree(from_dir, to_dir) - - repo_dir = to_dir / "repo" - - snapshot_config_toml = repo_dir / "config" - snapshot_config = toml.load(snapshot_config_toml) - - # Remove old logs to avoid confusion in test artifacts - for logfile in repo_dir.glob("**/*.log"): - logfile.unlink() - - # Remove old computes in 'endpoints'. Old versions of the control plane used a directory - # called "pgdatadirs". Delete it, too. - if (repo_dir / "endpoints").exists(): - shutil.rmtree(repo_dir / "endpoints") - if (repo_dir / "pgdatadirs").exists(): - shutil.rmtree(repo_dir / "pgdatadirs") - os.mkdir(repo_dir / "endpoints") - - # Update paths and ports in config files - legacy_pageserver_toml = repo_dir / "pageserver.toml" - legacy_bundle = os.path.exists(legacy_pageserver_toml) - - path_to_config: dict[Path, dict[Any, Any]] = {} - if legacy_bundle: - os.mkdir(repo_dir / "pageserver_1") - path_to_config[repo_dir / "pageserver_1" / "pageserver.toml"] = toml.load( - legacy_pageserver_toml - ) - os.remove(legacy_pageserver_toml) - os.rename(repo_dir / "tenants", repo_dir / "pageserver_1" / "tenants") - else: - for ps_conf in snapshot_config["pageservers"]: - config_path = repo_dir / f"pageserver_{ps_conf['id']}" / "pageserver.toml" - path_to_config[config_path] = toml.load(config_path) - - # For each pageserver config, edit it and rewrite - for config_path, pageserver_config in path_to_config.items(): - pageserver_config["remote_storage"]["local_path"] = str( - LocalFsStorage.component_path(repo_dir, RemoteStorageUser.PAGESERVER) - ) - - for param in ("listen_http_addr", "listen_pg_addr", "broker_endpoint"): - pageserver_config[param] = port_distributor.replace_with_new_port( - pageserver_config[param] - ) - - # We don't use authentication in compatibility tests - # so just remove authentication related settings. - pageserver_config.pop("pg_auth_type", None) - pageserver_config.pop("http_auth_type", None) - - if pg_distrib_dir: - pageserver_config["pg_distrib_dir"] = str(pg_distrib_dir) - - with config_path.open("w") as f: - toml.dump(pageserver_config, f) - - # neon_local config doesn't have to be backward compatible. If we're using a dump from before - # it supported multiple pageservers, fix it up. - if "pageservers" not in snapshot_config: - snapshot_config["pageservers"] = [snapshot_config["pageserver"]] - del snapshot_config["pageserver"] - - for param in ("listen_http_addr", "listen_pg_addr"): - for pageserver in snapshot_config["pageservers"]: - pageserver[param] = port_distributor.replace_with_new_port(pageserver[param]) - snapshot_config["broker"]["listen_addr"] = port_distributor.replace_with_new_port( - snapshot_config["broker"]["listen_addr"] - ) - for sk in snapshot_config["safekeepers"]: - for param in ("http_port", "pg_port", "pg_tenant_only_port"): - sk[param] = port_distributor.replace_with_new_port(sk[param]) - - if pg_distrib_dir: - snapshot_config["pg_distrib_dir"] = str(pg_distrib_dir) - - with snapshot_config_toml.open("w") as f: - toml.dump(snapshot_config, f) - - # Ensure that snapshot doesn't contain references to the original path - rv = subprocess.run( - [ - "grep", - "--recursive", - "--binary-file=without-match", - "--files-with-matches", - "test_create_snapshot/repo", - str(repo_dir), - ], - capture_output=True, - text=True, - ) - assert ( - rv.returncode != 0 - ), f"there're files referencing `test_create_snapshot/repo`, this path should be replaced with {repo_dir}:\n{rv.stdout}" - - -def check_neon_works( - repo_dir: Path, - neon_target_binpath: Path, - neon_current_binpath: Path, - pg_distrib_dir: Path, - pg_version: PgVersion, - port_distributor: PortDistributor, - test_output_dir: Path, - pg_bin: PgBin, - request: FixtureRequest, -): - snapshot_config_toml = repo_dir / "config" - snapshot_config = toml.load(snapshot_config_toml) - snapshot_config["neon_distrib_dir"] = str(neon_target_binpath) - snapshot_config["postgres_distrib_dir"] = str(pg_distrib_dir) - with (snapshot_config_toml).open("w") as f: - toml.dump(snapshot_config, f) - - # TODO: replace with NeonEnvBuilder / NeonEnv - config: Any = type("NeonEnvStub", (object,), {}) - config.rust_log_override = None - config.repo_dir = repo_dir - config.pg_version = pg_version - config.initial_tenant = snapshot_config["default_tenant_id"] - config.pg_distrib_dir = pg_distrib_dir - config.remote_storage = None - config.safekeepers_remote_storage = None - - # Use the "target" binaries to launch the storage nodes - config_target = config - config_target.neon_binpath = neon_target_binpath - # We are using maybe-old binaries for neon services, but want to use current - # binaries for test utilities like neon_local - config_target.neon_local_binpath = neon_current_binpath - cli_target = NeonCli(config_target) - - # And the current binaries to launch computes - snapshot_config["neon_distrib_dir"] = str(neon_current_binpath) - with (snapshot_config_toml).open("w") as f: - toml.dump(snapshot_config, f) - config_current = copy.copy(config) - config_current.neon_binpath = neon_current_binpath - cli_current = NeonCli(config_current) - - cli_target.raw_cli(["start"]) - request.addfinalizer(lambda: cli_target.raw_cli(["stop"])) - - pg_port = port_distributor.get_port() - http_port = port_distributor.get_port() - cli_current.endpoint_create( - branch_name="main", pg_port=pg_port, http_port=http_port, endpoint_id="ep-main" - ) - cli_current.endpoint_start("ep-main") - request.addfinalizer(lambda: cli_current.endpoint_stop("ep-main")) - - connstr = f"host=127.0.0.1 port={pg_port} user=cloud_admin dbname=postgres" + connstr = ep.connstr() pg_bin.run_capture( ["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump.sql'}"] ) initial_dump_differs = dump_differs( - repo_dir.parent / "dump.sql", + sql_dump_path, test_output_dir / "dump.sql", test_output_dir / "dump.filediff", ) # Check that project can be recovered from WAL # loosely based on https://www.notion.so/neondatabase/Storage-Recovery-from-WAL-d92c0aac0ebf40df892b938045d7d720 - tenant_id = snapshot_config["default_tenant_id"] - timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id] - pageserver_port = snapshot_config["pageservers"][0]["listen_http_addr"].split(":")[-1] - pageserver_http = PageserverHttpClient( - port=pageserver_port, - is_testing_enabled_or_skip=lambda: True, # TODO: check if testing really enabled - ) + pageserver_http = env.pageserver.http_client() + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + pg_version = env.pg_version shutil.rmtree(repo_dir / "local_fs_remote_storage") timeline_delete_wait_completed(pageserver_http, tenant_id, timeline_id) @@ -494,6 +307,11 @@ def dump_differs( Returns True if the dumps differ and produced diff is not allowed, False otherwise (in most cases we want it to return False). """ + if not first.exists(): + raise FileNotFoundError(f"{first} doesn't exist") + if not second.exists(): + raise FileNotFoundError(f"{second} doesn't exist") + with output.open("w") as stdout: res = subprocess.run( [