from __future__ import annotations import os import shutil import subprocess import threading from fcntl import LOCK_EX, LOCK_UN, flock from pathlib import Path from typing import TYPE_CHECKING import pytest from pytest import FixtureRequest from fixtures import overlayfs from fixtures.log_helper import log from fixtures.utils import allure_attach_from_dir if TYPE_CHECKING: from collections.abc import Iterator from types import TracebackType BASE_DIR = Path(__file__).parents[2] DEFAULT_OUTPUT_DIR: str = "test_output" COMPUTE_CONFIG_DIR = BASE_DIR / "compute" / "etc" def get_test_dir(request: FixtureRequest, top_output_dir: Path, prefix: str | None = None) -> Path: """Compute the path to a working directory for an individual test.""" test_name = request.node.name test_dir = top_output_dir / f"{prefix or ''}{test_name.replace('/', '-')}" # We rerun failed tests multiple times, use a separate directory for each run. if (suffix := getattr(request.node, "execution_count", None)) is not None: test_dir = test_dir.parent / f"{test_dir.name}-{suffix}" return test_dir def get_test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Path: """ The working directory for a test. """ return get_test_dir(request, top_output_dir) def get_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Path: """ Directory that contains `upperdir` and `workdir` for overlayfs mounts that a test creates. See `NeonEnvBuilder.overlay_mount`. 
""" return get_test_dir(request, top_output_dir, "overlay-") def get_shared_snapshot_dir_path(top_output_dir: Path, snapshot_name: str) -> Path: return top_output_dir / "shared-snapshots" / snapshot_name def get_test_repo_dir(request: FixtureRequest, top_output_dir: Path) -> Path: return get_test_output_dir(request, top_output_dir) / "repo" @pytest.fixture(scope="session") def base_dir() -> Iterator[Path]: # find the base directory (currently this is the git root) log.info(f"base_dir is {BASE_DIR}") yield BASE_DIR @pytest.fixture(scope="session") def compute_config_dir() -> Iterator[Path]: """ Retrieve the path to the compute configuration directory. """ yield COMPUTE_CONFIG_DIR @pytest.fixture(scope="function") def neon_binpath(base_dir: Path, build_type: str) -> Iterator[Path]: if os.getenv("REMOTE_ENV"): # we are in remote env and do not have neon binaries locally # this is the case for benchmarks run on self-hosted runner return # Find the neon binaries. if env_neon_bin := os.environ.get("NEON_BIN"): binpath = Path(env_neon_bin) else: binpath = base_dir / "target" / build_type log.info(f"neon_binpath is {binpath}") if not (binpath / "pageserver").exists(): raise Exception(f"neon binaries not found at '{binpath}'") yield binpath.absolute() @pytest.fixture(scope="session") def compatibility_snapshot_dir() -> Iterator[Path]: if os.getenv("REMOTE_ENV"): return compatibility_snapshot_dir_env = os.environ.get("COMPATIBILITY_SNAPSHOT_DIR") assert compatibility_snapshot_dir_env is not None, ( "COMPATIBILITY_SNAPSHOT_DIR is not set. 
It should be set to `compatibility_snapshot_pg(PG_VERSION)` path generateted by test_create_snapshot (ideally generated by the previous version of Neon)" ) compatibility_snapshot_dir = Path(compatibility_snapshot_dir_env).resolve() yield compatibility_snapshot_dir @pytest.fixture(scope="session") def compatibility_neon_binpath() -> Iterator[Path | None]: if os.getenv("REMOTE_ENV"): return comp_binpath = None if env_compatibility_neon_binpath := os.environ.get("COMPATIBILITY_NEON_BIN"): comp_binpath = Path(env_compatibility_neon_binpath).resolve().absolute() yield comp_binpath @pytest.fixture(scope="session") def pg_distrib_dir(base_dir: Path) -> Iterator[Path]: if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"): distrib_dir = Path(env_postgres_bin).resolve() else: distrib_dir = base_dir / "pg_install" log.info(f"pg_distrib_dir is {distrib_dir}") yield distrib_dir @pytest.fixture(scope="session") def compatibility_pg_distrib_dir() -> Iterator[Path | None]: compat_distrib_dir = None if env_compat_postgres_bin := os.environ.get("COMPATIBILITY_POSTGRES_DISTRIB_DIR"): compat_distrib_dir = Path(env_compat_postgres_bin).resolve() if not compat_distrib_dir.exists(): raise Exception(f"compatibility postgres directory not found at {compat_distrib_dir}") if compat_distrib_dir: log.info(f"compatibility_pg_distrib_dir is {compat_distrib_dir}") yield compat_distrib_dir @pytest.fixture(scope="session") def top_output_dir(base_dir: Path) -> Iterator[Path]: # Compute the top-level directory for all tests. if env_test_output := os.environ.get("TEST_OUTPUT"): output_dir = Path(env_test_output).resolve() else: output_dir = base_dir / DEFAULT_OUTPUT_DIR output_dir.mkdir(exist_ok=True) log.info(f"top_output_dir is {output_dir}") yield output_dir # This is autouse, so the test output directory always gets created, even # if a test doesn't put anything there. 
#
# NB: we request the overlay dir fixture so the fixture does its cleanups
@pytest.fixture(scope="function", autouse=True)
def test_output_dir(
    request: pytest.FixtureRequest, top_output_dir: Path, test_overlay_dir: Path | None
) -> Iterator[Path]:
    """Create the working directory for an individual test.

    Any directory left over at the same path is removed first. After the test,
    the directory contents are attached to the Allure report, but only when
    `--alluredir` is set.

    `test_overlay_dir` is not used directly. It is requested so that its
    unmounting of stale overlayfs mounts beneath this test's directory runs
    before the directory is removed and re-created here.
    """

    # one directory per test
    test_dir = get_test_output_dir(request, top_output_dir)
    log.info(f"test_output_dir is {test_dir}")
    shutil.rmtree(test_dir, ignore_errors=True)
    test_dir.mkdir()

    yield test_dir

    # Allure artifacts creation might involve the creation of `.tar.zst` archives,
    # which aren't going to be used if Allure results collection is not enabled
    # (i.e. --alluredir is not set).
    # Skip `allure_attach_from_dir` in this case
    if not request.config.getoption("--alluredir"):
        return

    preserve_database_files = False
    for k, v in request.node.user_properties:
        # NB: the neon_env_builder fixture uses this fixture (test_output_dir).
        # So, neon_env_builder's cleanup runs before here.
        # The cleanup propagates NeonEnvBuilder.preserve_database_files into this user property.
        if k == "preserve_database_files":
            assert isinstance(v, bool)
            preserve_database_files = v

    allure_attach_from_dir(test_dir, preserve_database_files)


class FileAndThreadLock:
    """Context manager combining an in-process thread lock with an exclusive `flock` on a file.

    The thread lock serializes threads that share this object; the `flock`
    is taken on a freshly opened descriptor of `path` each time the context is
    entered. The lock is not reentrant.
    """

    def __init__(self, path: Path):
        self.path = path
        self.thread_lock = threading.Lock()
        # Descriptor holding the flock; only set while the lock is held.
        self.fd: int | None = None

    def __enter__(self) -> None:
        # lock thread lock before file lock so that there's no race
        # around flocking / funlocking the file lock.
        # The thread lock is also taken before opening the file, so that
        # `self.fd` is only ever written by the thread that owns the lock.
        self.thread_lock.acquire()
        fd: int | None = None
        try:
            fd = os.open(self.path, os.O_CREAT | os.O_WRONLY)
            flock(fd, LOCK_EX)
        except BaseException:
            # Don't leak the descriptor or leave the thread lock held if
            # opening or locking fails (or is interrupted).
            if fd is not None:
                os.close(fd)
            self.thread_lock.release()
            raise
        self.fd = fd

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        exc_traceback: TracebackType | None,
    ):
        assert self.fd is not None
        assert self.thread_lock.locked()  # ... by us
        # Clear the shared attribute while we still own the thread lock, and
        # release the thread lock last so the next thread starts from a clean state.
        fd, self.fd = self.fd, None
        try:
            flock(fd, LOCK_UN)
        finally:
            try:
                os.close(fd)
            finally:
                self.thread_lock.release()


class SnapshotDirLocked:
    """View of a `SnapshotDir` handed out by `SnapshotDir.__enter__`."""

    def __init__(self, parent: SnapshotDir):
        self._parent = parent

    def is_initialized(self):
        """Return whether the parent's marker file exists."""
        # TODO: in the future, take a `tag` as argument and store it in the marker in set_initialized.
        # Then, in this function, compare marker file contents with the tag to invalidate the snapshot if the tag changed.
        return self._parent.marker_file_path.exists()

    def set_initialized(self):
        """Create (or truncate) the parent's marker file."""
        self._parent.marker_file_path.write_text("")

    @property
    def path(self) -> Path:
        """The `snapshot` subdirectory path of the parent directory."""
        return self._parent.path / "snapshot"


