diff --git a/test_runner/conftest.py b/test_runner/conftest.py index 996ca4d652..1fb7ae84d5 100644 --- a/test_runner/conftest.py +++ b/test_runner/conftest.py @@ -10,4 +10,8 @@ pytest_plugins = ( "fixtures.compare_fixtures", "fixtures.slow", "fixtures.flaky", + "fixtures.shared_fixtures", + "fixtures.function.neon_storage", + "fixtures.session.neon_storage", + "fixtures.session.s3", ) diff --git a/test_runner/fixtures/function/__init__.py b/test_runner/fixtures/function/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test_runner/fixtures/function/neon_storage.py b/test_runner/fixtures/function/neon_storage.py new file mode 100644 index 0000000000..cb314ef9e0 --- /dev/null +++ b/test_runner/fixtures/function/neon_storage.py @@ -0,0 +1,48 @@ +from typing import Iterator, Optional, Dict, Any + +import pytest +from _pytest.fixtures import FixtureRequest + +from fixtures.neon_fixtures import NeonEnv +from fixtures.shared_fixtures import TTenant, TTimeline + +@pytest.fixture(scope="function") +def tenant_config() -> Dict[str, Any]: + return dict() + +@pytest.fixture(scope="function") +def tenant( + neon_shared_env: NeonEnv, + request: FixtureRequest, + tenant_config: Optional[Dict[str, Any]] = None, +) -> Iterator[TTenant]: + tenant = TTenant(env=neon_shared_env, name=request.node.name, config=tenant_config) + yield tenant + tenant.shutdown_resources() + + +@pytest.fixture(scope="function") +def timeline( + tenant: TTenant, +) -> Iterator[TTimeline]: + timeline = tenant.default_timeline + yield timeline + + +@pytest.fixture(scope="function") +def exclusive_tenant( + neon_env: NeonEnv, + request: FixtureRequest, + tenant_config: Optional[Dict[str, Any]] = None, +) -> Iterator[TTenant]: + tenant = TTenant(env=neon_env, name=request.node.name, config=tenant_config) + yield tenant + tenant.shutdown_resources() + + +@pytest.fixture(scope="function") +def exclusive_timeline( + exclusive_tenant: TTenant, +) -> Iterator[TTimeline]: + timeline = exclusive_tenant.default_timeline + yield timeline diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index c95cbd2353..29dd6cb302 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -93,7 +93,6 @@ from fixtures.utils import ( allure_add_grafana_links, allure_attach_from_dir, assert_no_errors, - get_self_dir, print_gc_result, subprocess_capture, wait_until, @@ -126,122 +125,6 @@ Env = Dict[str, str] DEFAULT_OUTPUT_DIR: str = "test_output" DEFAULT_BRANCH_NAME: str = "main" -BASE_PORT: int = 15000 - - -@pytest.fixture(scope="session") -def base_dir() -> Iterator[Path]: - # find the base directory (currently this is the git root) - base_dir = get_self_dir().parent.parent - log.info(f"base_dir is {base_dir}") - - yield base_dir - - -@pytest.fixture(scope="session") -def neon_binpath(base_dir: Path, build_type: str) -> Iterator[Path]: - if os.getenv("REMOTE_ENV"): - # we are in remote env and do not have neon binaries locally - # this is the case for benchmarks run on self-hosted runner - return - - # Find the neon binaries. - if env_neon_bin := os.environ.get("NEON_BIN"): - binpath = Path(env_neon_bin) - else: - binpath = base_dir / "target" / build_type - log.info(f"neon_binpath is {binpath}") - - if not (binpath / "pageserver").exists(): - raise Exception(f"neon binaries not found at '{binpath}'") - - yield binpath - - -@pytest.fixture(scope="session") -def pg_distrib_dir(base_dir: Path) -> Iterator[Path]: - if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"): - distrib_dir = Path(env_postgres_bin).resolve() - else: - distrib_dir = base_dir / "pg_install" - - log.info(f"pg_distrib_dir is {distrib_dir}") - yield distrib_dir - - -@pytest.fixture(scope="session") -def top_output_dir(base_dir: Path) -> Iterator[Path]: - # Compute the top-level directory for all tests. - if env_test_output := os.environ.get("TEST_OUTPUT"): - output_dir = Path(env_test_output).resolve() - else: - output_dir = base_dir / DEFAULT_OUTPUT_DIR - output_dir.mkdir(exist_ok=True) - - log.info(f"top_output_dir is {output_dir}") - yield output_dir - - -@pytest.fixture(scope="session") -def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: PgVersion) -> Iterator[Path]: - versioned_dir = pg_distrib_dir / pg_version.v_prefixed - - psql_bin_path = versioned_dir / "bin/psql" - postgres_bin_path = versioned_dir / "bin/postgres" - - if os.getenv("REMOTE_ENV"): - # When testing against a remote server, we only need the client binary. - if not psql_bin_path.exists(): - raise Exception(f"psql not found at '{psql_bin_path}'") - else: - if not postgres_bin_path.exists(): - raise Exception(f"postgres not found at '{postgres_bin_path}'") - - log.info(f"versioned_pg_distrib_dir is {versioned_dir}") - yield versioned_dir - - -@pytest.fixture(scope="session") -def neon_api_key() -> str: - api_key = os.getenv("NEON_API_KEY") - if not api_key: - raise AssertionError("Set the NEON_API_KEY environment variable") - - return api_key - - -@pytest.fixture(scope="session") -def neon_api_base_url() -> str: - return os.getenv("NEON_API_BASE_URL", "https://console-stage.neon.build/api/v2") - - -@pytest.fixture(scope="session") -def neon_api(neon_api_key: str, neon_api_base_url: str) -> NeonAPI: - return NeonAPI(neon_api_key, neon_api_base_url) - - -@pytest.fixture(scope="session") -def worker_port_num(): - return (32768 - BASE_PORT) // int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1")) - - -@pytest.fixture(scope="session") -def worker_seq_no(worker_id: str) -> int: - # worker_id is a pytest-xdist fixture - # it can be master or gw - # parse it to always get a number - if worker_id == "master": - return 0 - assert worker_id.startswith("gw") - return int(worker_id[2:]) - - -@pytest.fixture(scope="session") -def worker_base_port(worker_seq_no: int, worker_port_num: int) -> int: - # so we divide ports in ranges of ports - # so workers have disjoint set of ports for services - return BASE_PORT + worker_seq_no * worker_port_num - def get_dir_size(path: str) -> int: """Return size in bytes.""" @@ -253,11 +136,6 @@ def get_dir_size(path: str) -> int: return totalbytes -@pytest.fixture(scope="session") -def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistributor: - return PortDistributor(base_port=worker_base_port, port_number=worker_port_num) - - @pytest.fixture(scope="function") def default_broker( port_distributor: PortDistributor, @@ -272,32 +150,6 @@ def default_broker( yield broker broker.stop() -@pytest.fixture(scope="session") -def shared_broker( - port_distributor: PortDistributor, - shared_test_output_dir: Path, - neon_binpath: Path, -) -> Iterator[NeonBroker]: - # multiple pytest sessions could get launched in parallel, get them different ports/datadirs - client_port = port_distributor.get_port() - broker_logfile = shared_test_output_dir / "repo" / "storage_broker.log" - - broker = NeonBroker(logfile=broker_logfile, port=client_port, neon_binpath=neon_binpath) - yield broker - broker.stop() - - -@pytest.fixture(scope="session") -def run_id() -> Iterator[uuid.UUID]: - yield uuid.uuid4() - - -@pytest.fixture(scope="session") -def mock_s3_server(port_distributor: PortDistributor) -> Iterator[MockS3Server]: - mock_s3_server = MockS3Server(port_distributor.get_port()) - yield mock_s3_server - mock_s3_server.kill() - class PgProtocol: """Reusable connection logic""" @@ -555,9 +407,9 @@ class NeonEnvBuilder: self.pageserver_io_buffer_alignment = pageserver_io_buffer_alignment - assert test_name.startswith( - "test_" - ) or shared, "Unexpectedly instantiated from outside a test function" + assert ( + test_name.startswith("test_") or shared + ), "Unexpectedly instantiated from outside a test function" self.test_name = test_name def init_configs(self, default_remote_storage_if_missing: bool = True) -> NeonEnv: @@ -1418,9 +1270,6 @@ class NeonEnv: return "ep-" + str(self.endpoint_counter) - - - @pytest.fixture(scope="function") def neon_simple_env( request: FixtureRequest, @@ -1470,59 +1319,6 @@ def neon_simple_env( yield env -@pytest.fixture(scope="session", autouse=True) -def neon_shared_env( - pytestconfig: Config, - port_distributor: PortDistributor, - mock_s3_server: MockS3Server, - shared_broker: NeonBroker, - run_id: uuid.UUID, - top_output_dir: Path, - shared_test_output_dir: Path, - neon_binpath: Path, - build_type: str, - pg_distrib_dir: Path, - pg_version: PgVersion, - pageserver_virtual_file_io_engine: str, - pageserver_aux_file_policy: Optional[AuxFileStore], - pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]], - pageserver_io_buffer_alignment: Optional[int], - request: FixtureRequest, -) -> Iterator[NeonEnv]: - """ - Simple Neon environment, with no authentication and no safekeepers. - - This fixture will use RemoteStorageKind.LOCAL_FS with pageserver. - """ - - prefix = f"shared[{build_type}-{pg_version.v_prefixed}]-" - - # Create the environment in the per-test output directory - repo_dir = get_test_repo_dir(request, top_output_dir, prefix) - - with NeonEnvBuilder( - top_output_dir=top_output_dir, - repo_dir=repo_dir, - port_distributor=port_distributor, - broker=shared_broker, - mock_s3_server=mock_s3_server, - neon_binpath=neon_binpath, - pg_distrib_dir=pg_distrib_dir, - pg_version=pg_version, - run_id=run_id, - preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")), - test_name=f"{prefix}{request.node.name}", - test_output_dir=shared_test_output_dir, - pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine, - pageserver_aux_file_policy=pageserver_aux_file_policy, - pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm, - pageserver_io_buffer_alignment=pageserver_io_buffer_alignment, - shared=True, - ) as builder: - env = builder.init_start() - - yield env - @pytest.fixture(scope="function") def neon_endpoint(request: FixtureRequest, neon_shared_env: NeonEnv) -> Endpoint: @@ -1538,6 +1334,7 @@ def neon_endpoint(request: FixtureRequest, neon_shared_env: NeonEnv) -> Endpoint except BaseException: pass + @pytest.fixture(scope="function") def neon_env_builder( pytestconfig: Config, @@ -4893,14 +4690,18 @@ def _get_test_dir(request: FixtureRequest, top_output_dir: Path, prefix: str) -> return test_dir -def get_test_output_dir(request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None) -> Path: +def get_test_output_dir( + request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None +) -> Path: """ The working directory for a test. """ return _get_test_dir(request, top_output_dir, prefix or "") -def get_test_overlay_dir(request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None) -> Path: +def get_test_overlay_dir( + request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None +) -> Path: """ Directory that contains `upperdir` and `workdir` for overlayfs mounts that a test creates. See `NeonEnvBuilder.overlay_mount`. @@ -4908,12 +4709,16 @@ def get_test_overlay_dir(request: FixtureRequest, top_output_dir: Path, prefix: return _get_test_dir(request, top_output_dir, f"overlay-{prefix or ''}") -def get_shared_snapshot_dir_path(top_output_dir: Path, snapshot_name: str, prefix: Optional[str] = None) -> Path: +def get_shared_snapshot_dir_path( + top_output_dir: Path, snapshot_name: str, prefix: Optional[str] = None +) -> Path: return top_output_dir / f"{prefix or ''}shared-snapshots" / snapshot_name -def get_test_repo_dir(request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None) -> Path: - return get_test_output_dir(request, top_output_dir, prefix or '') / "repo" +def get_test_repo_dir( + request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None +) -> Path: + return get_test_output_dir(request, top_output_dir, prefix or "") / "repo" def pytest_addoption(parser: Parser): @@ -4966,49 +4771,6 @@ def test_output_dir( allure_attach_from_dir(test_dir, preserve_database_files) -# This is autouse, so the test output directory always gets created, even -# if a test doesn't put anything there. -# -# NB: we request the overlay dir fixture so the fixture does its cleanups -@pytest.fixture(scope="session", autouse=True) -def shared_test_output_dir( - request: FixtureRequest, - pg_version: PgVersion, - build_type: str, - top_output_dir: Path, - shared_test_overlay_dir: Path -) -> Iterator[Path]: - """Create the working directory for shared tests.""" - - prefix = f"shared[{build_type}-{pg_version.v_prefixed}]-" - - # one directory per test - test_dir = get_test_output_dir(request, top_output_dir, prefix) - - log.info(f"test_output_dir is {test_dir}") - shutil.rmtree(test_dir, ignore_errors=True) - test_dir.mkdir() - - yield test_dir - - # Allure artifacts creation might involve the creation of `.tar.zst` archives, - # which aren't going to be used if Allure results collection is not enabled - # (i.e. --alluredir is not set). - # Skip `allure_attach_from_dir` in this case - if not request.config.getoption("--alluredir"): - return - - preserve_database_files = False - for k, v in request.node.user_properties: - # NB: the neon_env_builder fixture uses this fixture (test_output_dir). - # So, neon_env_builder's cleanup runs before here. - # The cleanup propagates NeonEnvBuilder.preserve_database_files into this user property. - if k == "preserve_database_files": - assert isinstance(v, bool) - preserve_database_files = v - - allure_attach_from_dir(test_dir, preserve_database_files) - class FileAndThreadLock: def __init__(self, path: Path): @@ -5115,42 +4877,6 @@ def test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Optional[ # and on unclean shutdown, this function will take care of it # on the next test run -@pytest.fixture(scope="session") -def shared_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Optional[Path]: - """ - Idempotently create a test's overlayfs mount state directory. - If the functionality isn't enabled via env var, returns None. - - The procedure cleans up after previous runs that were aborted (e.g. due to Ctrl-C, OOM kills, etc). - """ - - if os.getenv("NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS") is None: - return None - - overlay_dir = get_test_overlay_dir(request, top_output_dir) - log.info(f"test_overlay_dir is {overlay_dir}") - - overlay_dir.mkdir(exist_ok=True) - # unmount stale overlayfs mounts which subdirectories of `overlay_dir/*` as the overlayfs `upperdir` and `workdir` - for mountpoint in overlayfs.iter_mounts_beneath(get_test_output_dir(request, top_output_dir)): - cmd = ["sudo", "umount", str(mountpoint)] - log.info( - f"Unmounting stale overlayfs mount probably created during earlier test run: {cmd}" - ) - subprocess.run(cmd, capture_output=True, check=True) - # the overlayfs `workdir`` is owned by `root`, shutil.rmtree won't work. - cmd = ["sudo", "rm", "-rf", str(overlay_dir)] - subprocess.run(cmd, capture_output=True, check=True) - - overlay_dir.mkdir() - - return overlay_dir - - # no need to clean up anything: on clean shutdown, - # NeonEnvBuilder.overlay_cleanup_teardown takes care of cleanup - # and on unclean shutdown, this function will take care of it - # on the next test run - SKIP_DIRS = frozenset( ( @@ -5357,7 +5083,7 @@ def tenant_get_shards( return [(TenantShardId(tenant_id, 0, 0), override_pageserver or env.pageserver)] -def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint): +def wait_replica_caughtup(primary: PgProtocol, secondary: PgProtocol): primary_lsn = Lsn( primary.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()", log_query=False) ) diff --git a/test_runner/fixtures/parametrize.py b/test_runner/fixtures/parametrize.py index 094d0bf2fb..0c6d7470e5 100644 --- a/test_runner/fixtures/parametrize.py +++ b/test_runner/fixtures/parametrize.py @@ -13,6 +13,7 @@ from fixtures.utils import AuxFileStore Dynamically parametrize tests by different parameters """ + def get_pgversions(): if (v := os.getenv("DEFAULT_PG_VERSION")) is None: pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET] diff --git a/test_runner/fixtures/session/__init__.py b/test_runner/fixtures/session/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test_runner/fixtures/session/neon_storage.py b/test_runner/fixtures/session/neon_storage.py new file mode 100644 index 0000000000..97d2243393 --- /dev/null +++ b/test_runner/fixtures/session/neon_storage.py @@ -0,0 +1,303 @@ +import os +import shutil +import subprocess +import uuid +from pathlib import Path +from typing import Any, Dict, Iterator, Optional, cast + +import pytest +from _pytest.config import Config +from _pytest.fixtures import FixtureRequest + +from fixtures import overlayfs +from fixtures.broker import NeonBroker +from fixtures.log_helper import log +from fixtures.neon_api import NeonAPI +from fixtures.neon_fixtures import ( + DEFAULT_OUTPUT_DIR, + NeonEnv, + NeonEnvBuilder, + get_test_output_dir, + get_test_overlay_dir, + get_test_repo_dir, +) +from fixtures.pg_version import PgVersion +from fixtures.port_distributor import PortDistributor +from fixtures.remote_storage import MockS3Server +from fixtures.utils import AuxFileStore, allure_attach_from_dir, get_self_dir + +BASE_PORT: int = 15000 + + +@pytest.fixture(scope="session") +def base_dir() -> Iterator[Path]: + # find the base directory (currently this is the git root) + base_dir = get_self_dir().parent.parent + log.info(f"base_dir is {base_dir}") + + yield base_dir + + +@pytest.fixture(scope="session") +def neon_binpath(base_dir: Path, build_type: str) -> Iterator[Path]: + if os.getenv("REMOTE_ENV"): + # we are in remote env and do not have neon binaries locally + # this is the case for benchmarks run on self-hosted runner + return + + # Find the neon binaries. + if env_neon_bin := os.environ.get("NEON_BIN"): + binpath = Path(env_neon_bin) + else: + binpath = base_dir / "target" / build_type + log.info(f"neon_binpath is {binpath}") + + if not (binpath / "pageserver").exists(): + raise Exception(f"neon binaries not found at '{binpath}'") + + yield binpath + + +@pytest.fixture(scope="session") +def pg_distrib_dir(base_dir: Path) -> Iterator[Path]: + if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"): + distrib_dir = Path(env_postgres_bin).resolve() + else: + distrib_dir = base_dir / "pg_install" + + log.info(f"pg_distrib_dir is {distrib_dir}") + yield distrib_dir + + +@pytest.fixture(scope="session") +def top_output_dir(base_dir: Path) -> Iterator[Path]: + # Compute the top-level directory for all tests. + if env_test_output := os.environ.get("TEST_OUTPUT"): + output_dir = Path(env_test_output).resolve() + else: + output_dir = base_dir / DEFAULT_OUTPUT_DIR + output_dir.mkdir(exist_ok=True) + + log.info(f"top_output_dir is {output_dir}") + yield output_dir + + +@pytest.fixture(scope="session") +def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: PgVersion) -> Iterator[Path]: + versioned_dir = pg_distrib_dir / pg_version.v_prefixed + + psql_bin_path = versioned_dir / "bin/psql" + postgres_bin_path = versioned_dir / "bin/postgres" + + if os.getenv("REMOTE_ENV"): + # When testing against a remote server, we only need the client binary. + if not psql_bin_path.exists(): + raise Exception(f"psql not found at '{psql_bin_path}'") + else: + if not postgres_bin_path.exists(): + raise Exception(f"postgres not found at '{postgres_bin_path}'") + + log.info(f"versioned_pg_distrib_dir is {versioned_dir}") + yield versioned_dir + + +@pytest.fixture(scope="session") +def neon_api_key() -> str: + api_key = os.getenv("NEON_API_KEY") + if not api_key: + raise AssertionError("Set the NEON_API_KEY environment variable") + + return api_key + + +@pytest.fixture(scope="session") +def neon_api_base_url() -> str: + return os.getenv("NEON_API_BASE_URL", "https://console-stage.neon.build/api/v2") + + +@pytest.fixture(scope="session") +def neon_api(neon_api_key: str, neon_api_base_url: str) -> NeonAPI: + return NeonAPI(neon_api_key, neon_api_base_url) + + +@pytest.fixture(scope="session") +def worker_port_num(): + return (32768 - BASE_PORT) // int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1")) + + +@pytest.fixture(scope="session") +def worker_seq_no(worker_id: str) -> int: + # worker_id is a pytest-xdist fixture + # it can be master or gw + # parse it to always get a number + if worker_id == "master": + return 0 + assert worker_id.startswith("gw") + return int(worker_id[2:]) + + +@pytest.fixture(scope="session") +def worker_base_port(worker_seq_no: int, worker_port_num: int) -> int: + # so we divide ports in ranges of ports + # so workers have disjoint set of ports for services + return BASE_PORT + worker_seq_no * worker_port_num + + +@pytest.fixture(scope="session") +def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistributor: + return PortDistributor(base_port=worker_base_port, port_number=worker_port_num) + + +@pytest.fixture(scope="session") +def shared_broker( + port_distributor: PortDistributor, + shared_test_output_dir: Path, + neon_binpath: Path, +) -> Iterator[NeonBroker]: + # multiple pytest sessions could get launched in parallel, get them different ports/datadirs + client_port = port_distributor.get_port() + broker_logfile = shared_test_output_dir / "repo" / "storage_broker.log" + + broker = NeonBroker(logfile=broker_logfile, port=client_port, neon_binpath=neon_binpath) + yield broker + broker.stop() + + +@pytest.fixture(scope="session") +def run_id() -> Iterator[uuid.UUID]: + yield uuid.uuid4() + + +@pytest.fixture(scope="session", autouse=True) +def neon_shared_env( + pytestconfig: Config, + port_distributor: PortDistributor, + mock_s3_server: MockS3Server, + shared_broker: NeonBroker, + run_id: uuid.UUID, + top_output_dir: Path, + shared_test_output_dir: Path, + neon_binpath: Path, + build_type: str, + pg_distrib_dir: Path, + pg_version: PgVersion, + pageserver_virtual_file_io_engine: str, + pageserver_aux_file_policy: Optional[AuxFileStore], + pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]], + pageserver_io_buffer_alignment: Optional[int], + request: FixtureRequest, +) -> Iterator[NeonEnv]: + """ + Simple Neon environment, with no authentication and no safekeepers. + + This fixture will use RemoteStorageKind.LOCAL_FS with pageserver. + """ + + prefix = f"shared[{build_type}-{pg_version.v_prefixed}]-" + + # Create the environment in the per-test output directory + repo_dir = get_test_repo_dir(request, top_output_dir, prefix) + + with NeonEnvBuilder( + top_output_dir=top_output_dir, + repo_dir=repo_dir, + port_distributor=port_distributor, + broker=shared_broker, + mock_s3_server=mock_s3_server, + neon_binpath=neon_binpath, + pg_distrib_dir=pg_distrib_dir, + pg_version=pg_version, + run_id=run_id, + preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")), + test_name=f"{prefix}{request.node.name}", + test_output_dir=shared_test_output_dir, + pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine, + pageserver_aux_file_policy=pageserver_aux_file_policy, + pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm, + pageserver_io_buffer_alignment=pageserver_io_buffer_alignment, + shared=True, + ) as builder: + env = builder.init_start() + + yield env + + +# This is autouse, so the test output directory always gets created, even +# if a test doesn't put anything there. +# +# NB: we request the overlay dir fixture so the fixture does its cleanups +@pytest.fixture(scope="session") +def shared_test_output_dir( + request: FixtureRequest, + pg_version: PgVersion, + build_type: str, + top_output_dir: Path, + shared_test_overlay_dir: Path, +) -> Iterator[Path]: + """Create the working directory for shared tests.""" + + prefix = f"shared[{build_type}-{pg_version.v_prefixed}]-" + + # one directory per test + test_dir = get_test_output_dir(request, top_output_dir, prefix) + + log.info(f"test_output_dir is {test_dir}") + shutil.rmtree(test_dir, ignore_errors=True) + test_dir.mkdir() + + yield test_dir + + # Allure artifacts creation might involve the creation of `.tar.zst` archives, + # which aren't going to be used if Allure results collection is not enabled + # (i.e. --alluredir is not set). + # Skip `allure_attach_from_dir` in this case + if not request.config.getoption("--alluredir"): + return + + preserve_database_files = False + for k, v in request.node.user_properties: + # NB: the neon_env_builder fixture uses this fixture (test_output_dir). + # So, neon_env_builder's cleanup runs before here. + # The cleanup propagates NeonEnvBuilder.preserve_database_files into this user property. + if k == "preserve_database_files": + assert isinstance(v, bool) + preserve_database_files = v + + allure_attach_from_dir(test_dir, preserve_database_files) + + +@pytest.fixture(scope="session") +def shared_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Optional[Path]: + """ + Idempotently create a test's overlayfs mount state directory. + If the functionality isn't enabled via env var, returns None. + + The procedure cleans up after previous runs that were aborted (e.g. due to Ctrl-C, OOM kills, etc). + """ + + if os.getenv("NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS") is None: + return None + + overlay_dir = get_test_overlay_dir(request, top_output_dir) + log.info(f"test_overlay_dir is {overlay_dir}") + + overlay_dir.mkdir(exist_ok=True) + # unmount stale overlayfs mounts which subdirectories of `overlay_dir/*` as the overlayfs `upperdir` and `workdir` + for mountpoint in overlayfs.iter_mounts_beneath(get_test_output_dir(request, top_output_dir)): + cmd = ["sudo", "umount", str(mountpoint)] + log.info( + f"Unmounting stale overlayfs mount probably created during earlier test run: {cmd}" + ) + subprocess.run(cmd, capture_output=True, check=True) + # the overlayfs `workdir`` is owned by `root`, shutil.rmtree won't work. + cmd = ["sudo", "rm", "-rf", str(overlay_dir)] + subprocess.run(cmd, capture_output=True, check=True) + + overlay_dir.mkdir() + + return overlay_dir + + # no need to clean up anything: on clean shutdown, + # NeonEnvBuilder.overlay_cleanup_teardown takes care of cleanup + # and on unclean shutdown, this function will take care of it + # on the next test run diff --git a/test_runner/fixtures/session/s3.py b/test_runner/fixtures/session/s3.py new file mode 100644 index 0000000000..27e1915f87 --- /dev/null +++ b/test_runner/fixtures/session/s3.py @@ -0,0 +1,13 @@ +from typing import Iterator + +import pytest + +from fixtures.port_distributor import PortDistributor +from fixtures.remote_storage import MockS3Server + + +@pytest.fixture(scope="session") +def mock_s3_server(port_distributor: PortDistributor) -> Iterator[MockS3Server]: + mock_s3_server = MockS3Server(port_distributor.get_port()) + yield mock_s3_server + mock_s3_server.kill() diff --git a/test_runner/fixtures/shared_fixtures.py b/test_runner/fixtures/shared_fixtures.py index 6e81c672f8..053d6892ec 100644 --- a/test_runner/fixtures/shared_fixtures.py +++ b/test_runner/fixtures/shared_fixtures.py @@ -1,12 +1,16 @@ -from dataclasses import dataclass +from functools import partial from pathlib import Path -from typing import Optional, List, Dict, Any, Iterator +from typing import Any, Optional, cast, List, Dict import pytest -from fixtures.common_types import TimelineId, Lsn -from fixtures.neon_fixtures import Endpoint, NeonEnv, PgProtocol, Safekeeper -from fixtures.pg_version import PgVersion +from fixtures.common_types import Lsn, TenantId, TimelineId +from fixtures.log_helper import log +from fixtures.neon_fixtures import Endpoint, NeonEnv, PgProtocol, DEFAULT_BRANCH_NAME, tenant_get_shards, \ + check_restored_datadir_content +from fixtures.pageserver.utils import wait_for_last_record_lsn +from fixtures.safekeeper.utils import are_walreceivers_absent +from fixtures.utils import wait_until """ In this file most important resources are exposed as function-level fixtures @@ -14,7 +18,7 @@ that depend on session-level resources like pageservers and safekeepers. The main rationale here is that we don't need to initialize a new SK/PS/etc every time we want to test something that has branching: we can just as well -reuse still-available PageServers from other tests of the same kind. +reuse still-available PageServers from other tests of the same kind. """ @@ -23,120 +27,322 @@ def shared_storage_repo_path(build_type: str, base_dir: Path) -> Path: return base_dir / f"shared[{build_type}]" / "repo" +class TEndpoint(PgProtocol): + def clear_shared_buffers(self, cursor: Optional[Any] = None): + """ + Best-effort way to clear postgres buffers. Pinned buffers will not be 'cleared.' -class TestEndpoint(PgProtocol): - def __init__(self, compute: Endpoint, **kwargs: Any): - super().__init__(**kwargs) + Might also clear LFC. + """ + if cursor is not None: + cursor.execute("select clear_buffer_cache()") + else: + self.safe_psql("select clear_buffer_cache()") + +_TEndpoint = Endpoint -class TestTimeline: - def __init__(self, name: str, tenant: 'TestTenant'): - self.primary = None +class TTimeline: + __primary: Optional[_TEndpoint] + __secondary: Optional[_TEndpoint] - def start_primary(self) -> Optional[TestEndpoint]: - if not self.primary: - return None - self.primary.start() + def __init__(self, timeline_id: TimelineId, tenant: "TTenant", name: str): + self.id = timeline_id + self.tenant = tenant + self.name = name + self.__primary = None + self.__secondary = None - return self.primary + @property + def primary(self) -> TEndpoint: + if self.__primary is None: + self.__primary = cast(_TEndpoint, self.tenant.create_endpoint( + name=self._get_full_endpoint_name("primary"), + timeline_id=self.id, + primary=True, + running=True, + )) + + return cast(TEndpoint, self.__primary) + + def primary_with_config(self, config_lines: List[str], reboot: bool = False): + if self.__primary is None: + self.__primary = cast(_TEndpoint, self.tenant.create_endpoint( + name=self._get_full_endpoint_name("primary"), + timeline_id=self.id, + primary=True, + running=True, + config_lines=config_lines, + )) + else: + self.__primary.config(config_lines) + if reboot: + if self.__primary.is_running(): + self.__primary.stop() + self.__primary.start() + + return cast(TTimeline, self.__primary) + + @property + def secondary(self) -> TEndpoint: + if self.__secondary is None: + self.__secondary = cast(_TEndpoint, self.tenant.create_endpoint( + name=self._get_full_endpoint_name("secondary"), + timeline_id=self.id, + primary=False, + running=True, + )) + + return cast(TEndpoint, self.__secondary) + + def secondary_with_config(self, config_lines: List[str], reboot: bool = False) -> TEndpoint: + if self.__secondary is None: + self.__secondary = cast(_TEndpoint, self.tenant.create_endpoint( + name=self._get_full_endpoint_name("secondary"), + timeline_id=self.id, + primary=False, + running=True, + config_lines=config_lines, + )) + else: + self.__secondary.config(config_lines) + if reboot: + if self.__secondary.is_running(): + self.__secondary.stop() + self.__secondary.start() + + return cast(TEndpoint, self.__secondary) + + def _get_full_endpoint_name(self, name) -> str: + return f"{self.name}.{name}" + + def create_branch(self, name: str, lsn: Optional[Lsn]) -> "TTimeline": + return self.tenant.create_timeline( + new_name=name, + parent_name=self.name, + branch_at=lsn, + ) + + def stop_and_flush(self, endpoint: TEndpoint): + self.tenant.stop_endpoint(endpoint) + self.tenant.flush_timeline_data(self) + + def checkpoint(self, **kwargs): + self.tenant.checkpoint_timeline(self, **kwargs) -class TestPageserver: - """ - We only need one pageserver for every build_type. - """ - - def __init__( - self, - build_type: str, - bin_path: Path, - repo_path: Path, - port: int - ): - self.build_type = build_type - self.bin_path = bin_path - self.repo_path = repo_path - self.port = port - self.started = False - - def _start(self): - pass - - def _stop(self): - pass - - -@pytest.fixture(scope="session") -def shared_pageserver() -> Iterator[TestPageserver]: - yield None - - -class TestSafekeeper: - """ - We only need one safekeeper for every build_type. - """ - - def __init__( - self, - build_type: str, - bin_path: Path, - repo_path: Path, - port: int - ): - self.build_type = build_type - self.bin_path = bin_path - self.repo_path = repo_path - self.port = port - self.started = False - - def _start(self): - pass - - def _stop(self): - pass - - -@pytest.fixture(scope="session") -def shared_safekeeper( - request, - port: int, - build_type: str, - neon_binpath: Path, - shared_storage_repo_path: Path -) -> Iterator[TestSafekeeper]: - sk = TestSafekeeper( - build_type=build_type, - bin_path=(neon_binpath / "safekeeper"), - port=port, - repo_path=shared_storage_repo_path, - ) - sk.start() - - yield sk - sk.stop() - - -class TestTenant: +class TTenant: """ An object representing a single test case on a shared pageserver. All operations here are safe practically safe. """ def __init__( - self, - pageserver: TestPageserver, - safekeeper: TestSafekeeper, + self, + env: NeonEnv, + name: str, + config: Optional[Dict[str, Any]] = None, ): - self.pageserver = pageserver + self.id = TenantId.generate() + self.timelines = [] + self.timeline_names = {} + self.timeline_ids = {} + self.name = name + + # publicly inaccessible stuff, used during shutdown + self.__endpoints: List[Endpoint] = [] + self.__env: NeonEnv = env + + env.neon_cli.create_tenant( + tenant_id=self.id, + set_default=False, + conf=config + ) + + self.first_timeline_id = env.neon_cli.create_branch( + new_branch_name=f"{self.name}:{DEFAULT_BRANCH_NAME}", + ancestor_branch_name=DEFAULT_BRANCH_NAME, + tenant_id=self.id, + ) + + self._add_timeline( + TTimeline( + timeline_id=self.first_timeline_id, + tenant=self, + name=DEFAULT_BRANCH_NAME, + ) + ) + + def _add_timeline(self, timeline: TTimeline): + assert timeline.tenant == self + assert timeline not in self.timelines + assert timeline.id is not None + + self.timelines.append(timeline) + self.timeline_ids[timeline.id] = timeline + + if timeline.name is not None: + self.timeline_names[timeline.name] = timeline def create_timeline( - self, - name: str, - parent_name: Optional[str], - branch_at: Optional[Lsn], - ) -> TestTimeline: - pass + self, + new_name: str, + parent_name: Optional[str] = None, + parent_id: Optional[TimelineId] = None, + branch_at: Optional[Lsn] = None, + ) -> TTimeline: + if parent_name is not None: + pass + elif parent_id is not None: + assert parent_name is None + parent = self.timeline_ids[parent_id] + parent_name = parent.name + else: + raise LookupError("Timeline creation requires parent by either ID or name") - def stop(self): - for it in self.active_computes: - it.stop() + assert parent_name is not None + + new_id = self.__env.neon_cli.create_branch( + new_branch_name=f"{self.name}:{new_name}", + ancestor_branch_name=f"{self.name}:{parent_name}", + tenant_id=self.id, + ancestor_start_lsn=branch_at, + ) + + new_tl = TTimeline( + timeline_id=new_id, + tenant=self, + name=new_name, + ) + self._add_timeline(new_tl) + + return new_tl + + @property + def default_timeline(self) -> TTimeline: + return self.timeline_ids.get(self.first_timeline_id) + + def start_endpoint(self, ep: TEndpoint): + if ep not in self.__endpoints: + return + + ep = cast(Endpoint, ep) + + if not ep.is_running(): + ep.start() + + def stop_endpoint(self, ep: TEndpoint, mode: str = "fast"): + if ep not in self.__endpoints: + return + + ep = cast(Endpoint, ep) + + if ep.is_running(): + ep.stop(mode=mode) + + def create_endpoint( + self, + name: str, + timeline_id: TimelineId, + primary: bool = True, + running: bool = False, + port: Optional[int] = None, + http_port: Optional[int] = None, + lsn: Optional[Lsn] = None, + config_lines: Optional[List[str]] = None, + ) -> TEndpoint: + endpoint: _TEndpoint + + if port is None: + port = self.__env.port_distributor.get_port() + + if http_port is None: + http_port = self.__env.port_distributor.get_port() + + if running: + endpoint = self.__env.endpoints.create_start( + branch_name=self.timeline_ids[timeline_id].name, + endpoint_id=f"{self.name}.{name}", + tenant_id=self.id, + lsn=lsn, + hot_standby=not primary, + config_lines=config_lines, + ) + else: + endpoint = self.__env.endpoints.create( + branch_name=self.timeline_ids[timeline_id].name, + endpoint_id=f"{self.name}.{name}", + tenant_id=self.id, + lsn=lsn, + hot_standby=not primary, + config_lines=config_lines, + ) + + self.__endpoints.append(endpoint) + + return endpoint + + def shutdown_resources(self): + for ep in self.__endpoints: + try: + ep.stop_and_destroy("fast") + except BaseException as e: + log.error("Error encountered while shutting down endpoint %s", ep.endpoint_id, exc_info=e) + + def reconfigure(self, endpoint: TEndpoint, lines: List[str], restart: bool): + if endpoint not in self.__endpoints: + return + + endpoint = cast(_TEndpoint, endpoint) + was_running = endpoint.is_running() + if restart and was_running: + endpoint.stop() + + endpoint.config(lines) + + if restart: + endpoint.start() + + def flush_timeline_data(self, timeline: TTimeline) -> Lsn: + commit_lsn: Lsn = Lsn(0) + # In principle in the absense of failures polling single sk would be enough. + for sk in self.__env.safekeepers: + cli = sk.http_client() + # wait until compute connections are gone + wait_until(30, 0.5, partial(are_walreceivers_absent, cli, self.id, timeline.id)) + commit_lsn = max(cli.get_commit_lsn(self.id, timeline.id), commit_lsn) + + # Note: depending on WAL filtering implementation, probably most shards + # won't be able to reach commit_lsn (unless gaps are also ack'ed), so this + # is broken in sharded case. + shards = tenant_get_shards(env=self.__env, tenant_id=self.id) + for tenant_shard_id, pageserver in shards: + log.info( + f"flush_ep_to_pageserver: waiting for {commit_lsn} on shard {tenant_shard_id} on pageserver {pageserver.id})" + ) + waited = wait_for_last_record_lsn( + pageserver.http_client(), tenant_shard_id, timeline.id, commit_lsn + ) + + assert waited >= commit_lsn + + return commit_lsn + + def checkpoint_timeline(self, timeline: TTimeline, **kwargs): + self.__env.pageserver.http_client().timeline_checkpoint( + tenant_id=self.id, + timeline_id=timeline.id, + **kwargs, + ) + + def pgdatadir(self, endpoint: TEndpoint): + if endpoint not in self.__endpoints: + return None + + return cast(Endpoint, endpoint).pgdata_dir + + def check_restored_datadir_content(self, path, endpoint: TEndpoint, *args, **kwargs): + if endpoint not in self.__endpoints: + return + + check_restored_datadir_content(path, self.__env, cast(Endpoint, endpoint), *args, **kwargs) diff --git a/test_runner/regress/test_branching.py b/test_runner/regress/test_branching.py index fc74707639..24c00e0302 100644 --- a/test_runner/regress/test_branching.py +++ b/test_runner/regress/test_branching.py @@ -15,6 +15,7 @@ from fixtures.neon_fixtures import ( ) from fixtures.pageserver.http import PageserverApiException from fixtures.pageserver.utils import wait_until_tenant_active +from fixtures.shared_fixtures import TTimeline from fixtures.utils import query_scalar from performance.test_perf_pgbench import get_scales_matrix from requests import RequestException @@ -115,13 +116,10 @@ def test_branching_with_pgbench( # This test checks if the pageserver is able to handle a "unnormalized" starting LSN. # # Related: see discussion in https://github.com/neondatabase/neon/pull/2143#issuecomment-1209092186 -def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBin): +def test_branching_unnormalized_start_lsn(timeline: TTimeline, pg_bin: PgBin): XLOG_BLCKSZ = 8192 - env = neon_simple_env - - env.neon_cli.create_branch("b0") - endpoint0 = env.endpoints.create_start("b0") + endpoint0 = timeline.primary pg_bin.run_capture(["pgbench", "-i", endpoint0.connstr()]) @@ -133,8 +131,8 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi start_lsn = Lsn((int(curr_lsn) - XLOG_BLCKSZ) // XLOG_BLCKSZ * XLOG_BLCKSZ) log.info(f"Branching b1 from b0 starting at lsn {start_lsn}...") - env.neon_cli.create_branch("b1", "b0", ancestor_start_lsn=start_lsn) - endpoint1 = env.endpoints.create_start("b1") + tl2 = timeline.create_branch("branch", start_lsn) + endpoint1 = tl2.primary pg_bin.run_capture(["pgbench", "-i", endpoint1.connstr()]) diff --git a/test_runner/regress/test_combocid.py b/test_runner/regress/test_combocid.py index 41907b1f20..b887f70401 100644 --- a/test_runner/regress/test_combocid.py +++ b/test_runner/regress/test_combocid.py @@ -1,14 +1,13 @@ -from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver +from pathlib import Path + +from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver, test_output_dir +from fixtures.shared_fixtures import TTimeline -def do_combocid_op(neon_env_builder: NeonEnvBuilder, op): - env = neon_env_builder.init_start() - endpoint = env.endpoints.create_start( - "main", - config_lines=[ - "shared_buffers='1MB'", - ], - ) +def do_combocid_op(timeline: TTimeline, op): + endpoint = timeline.primary_with_config(config_lines=[ + "shared_buffers='1MB'", + ]) conn = endpoint.connect() cur = conn.cursor() @@ -43,32 +42,26 @@ def do_combocid_op(neon_env_builder: NeonEnvBuilder, op): assert len(rows) == 500 cur.execute("rollback") - flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline) - env.pageserver.http_client().timeline_checkpoint( - env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True - ) + timeline.stop_and_flush(endpoint) + timeline.checkpoint(compact=False, wait_until_uploaded=True) -def test_combocid_delete(neon_env_builder: NeonEnvBuilder): - do_combocid_op(neon_env_builder, "delete from t") +def test_combocid_delete(timeline: TTimeline): + do_combocid_op(timeline, "delete from t") -def test_combocid_update(neon_env_builder: NeonEnvBuilder): - do_combocid_op(neon_env_builder, "update t set val=val+1") +def test_combocid_update(timeline: TTimeline): + do_combocid_op(timeline, "update t set val=val+1") -def test_combocid_lock(neon_env_builder: NeonEnvBuilder): - do_combocid_op(neon_env_builder, "select * from t for update") +def test_combocid_lock(timeline: TTimeline): + do_combocid_op(timeline, "select * from t for update") -def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder): - env = neon_env_builder.init_start() - endpoint = env.endpoints.create_start( - "main", - config_lines=[ - "shared_buffers='1MB'", - ], - ) +def test_combocid_multi_insert(timeline: TTimeline, test_output_dir: Path): + endpoint = timeline.primary_with_config(config_lines=[ + "shared_buffers='1MB'", + ]) conn = endpoint.connect() cur = conn.cursor() @@ -77,7 +70,7 @@ def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder): cur.execute("CREATE EXTENSION neon_test_utils") cur.execute("create table t(id integer, val integer)") - file_path = f"{endpoint.pg_data_dir_path()}/t.csv" + file_path = str(test_output_dir / "t.csv") cur.execute(f"insert into t select g, 0 from generate_series(1,{n_records}) g") cur.execute(f"copy t to '{file_path}'") cur.execute("truncate table t") @@ -106,15 +99,12 @@ def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder): cur.execute("rollback") - flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline) - env.pageserver.http_client().timeline_checkpoint( - env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True - ) + timeline.stop_and_flush(endpoint) + timeline.checkpoint(compact=False, wait_until_uploaded=True) -def test_combocid(neon_env_builder: NeonEnvBuilder): - env = neon_env_builder.init_start() - endpoint = env.endpoints.create_start("main") +def test_combocid(timeline: TTimeline): + endpoint = timeline.primary conn = endpoint.connect() cur = conn.cursor() @@ -147,7 +137,5 @@ def test_combocid(neon_env_builder: NeonEnvBuilder): cur.execute("rollback") - flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline) - env.pageserver.http_client().timeline_checkpoint( - env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True - ) + timeline.stop_and_flush(endpoint) + timeline.checkpoint(compact=False, wait_until_uploaded=True) diff --git a/test_runner/regress/test_createdropdb.py b/test_runner/regress/test_createdropdb.py index af643f45d7..16f6df8d1b 100644 --- a/test_runner/regress/test_createdropdb.py +++ b/test_runner/regress/test_createdropdb.py @@ -5,6 +5,7 @@ import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content from fixtures.pg_version import PgVersion +from fixtures.shared_fixtures import TTimeline from fixtures.utils import query_scalar @@ -12,18 +13,17 @@ from fixtures.utils import query_scalar # Test CREATE DATABASE when there have been relmapper changes # @pytest.mark.parametrize("strategy", ["file_copy", "wal_log"]) -def test_createdb(neon_simple_env: NeonEnv, strategy: str): - env = neon_simple_env - if env.pg_version == PgVersion.V14 and strategy == "wal_log": +def test_createdb(timeline: TTimeline, pg_version: PgVersion, strategy: str): + if pg_version == PgVersion.V14 and strategy == "wal_log": pytest.skip("wal_log strategy not supported on PostgreSQL 14") - endpoint = env.endpoints.create_start("main") + endpoint = timeline.primary with endpoint.cursor() as cur: # Cause a 'relmapper' change in the original branch cur.execute("VACUUM FULL pg_class") - if env.pg_version == PgVersion.V14: + if pg_version == PgVersion.V14: cur.execute("CREATE DATABASE foodb") else: cur.execute(f"CREATE DATABASE foodb STRATEGY={strategy}") @@ -31,8 +31,8 @@ def test_createdb(neon_simple_env: NeonEnv, strategy: str): lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()") # Create a branch - env.neon_cli.create_branch("test_createdb2", "main", ancestor_start_lsn=lsn) - endpoint2 = env.endpoints.create_start("test_createdb2") + tl2 = timeline.create_branch("createdb2", lsn=lsn) + endpoint2 = tl2.primary # Test that you can connect to the new database on both branches for db in (endpoint, endpoint2): @@ -58,9 +58,8 @@ def test_createdb(neon_simple_env: NeonEnv, strategy: str): # # Test DROP DATABASE # -def test_dropdb(neon_simple_env: NeonEnv, test_output_dir): - env = neon_simple_env - endpoint = env.endpoints.create_start("main") +def test_dropdb(timeline: TTimeline, test_output_dir): + endpoint = timeline.primary with endpoint.cursor() as cur: cur.execute("CREATE DATABASE foodb") @@ -77,28 +76,28 @@ def test_dropdb(neon_simple_env: NeonEnv, test_output_dir): lsn_after_drop = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()") # Create two branches before and after database drop. - env.neon_cli.create_branch("test_before_dropdb", "main", ancestor_start_lsn=lsn_before_drop) - endpoint_before = env.endpoints.create_start("test_before_dropdb") + tl_before = timeline.create_branch("test_before_dropdb", lsn=lsn_before_drop) + endpoint_before = tl_before.primary - env.neon_cli.create_branch("test_after_dropdb", "main", ancestor_start_lsn=lsn_after_drop) - endpoint_after = env.endpoints.create_start("test_after_dropdb") + tl_after = timeline.create_branch("test_after_dropdb", lsn=lsn_after_drop) + endpoint_after = tl_after.primary # Test that database exists on the branch before drop endpoint_before.connect(dbname="foodb").close() # Test that database subdir exists on the branch before drop - assert endpoint_before.pgdata_dir - dbpath = pathlib.Path(endpoint_before.pgdata_dir) / "base" / str(dboid) + assert timeline.tenant.pgdatadir(endpoint_before) + dbpath = pathlib.Path(timeline.tenant.pgdatadir(endpoint_before)) / "base" / str(dboid) log.info(dbpath) assert os.path.isdir(dbpath) is True # Test that database subdir doesn't exist on the branch after drop - assert endpoint_after.pgdata_dir - dbpath = pathlib.Path(endpoint_after.pgdata_dir) / "base" / str(dboid) + assert timeline.tenant.pgdatadir(endpoint_after) + dbpath = pathlib.Path(timeline.tenant.pgdatadir(endpoint_after)) / "base" / str(dboid) log.info(dbpath) assert os.path.isdir(dbpath) is False # Check that we restore the content of the datadir correctly - check_restored_datadir_content(test_output_dir, env, endpoint) + timeline.tenant.check_restored_datadir_content(test_output_dir, endpoint) diff --git a/test_runner/regress/test_createuser.py b/test_runner/regress/test_createuser.py index d6f138e126..5050937403 100644 --- a/test_runner/regress/test_createuser.py +++ b/test_runner/regress/test_createuser.py @@ -1,13 +1,13 @@ from fixtures.neon_fixtures import NeonEnv +from fixtures.shared_fixtures import TTimeline from fixtures.utils import query_scalar # # Test CREATE USER to check shared catalog restore # -def test_createuser(neon_simple_env: NeonEnv): - env = neon_simple_env - endpoint = env.endpoints.create_start("main") +def test_createuser(timeline: TTimeline): + endpoint = timeline.primary with endpoint.cursor() as cur: # Cause a 'relmapper' change in the original branch @@ -18,8 +18,8 @@ def test_createuser(neon_simple_env: NeonEnv): lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()") # Create a branch - env.neon_cli.create_branch("test_createuser2", "main", ancestor_start_lsn=lsn) - endpoint2 = env.endpoints.create_start("test_createuser2") + branch = timeline.create_branch("test_createuser2", lsn=lsn) + endpoint2 = branch.primary # Test that you can connect to new branch as a new user assert endpoint2.safe_psql("select current_user", user="testuser") == [("testuser",)] diff --git a/test_runner/regress/test_endpoint_crash.py b/test_runner/regress/test_endpoint_crash.py index 7a432b4632..1f3f89c1c0 100644 --- a/test_runner/regress/test_endpoint_crash.py +++ b/test_runner/regress/test_endpoint_crash.py @@ -1,5 +1,6 @@ import pytest from fixtures.neon_fixtures import Endpoint +from fixtures.shared_fixtures import TTimeline @pytest.mark.parametrize( @@ -10,11 +11,11 @@ from fixtures.neon_fixtures import Endpoint "💣", # calls `trigger_segfault` internally ], ) -def test_endpoint_crash(neon_endpoint: Endpoint, sql_func: str): +def test_endpoint_crash(timeline: TTimeline, sql_func: str): """ Test that triggering crash from neon_test_utils crashes the endpoint """ - endpoint = neon_endpoint + endpoint = timeline.primary endpoint.safe_psql("CREATE EXTENSION neon_test_utils;") with pytest.raises(Exception, match="This probably means the server terminated abnormally"): endpoint.safe_psql(f"SELECT {sql_func}();") diff --git a/test_runner/regress/test_fsm_truncate.py b/test_runner/regress/test_fsm_truncate.py index 5b20176943..a469553786 100644 --- a/test_runner/regress/test_fsm_truncate.py +++ b/test_runner/regress/test_fsm_truncate.py @@ -1,8 +1,8 @@ -from fixtures.neon_fixtures import Endpoint +from fixtures.shared_fixtures import TTimeline -def test_fsm_truncate(neon_endpoint: Endpoint): - endpoint = neon_endpoint +def test_fsm_truncate(timeline: TTimeline): + endpoint = timeline.primary endpoint.safe_psql( "CREATE TABLE t1(key int); CREATE TABLE t2(key int); TRUNCATE TABLE t1; TRUNCATE TABLE t2;" ) diff --git a/test_runner/regress/test_gin_redo.py b/test_runner/regress/test_gin_redo.py index 9205882239..b863c116ac 100644 --- a/test_runner/regress/test_gin_redo.py +++ b/test_runner/regress/test_gin_redo.py @@ -1,17 +1,15 @@ import time from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup +from fixtures.shared_fixtures import TTimeline # # Test that redo of XLOG_GIN_VACUUM_PAGE doesn't produce error # -def test_gin_redo(neon_simple_env: NeonEnv): - env = neon_simple_env - - primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary") - time.sleep(1) - secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") +def test_gin_redo(timeline: TTimeline): + primary = timeline.primary + secondary = timeline.secondary con = primary.connect() cur = con.cursor() cur.execute("create table gin_test_tbl(id integer, i int4[])") diff --git a/test_runner/regress/test_hot_standby.py b/test_runner/regress/test_hot_standby.py index ae63136abb..7b1b0c5dee 100644 --- a/test_runner/regress/test_hot_standby.py +++ b/test_runner/regress/test_hot_standby.py @@ -14,6 +14,7 @@ from fixtures.neon_fixtures import ( tenant_get_shards, wait_replica_caughtup, ) +from fixtures.shared_fixtures import TTimeline from fixtures.utils import wait_until @@ -221,29 +222,20 @@ def pgbench_accounts_initialized(ep): # # Without hs feedback enabled we'd see 'User query might have needed to see row # versions that must be removed.' errors. -def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): - env = neon_env_builder.init_start() - agressive_vacuum_conf = [ +def test_hot_standby_feedback(timeline: TTimeline, pg_bin: PgBin): + with timeline.primary_with_config(config_lines=[ "log_autovacuum_min_duration = 0", "autovacuum_naptime = 10s", "autovacuum_vacuum_threshold = 25", "autovacuum_vacuum_scale_factor = 0.1", "autovacuum_vacuum_cost_delay = -1", - ] - with env.endpoints.create_start( - branch_name="main", endpoint_id="primary", config_lines=agressive_vacuum_conf - ) as primary: + ]) as primary: # It would be great to have more strict max_standby_streaming_delay=0s here, but then sometimes it fails with # 'User was holding shared buffer pin for too long.'. - with env.endpoints.new_replica_start( - origin=primary, - endpoint_id="secondary", - config_lines=[ - "max_standby_streaming_delay=2s", - "neon.protocol_version=2", - "hot_standby_feedback=true", - ], - ) as secondary: + with timeline.secondary_with_config(config_lines=[ + "max_standby_streaming_delay=2s", + "hot_standby_feedback=true", + ]) as secondary: log.info( f"primary connstr is {primary.connstr()}, secondary connstr {secondary.connstr()}" ) @@ -286,20 +278,15 @@ def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): # Test race condition between WAL replay and backends performing queries # https://github.com/neondatabase/neon/issues/7791 -def test_replica_query_race(neon_simple_env: NeonEnv): - env = neon_simple_env - - primary_ep = env.endpoints.create_start( - branch_name="main", - endpoint_id="primary", - ) +def test_replica_query_race(timeline: TTimeline): + primary_ep = timeline.primary with primary_ep.connect() as p_con: with p_con.cursor() as p_cur: p_cur.execute("CREATE EXTENSION neon_test_utils") p_cur.execute("CREATE TABLE test AS SELECT 0 AS counter") - standby_ep = env.endpoints.new_replica_start(origin=primary_ep, endpoint_id="standby") + standby_ep = timeline.secondary wait_replica_caughtup(primary_ep, standby_ep) # In primary, run a lot of UPDATEs on a single page diff --git a/test_runner/regress/test_lfc_resize.py b/test_runner/regress/test_lfc_resize.py index cb0b30d9c6..87f0641556 100644 --- a/test_runner/regress/test_lfc_resize.py +++ b/test_runner/regress/test_lfc_resize.py @@ -8,22 +8,19 @@ import time import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv, PgBin +from fixtures.shared_fixtures import TTimeline # # Test branching, when a transaction is in prepared state # @pytest.mark.timeout(600) -def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin): - env = neon_simple_env - endpoint = env.endpoints.create_start( - "main", - config_lines=[ - "neon.file_cache_path='file.cache'", - "neon.max_file_cache_size=512MB", - "neon.file_cache_size_limit=512MB", - ], - ) +def test_lfc_resize(timeline: TTimeline, pg_bin: PgBin): + endpoint = timeline.primary_with_config(config_lines=[ + "neon.file_cache_path='file.cache'", + "neon.max_file_cache_size=512MB", + "neon.file_cache_size_limit=512MB", + ]) n_resize = 10 scale = 100 @@ -49,7 +46,7 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin): thread.join() - lfc_file_path = f"{endpoint.pg_data_dir_path()}/file.cache" + lfc_file_path = timeline.tenant.pgdatadir(endpoint) / "file.cache" lfc_file_size = os.path.getsize(lfc_file_path) res = subprocess.run(["ls", "-sk", lfc_file_path], check=True, text=True, capture_output=True) lfc_file_blocks = re.findall("([0-9A-F]+)", res.stdout)[0] diff --git a/test_runner/regress/test_lfc_working_set_approximation.py b/test_runner/regress/test_lfc_working_set_approximation.py index 4a3a949d1a..64c33e829a 100644 --- a/test_runner/regress/test_lfc_working_set_approximation.py +++ b/test_runner/regress/test_lfc_working_set_approximation.py @@ -3,6 +3,7 @@ from pathlib import Path from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv +from fixtures.shared_fixtures import TTimeline from fixtures.utils import query_scalar @@ -73,18 +74,14 @@ WITH (fillfactor='100'); assert blocks < 10 -def test_sliding_working_set_approximation(neon_simple_env: NeonEnv): - env = neon_simple_env +def test_sliding_working_set_approximation(timeline: TTimeline): + endpoint = timeline.primary_with_config(config_lines=[ + "autovacuum = off", + "shared_buffers=1MB", + "neon.max_file_cache_size=256MB", + "neon.file_cache_size_limit=245MB", + ]) - endpoint = env.endpoints.create_start( - branch_name="main", - config_lines=[ - "autovacuum = off", - "shared_buffers=1MB", - "neon.max_file_cache_size=256MB", - "neon.file_cache_size_limit=245MB", - ], - ) conn = endpoint.connect() cur = conn.cursor() cur.execute("create extension neon") diff --git a/test_runner/regress/test_multixact.py b/test_runner/regress/test_multixact.py index 8a00f8835f..143bd38291 100644 --- a/test_runner/regress/test_multixact.py +++ b/test_runner/regress/test_multixact.py @@ -1,4 +1,5 @@ from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content +from fixtures.shared_fixtures import TTimeline from fixtures.utils import query_scalar @@ -12,9 +13,8 @@ from fixtures.utils import query_scalar # is enough to verify that the WAL records are handled correctly # in the pageserver. # -def test_multixact(neon_simple_env: NeonEnv, test_output_dir): - env = neon_simple_env - endpoint = env.endpoints.create_start("main") +def test_multixact(timeline: TTimeline, test_output_dir): + endpoint = timeline.primary cur = endpoint.connect().cursor() cur.execute( @@ -72,10 +72,8 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir): assert int(next_multixact_id) > int(next_multixact_id_old) # Branch at this point - env.neon_cli.create_branch( - "test_multixact_new", ancestor_branch_name="main", ancestor_start_lsn=lsn - ) - endpoint_new = env.endpoints.create_start("test_multixact_new") + branch = timeline.create_branch("test_multixact_new", lsn=lsn) + endpoint_new = branch.primary next_multixact_id_new = endpoint_new.safe_psql( "SELECT next_multixact_id FROM pg_control_checkpoint()" @@ -85,4 +83,4 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir): assert next_multixact_id_new == next_multixact_id # Check that we can restore the content of the datadir correctly - check_restored_datadir_content(test_output_dir, env, endpoint) + timeline.tenant.check_restored_datadir_content(test_output_dir, endpoint) diff --git a/test_runner/regress/test_vm_bits.py b/test_runner/regress/test_vm_bits.py index 90c20d808a..c92a2a0d7e 100644 --- a/test_runner/regress/test_vm_bits.py +++ b/test_runner/regress/test_vm_bits.py @@ -2,7 +2,7 @@ import time from contextlib import closing from fixtures.log_helper import log -from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, fork_at_current_lsn +from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder from fixtures.utils import query_scalar