import copy import os import shutil import subprocess import tempfile from pathlib import Path from typing import Any, List, Optional import pytest import toml # TODO: replace with tomllib for Python >= 3.11 from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonCli, NeonEnvBuilder, PgBin, PortDistributor, ) from fixtures.pageserver.http import PageserverHttpClient from fixtures.pageserver.utils import ( timeline_delete_wait_completed, wait_for_last_record_lsn, wait_for_upload, ) from fixtures.pg_version import PgVersion from fixtures.types import Lsn from pytest import FixtureRequest # # A test suite that help to prevent unintentionally breaking backward or forward compatibility between Neon releases. # - `test_create_snapshot` a script wrapped in a test that creates a data snapshot. # - `test_backward_compatibility` checks that the current version of Neon can start/read/interract with a data snapshot created by the previous version. # The path to the snapshot is configured by COMPATIBILITY_SNAPSHOT_DIR environment variable. # If the breakage is intentional, the test can be xfaild with setting ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE=true. # - `test_forward_compatibility` checks that a snapshot created by the current version can be started/read/interracted by the previous version of Neon. # Paths to Neon and Postgres are configured by COMPATIBILITY_NEON_BIN and COMPATIBILITY_POSTGRES_DISTRIB_DIR environment variables. # If the breakage is intentional, the test can be xfaild with setting ALLOW_FORWARD_COMPATIBILITY_BREAKAGE=true. # # The file contains a couple of helper functions: # - prepare_snapshot copies the snapshot, cleans it up and makes it ready for the current version of Neon (replaces paths and ports in config files). # - check_neon_works performs the test itself, feel free to add more checks there. # check_ondisk_data_compatibility_if_enabled = pytest.mark.skipif( os.environ.get("CHECK_ONDISK_DATA_COMPATIBILITY") is None, reason="CHECK_ONDISK_DATA_COMPATIBILITY env is not set", ) @pytest.mark.xdist_group("compatibility") @pytest.mark.order(before="test_forward_compatibility") def test_create_snapshot( neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, top_output_dir: Path, test_output_dir: Path, pg_version: PgVersion, ): # The test doesn't really test anything # it creates a new snapshot for releases after we tested the current version against the previous snapshot in `test_backward_compatibility`. # # There's no cleanup here, it allows to adjust the data in `test_backward_compatibility` itself without re-collecting it. neon_env_builder.pg_version = pg_version neon_env_builder.num_safekeepers = 3 neon_env_builder.enable_local_fs_remote_storage() env = neon_env_builder.init_start() endpoint = env.endpoints.create_start("main") # FIXME: Is this expected? env.pageserver.allowed_errors.append( ".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*" ) pg_bin.run_capture(["pgbench", "--initialize", "--scale=10", endpoint.connstr()]) pg_bin.run_capture(["pgbench", "--time=60", "--progress=2", endpoint.connstr()]) pg_bin.run_capture( ["pg_dumpall", f"--dbname={endpoint.connstr()}", f"--file={test_output_dir / 'dump.sql'}"] ) snapshot_config = toml.load(test_output_dir / "repo" / "config") tenant_id = snapshot_config["default_tenant_id"] timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id] pageserver_http = env.pageserver.http_client() lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, lsn) pageserver_http.timeline_checkpoint(tenant_id, timeline_id) wait_for_upload(pageserver_http, tenant_id, timeline_id, lsn) env.endpoints.stop_all() for sk in env.safekeepers: sk.stop() env.pageserver.stop() # Directory `compatibility_snapshot_dir` is uploaded to S3 in a workflow, keep the name in sync with it compatibility_snapshot_dir = ( top_output_dir / f"compatibility_snapshot_pg{pg_version.v_prefixed}" ) if compatibility_snapshot_dir.exists(): shutil.rmtree(compatibility_snapshot_dir) shutil.copytree(test_output_dir, compatibility_snapshot_dir) @check_ondisk_data_compatibility_if_enabled @pytest.mark.xdist_group("compatibility") @pytest.mark.order(after="test_create_snapshot") def test_backward_compatibility( pg_bin: PgBin, port_distributor: PortDistributor, test_output_dir: Path, neon_binpath: Path, pg_distrib_dir: Path, pg_version: PgVersion, request: FixtureRequest, ): """ Test that the new binaries can read old data """ compatibility_snapshot_dir_env = os.environ.get("COMPATIBILITY_SNAPSHOT_DIR") assert ( compatibility_snapshot_dir_env is not None ), f"COMPATIBILITY_SNAPSHOT_DIR is not set. It should be set to `compatibility_snapshot_pg{pg_version.v_prefixed}` path generateted by test_create_snapshot (ideally generated by the previous version of Neon)" compatibility_snapshot_dir = Path(compatibility_snapshot_dir_env).resolve() breaking_changes_allowed = ( os.environ.get("ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE", "false").lower() == "true" ) try: # Copy the snapshot to current directory, and prepare for the test prepare_snapshot( from_dir=compatibility_snapshot_dir, to_dir=test_output_dir / "compatibility_snapshot", neon_binpath=neon_binpath, port_distributor=port_distributor, ) check_neon_works( test_output_dir / "compatibility_snapshot" / "repo", neon_binpath, neon_binpath, pg_distrib_dir, pg_version, port_distributor, test_output_dir, pg_bin, request, ) except Exception: if breaking_changes_allowed: pytest.xfail( "Breaking changes are allowed by ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE env var" ) else: raise assert ( not breaking_changes_allowed ), "Breaking changes are allowed by ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE, but the test has passed without any breakage" @check_ondisk_data_compatibility_if_enabled @pytest.mark.xdist_group("compatibility") @pytest.mark.order(after="test_create_snapshot") def test_forward_compatibility( test_output_dir: Path, top_output_dir: Path, port_distributor: PortDistributor, pg_version: PgVersion, request: FixtureRequest, neon_binpath: Path, ): """ Test that the old binaries can read new data """ compatibility_neon_bin_env = os.environ.get("COMPATIBILITY_NEON_BIN") assert compatibility_neon_bin_env is not None, ( "COMPATIBILITY_NEON_BIN is not set. It should be set to a path with Neon binaries " "(ideally generated by the previous version of Neon)" ) compatibility_neon_bin = Path(compatibility_neon_bin_env).resolve() compatibility_postgres_distrib_dir_env = os.environ.get("COMPATIBILITY_POSTGRES_DISTRIB_DIR") assert ( compatibility_postgres_distrib_dir_env is not None ), "COMPATIBILITY_POSTGRES_DISTRIB_DIR is not set. It should be set to a pg_install directrory (ideally generated by the previous version of Neon)" compatibility_postgres_distrib_dir = Path(compatibility_postgres_distrib_dir_env).resolve() compatibility_snapshot_dir = ( top_output_dir / f"compatibility_snapshot_pg{pg_version.v_prefixed}" ) breaking_changes_allowed = ( os.environ.get("ALLOW_FORWARD_COMPATIBILITY_BREAKAGE", "false").lower() == "true" ) try: # Copy the snapshot to current directory, and prepare for the test prepare_snapshot( from_dir=compatibility_snapshot_dir, to_dir=test_output_dir / "compatibility_snapshot", port_distributor=port_distributor, neon_binpath=compatibility_neon_bin, pg_distrib_dir=compatibility_postgres_distrib_dir, ) check_neon_works( test_output_dir / "compatibility_snapshot" / "repo", compatibility_neon_bin, neon_binpath, compatibility_postgres_distrib_dir, pg_version, port_distributor, test_output_dir, PgBin(test_output_dir, compatibility_postgres_distrib_dir, pg_version), request, ) except Exception: if breaking_changes_allowed: pytest.xfail( "Breaking changes are allowed by ALLOW_FORWARD_COMPATIBILITY_BREAKAGE env var" ) else: raise assert ( not breaking_changes_allowed ), "Breaking changes are allowed by ALLOW_FORWARD_COMPATIBILITY_BREAKAGE, but the test has passed without any breakage" def prepare_snapshot( from_dir: Path, to_dir: Path, port_distributor: PortDistributor, neon_binpath: Path, pg_distrib_dir: Optional[Path] = None, ): assert from_dir.exists(), f"Snapshot '{from_dir}' doesn't exist" assert (from_dir / "repo").exists(), f"Snapshot '{from_dir}' doesn't contain a repo directory" assert (from_dir / "dump.sql").exists(), f"Snapshot '{from_dir}' doesn't contain a dump.sql" log.info(f"Copying snapshot from {from_dir} to {to_dir}") shutil.copytree(from_dir, to_dir) repo_dir = to_dir / "repo" # Remove old logs to avoid confusion in test artifacts for logfile in repo_dir.glob("**/*.log"): logfile.unlink() # Remove old computes in 'endpoints'. Old versions of the control plane used a directory # called "pgdatadirs". Delete it, too. if (repo_dir / "endpoints").exists(): shutil.rmtree(repo_dir / "endpoints") if (repo_dir / "pgdatadirs").exists(): shutil.rmtree(repo_dir / "pgdatadirs") os.mkdir(repo_dir / "endpoints") # Update paths and ports in config files pageserver_toml = repo_dir / "pageserver.toml" pageserver_config = toml.load(pageserver_toml) pageserver_config["remote_storage"]["local_path"] = str(repo_dir / "local_fs_remote_storage") for param in ("listen_http_addr", "listen_pg_addr", "broker_endpoint"): pageserver_config[param] = port_distributor.replace_with_new_port(pageserver_config[param]) # We don't use authentication in compatibility tests # so just remove authentication related settings. pageserver_config.pop("pg_auth_type", None) pageserver_config.pop("http_auth_type", None) if pg_distrib_dir: pageserver_config["pg_distrib_dir"] = str(pg_distrib_dir) with pageserver_toml.open("w") as f: toml.dump(pageserver_config, f) snapshot_config_toml = repo_dir / "config" snapshot_config = toml.load(snapshot_config_toml) for param in ("listen_http_addr", "listen_pg_addr"): snapshot_config["pageserver"][param] = port_distributor.replace_with_new_port( snapshot_config["pageserver"][param] ) snapshot_config["broker"]["listen_addr"] = port_distributor.replace_with_new_port( snapshot_config["broker"]["listen_addr"] ) for sk in snapshot_config["safekeepers"]: for param in ("http_port", "pg_port", "pg_tenant_only_port"): sk[param] = port_distributor.replace_with_new_port(sk[param]) if pg_distrib_dir: snapshot_config["pg_distrib_dir"] = str(pg_distrib_dir) with snapshot_config_toml.open("w") as f: toml.dump(snapshot_config, f) # Ensure that snapshot doesn't contain references to the original path rv = subprocess.run( [ "grep", "--recursive", "--binary-file=without-match", "--files-with-matches", "test_create_snapshot/repo", str(repo_dir), ], capture_output=True, text=True, ) assert ( rv.returncode != 0 ), f"there're files referencing `test_create_snapshot/repo`, this path should be replaced with {repo_dir}:\n{rv.stdout}" def check_neon_works( repo_dir: Path, neon_target_binpath: Path, neon_current_binpath: Path, pg_distrib_dir: Path, pg_version: PgVersion, port_distributor: PortDistributor, test_output_dir: Path, pg_bin: PgBin, request: FixtureRequest, ): snapshot_config_toml = repo_dir / "config" snapshot_config = toml.load(snapshot_config_toml) snapshot_config["neon_distrib_dir"] = str(neon_target_binpath) snapshot_config["postgres_distrib_dir"] = str(pg_distrib_dir) with (snapshot_config_toml).open("w") as f: toml.dump(snapshot_config, f) # TODO: replace with NeonEnvBuilder / NeonEnv config: Any = type("NeonEnvStub", (object,), {}) config.rust_log_override = None config.repo_dir = repo_dir config.pg_version = pg_version config.initial_tenant = snapshot_config["default_tenant_id"] config.pg_distrib_dir = pg_distrib_dir # Use the "target" binaries to launch the storage nodes config_target = config config_target.neon_binpath = neon_target_binpath cli_target = NeonCli(config_target) # And the current binaries to launch computes snapshot_config["neon_distrib_dir"] = str(neon_current_binpath) with (snapshot_config_toml).open("w") as f: toml.dump(snapshot_config, f) config_current = copy.copy(config) config_current.neon_binpath = neon_current_binpath cli_current = NeonCli(config_current) cli_target.raw_cli(["start"]) request.addfinalizer(lambda: cli_target.raw_cli(["stop"])) pg_port = port_distributor.get_port() http_port = port_distributor.get_port() cli_current.endpoint_start("main", pg_port=pg_port, http_port=http_port) request.addfinalizer(lambda: cli_current.endpoint_stop("main")) connstr = f"host=127.0.0.1 port={pg_port} user=cloud_admin dbname=postgres" pg_bin.run_capture( ["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump.sql'}"] ) initial_dump_differs = dump_differs( repo_dir.parent / "dump.sql", test_output_dir / "dump.sql", test_output_dir / "dump.filediff", ) # Check that project can be recovered from WAL # loosely based on https://github.com/neondatabase/cloud/wiki/Recovery-from-WAL tenant_id = snapshot_config["default_tenant_id"] timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id] pageserver_port = snapshot_config["pageserver"]["listen_http_addr"].split(":")[-1] pageserver_http = PageserverHttpClient( port=pageserver_port, is_testing_enabled_or_skip=lambda: True, # TODO: check if testing really enabled ) shutil.rmtree(repo_dir / "local_fs_remote_storage") timeline_delete_wait_completed(pageserver_http, tenant_id, timeline_id) pageserver_http.timeline_create(pg_version, tenant_id, timeline_id) pg_bin.run_capture( ["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump-from-wal.sql'}"] ) # The assert itself deferred to the end of the test # to allow us to perform checks that change data before failing dump_from_wal_differs = dump_differs( test_output_dir / "dump.sql", test_output_dir / "dump-from-wal.sql", test_output_dir / "dump-from-wal.filediff", ) # TODO: Run pg_amcheck unconditionally after the next release try: pg_bin.run(["psql", connstr, "--command", "CREATE EXTENSION IF NOT EXISTS amcheck"]) except subprocess.CalledProcessError: log.info("Extension amcheck is not available, skipping pg_amcheck") else: pg_bin.run_capture(["pg_amcheck", connstr, "--install-missing", "--verbose"]) # Check that we can interract with the data pg_bin.run_capture(["pgbench", "--time=10", "--progress=2", connstr]) assert not dump_from_wal_differs, "dump from WAL differs" assert not initial_dump_differs, "initial dump differs" def dump_differs( first: Path, second: Path, output: Path, allowed_diffs: Optional[List[str]] = None ) -> bool: """ Runs diff(1) command on two SQL dumps and write the output to the given output file. The function supports allowed diffs, if the diff is in the allowed_diffs list, it's not considered as a difference. See the example of it in https://github.com/neondatabase/neon/pull/4425/files#diff-15c5bfdd1d5cc1411b9221091511a60dd13a9edf672bdfbb57dd2ef8bb7815d6 Returns True if the dumps differ and produced diff is not allowed, False otherwise (in most cases we want it to return False). """ with output.open("w") as stdout: res = subprocess.run( [ "diff", "--unified", # Make diff output more readable "--ignore-matching-lines=^--", # Ignore changes in comments "--ignore-blank-lines", str(first), str(second), ], stdout=stdout, ) differs = res.returncode != 0 allowed_diffs = allowed_diffs or [] if differs and len(allowed_diffs) > 0: for allowed_diff in allowed_diffs: with tempfile.NamedTemporaryFile(mode="w") as tmp: tmp.write(allowed_diff) tmp.flush() allowed = subprocess.run( [ "diff", "--unified", # Make diff output more readable r"--ignore-matching-lines=^---", # Ignore diff headers r"--ignore-matching-lines=^\+\+\+", # Ignore diff headers "--ignore-matching-lines=^@@", # Ignore diff blocks location "--ignore-matching-lines=^ *$", # Ignore lines with only spaces "--ignore-matching-lines=^ --.*", # Ignore SQL comments in diff "--ignore-blank-lines", str(output), str(tmp.name), ], ) differs = allowed.returncode != 0 if not differs: break return differs