class SnapshotDir:
    """An existing directory guarded by a `FileAndThreadLock` on a lock file inside it.

    Use as a context manager: entering takes the lock and returns a
    `SnapshotDirLocked`; exiting releases the lock.
    """

    _path: Path

    def __init__(self, path: Path):
        self._path = path
        assert self._path.is_dir()
        self._lock = FileAndThreadLock(self.lock_file_path)

    @property
    def path(self) -> Path:
        return self._path

    @property
    def lock_file_path(self) -> Path:
        return self._path / "initializing.flock"

    @property
    def marker_file_path(self) -> Path:
        return self._path / "initialized.marker"

    def __enter__(self) -> SnapshotDirLocked:
        self._lock.__enter__()
        return SnapshotDirLocked(self)

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        exc_traceback: TracebackType | None,
    ):
        self._lock.__exit__(exc_type, exc_value, exc_traceback)


def shared_snapshot_dir(top_output_dir: Path, ident: str) -> SnapshotDir:
    """Create the shared snapshot directory for `ident` if needed and wrap it in a `SnapshotDir`."""
    snapshot_dir_path = get_shared_snapshot_dir_path(top_output_dir, ident)
    snapshot_dir_path.mkdir(exist_ok=True, parents=True)
    return SnapshotDir(snapshot_dir_path)


@pytest.fixture(scope="function")
def test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Path | None:
    """
    Idempotently create a test's overlayfs mount state directory.
    If the functionality isn't enabled via env var, returns None.

    The procedure cleans up after previous runs that were aborted (e.g. due to Ctrl-C, OOM kills, etc).
    """

    if os.getenv("NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS") is None:
        return None

    overlay_dir = get_test_overlay_dir(request, top_output_dir)
    log.info(f"test_overlay_dir is {overlay_dir}")

    overlay_dir.mkdir(exist_ok=True)
    # unmount stale overlayfs mounts which use subdirectories of `overlay_dir/*` as the overlayfs `upperdir` and `workdir`
    for mountpoint in overlayfs.iter_mounts_beneath(get_test_output_dir(request, top_output_dir)):
        cmd = ["sudo", "umount", str(mountpoint)]
        log.info(
            f"Unmounting stale overlayfs mount probably created during earlier test run: {cmd}"
        )
        subprocess.run(cmd, capture_output=True, check=True)
    # the overlayfs `workdir` is owned by `root`, shutil.rmtree won't work.
    cmd = ["sudo", "rm", "-rf", str(overlay_dir)]
    subprocess.run(cmd, capture_output=True, check=True)

    overlay_dir.mkdir()

    return overlay_dir

    # no need to clean up anything: on clean shutdown,
    # NeonEnvBuilder.overlay_cleanup_teardown takes care of cleanup
    # and on unclean shutdown, this function will take care of it
    # on the next test run