mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 12:30:38 +00:00
Some work on reducing test setup overhead
With the introduction of test-local tenants and timelines, we can still allow tests to share a pageserver and/or safekeeper, removing the overhead of setting up those components. Future work can add wrappers for Pageserver, Safekeepers, and other APIs that expose tenant-level configuration options that shouldn't impact concurrent or sequential tests that reuse the same PS/SK resources. Important note: This doesn't yet work consistently for all updated tests. [skip ci]
This commit is contained in:
0
test_runner/fixtures/function/__init__.py
Normal file
0
test_runner/fixtures/function/__init__.py
Normal file
48
test_runner/fixtures/function/neon_storage.py
Normal file
48
test_runner/fixtures/function/neon_storage.py
Normal file
@@ -0,0 +1,48 @@
|
||||
from typing import Iterator, Optional, Dict, Any
|
||||
|
||||
import pytest
|
||||
from _pytest.fixtures import FixtureRequest
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
from fixtures.shared_fixtures import TTenant, TTimeline
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def tenant_config() -> Dict[str, Any]:
|
||||
return dict()
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def tenant(
|
||||
neon_shared_env: NeonEnv,
|
||||
request: FixtureRequest,
|
||||
tenant_config: Optional[Dict[str, Any]] = None,
|
||||
) -> Iterator[TTenant]:
|
||||
tenant = TTenant(env=neon_shared_env, name=request.node.name, config=tenant_config)
|
||||
yield tenant
|
||||
tenant.shutdown_resources()
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def timeline(
|
||||
tenant: TTenant,
|
||||
) -> Iterator[TTimeline]:
|
||||
timeline = tenant.default_timeline
|
||||
yield timeline
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def exclusive_tenant(
|
||||
neon_env: NeonEnv,
|
||||
request: FixtureRequest,
|
||||
tenant_config: Optional[Dict[str, Any]] = None,
|
||||
) -> Iterator[TTenant]:
|
||||
tenant = TTenant(env=neon_env, name=request.node.name, config=tenant_config)
|
||||
yield tenant
|
||||
tenant.shutdown_resources()
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def exclusive_timeline(
|
||||
exclusive_tenant: TTenant,
|
||||
) -> Iterator[TTimeline]:
|
||||
timeline = exclusive_tenant.default_timeline
|
||||
yield timeline
|
||||
@@ -93,7 +93,6 @@ from fixtures.utils import (
|
||||
allure_add_grafana_links,
|
||||
allure_attach_from_dir,
|
||||
assert_no_errors,
|
||||
get_self_dir,
|
||||
print_gc_result,
|
||||
subprocess_capture,
|
||||
wait_until,
|
||||
@@ -126,122 +125,6 @@ Env = Dict[str, str]
|
||||
DEFAULT_OUTPUT_DIR: str = "test_output"
|
||||
DEFAULT_BRANCH_NAME: str = "main"
|
||||
|
||||
BASE_PORT: int = 15000
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def base_dir() -> Iterator[Path]:
|
||||
# find the base directory (currently this is the git root)
|
||||
base_dir = get_self_dir().parent.parent
|
||||
log.info(f"base_dir is {base_dir}")
|
||||
|
||||
yield base_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def neon_binpath(base_dir: Path, build_type: str) -> Iterator[Path]:
|
||||
if os.getenv("REMOTE_ENV"):
|
||||
# we are in remote env and do not have neon binaries locally
|
||||
# this is the case for benchmarks run on self-hosted runner
|
||||
return
|
||||
|
||||
# Find the neon binaries.
|
||||
if env_neon_bin := os.environ.get("NEON_BIN"):
|
||||
binpath = Path(env_neon_bin)
|
||||
else:
|
||||
binpath = base_dir / "target" / build_type
|
||||
log.info(f"neon_binpath is {binpath}")
|
||||
|
||||
if not (binpath / "pageserver").exists():
|
||||
raise Exception(f"neon binaries not found at '{binpath}'")
|
||||
|
||||
yield binpath
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def pg_distrib_dir(base_dir: Path) -> Iterator[Path]:
|
||||
if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"):
|
||||
distrib_dir = Path(env_postgres_bin).resolve()
|
||||
else:
|
||||
distrib_dir = base_dir / "pg_install"
|
||||
|
||||
log.info(f"pg_distrib_dir is {distrib_dir}")
|
||||
yield distrib_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def top_output_dir(base_dir: Path) -> Iterator[Path]:
|
||||
# Compute the top-level directory for all tests.
|
||||
if env_test_output := os.environ.get("TEST_OUTPUT"):
|
||||
output_dir = Path(env_test_output).resolve()
|
||||
else:
|
||||
output_dir = base_dir / DEFAULT_OUTPUT_DIR
|
||||
output_dir.mkdir(exist_ok=True)
|
||||
|
||||
log.info(f"top_output_dir is {output_dir}")
|
||||
yield output_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: PgVersion) -> Iterator[Path]:
|
||||
versioned_dir = pg_distrib_dir / pg_version.v_prefixed
|
||||
|
||||
psql_bin_path = versioned_dir / "bin/psql"
|
||||
postgres_bin_path = versioned_dir / "bin/postgres"
|
||||
|
||||
if os.getenv("REMOTE_ENV"):
|
||||
# When testing against a remote server, we only need the client binary.
|
||||
if not psql_bin_path.exists():
|
||||
raise Exception(f"psql not found at '{psql_bin_path}'")
|
||||
else:
|
||||
if not postgres_bin_path.exists():
|
||||
raise Exception(f"postgres not found at '{postgres_bin_path}'")
|
||||
|
||||
log.info(f"versioned_pg_distrib_dir is {versioned_dir}")
|
||||
yield versioned_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def neon_api_key() -> str:
|
||||
api_key = os.getenv("NEON_API_KEY")
|
||||
if not api_key:
|
||||
raise AssertionError("Set the NEON_API_KEY environment variable")
|
||||
|
||||
return api_key
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def neon_api_base_url() -> str:
|
||||
return os.getenv("NEON_API_BASE_URL", "https://console-stage.neon.build/api/v2")
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def neon_api(neon_api_key: str, neon_api_base_url: str) -> NeonAPI:
|
||||
return NeonAPI(neon_api_key, neon_api_base_url)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def worker_port_num():
|
||||
return (32768 - BASE_PORT) // int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1"))
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def worker_seq_no(worker_id: str) -> int:
|
||||
# worker_id is a pytest-xdist fixture
|
||||
# it can be master or gw<number>
|
||||
# parse it to always get a number
|
||||
if worker_id == "master":
|
||||
return 0
|
||||
assert worker_id.startswith("gw")
|
||||
return int(worker_id[2:])
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def worker_base_port(worker_seq_no: int, worker_port_num: int) -> int:
|
||||
# so we divide ports in ranges of ports
|
||||
# so workers have disjoint set of ports for services
|
||||
return BASE_PORT + worker_seq_no * worker_port_num
|
||||
|
||||
|
||||
def get_dir_size(path: str) -> int:
|
||||
"""Return size in bytes."""
|
||||
@@ -253,11 +136,6 @@ def get_dir_size(path: str) -> int:
|
||||
return totalbytes
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistributor:
|
||||
return PortDistributor(base_port=worker_base_port, port_number=worker_port_num)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def default_broker(
|
||||
port_distributor: PortDistributor,
|
||||
@@ -272,32 +150,6 @@ def default_broker(
|
||||
yield broker
|
||||
broker.stop()
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def shared_broker(
|
||||
port_distributor: PortDistributor,
|
||||
shared_test_output_dir: Path,
|
||||
neon_binpath: Path,
|
||||
) -> Iterator[NeonBroker]:
|
||||
# multiple pytest sessions could get launched in parallel, get them different ports/datadirs
|
||||
client_port = port_distributor.get_port()
|
||||
broker_logfile = shared_test_output_dir / "repo" / "storage_broker.log"
|
||||
|
||||
broker = NeonBroker(logfile=broker_logfile, port=client_port, neon_binpath=neon_binpath)
|
||||
yield broker
|
||||
broker.stop()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def run_id() -> Iterator[uuid.UUID]:
|
||||
yield uuid.uuid4()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def mock_s3_server(port_distributor: PortDistributor) -> Iterator[MockS3Server]:
|
||||
mock_s3_server = MockS3Server(port_distributor.get_port())
|
||||
yield mock_s3_server
|
||||
mock_s3_server.kill()
|
||||
|
||||
|
||||
class PgProtocol:
|
||||
"""Reusable connection logic"""
|
||||
@@ -555,9 +407,9 @@ class NeonEnvBuilder:
|
||||
|
||||
self.pageserver_io_buffer_alignment = pageserver_io_buffer_alignment
|
||||
|
||||
assert test_name.startswith(
|
||||
"test_"
|
||||
) or shared, "Unexpectedly instantiated from outside a test function"
|
||||
assert (
|
||||
test_name.startswith("test_") or shared
|
||||
), "Unexpectedly instantiated from outside a test function"
|
||||
self.test_name = test_name
|
||||
|
||||
def init_configs(self, default_remote_storage_if_missing: bool = True) -> NeonEnv:
|
||||
@@ -1418,9 +1270,6 @@ class NeonEnv:
|
||||
return "ep-" + str(self.endpoint_counter)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def neon_simple_env(
|
||||
request: FixtureRequest,
|
||||
@@ -1470,59 +1319,6 @@ def neon_simple_env(
|
||||
|
||||
yield env
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def neon_shared_env(
|
||||
pytestconfig: Config,
|
||||
port_distributor: PortDistributor,
|
||||
mock_s3_server: MockS3Server,
|
||||
shared_broker: NeonBroker,
|
||||
run_id: uuid.UUID,
|
||||
top_output_dir: Path,
|
||||
shared_test_output_dir: Path,
|
||||
neon_binpath: Path,
|
||||
build_type: str,
|
||||
pg_distrib_dir: Path,
|
||||
pg_version: PgVersion,
|
||||
pageserver_virtual_file_io_engine: str,
|
||||
pageserver_aux_file_policy: Optional[AuxFileStore],
|
||||
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]],
|
||||
pageserver_io_buffer_alignment: Optional[int],
|
||||
request: FixtureRequest,
|
||||
) -> Iterator[NeonEnv]:
|
||||
"""
|
||||
Simple Neon environment, with no authentication and no safekeepers.
|
||||
|
||||
This fixture will use RemoteStorageKind.LOCAL_FS with pageserver.
|
||||
"""
|
||||
|
||||
prefix = f"shared[{build_type}-{pg_version.v_prefixed}]-"
|
||||
|
||||
# Create the environment in the per-test output directory
|
||||
repo_dir = get_test_repo_dir(request, top_output_dir, prefix)
|
||||
|
||||
with NeonEnvBuilder(
|
||||
top_output_dir=top_output_dir,
|
||||
repo_dir=repo_dir,
|
||||
port_distributor=port_distributor,
|
||||
broker=shared_broker,
|
||||
mock_s3_server=mock_s3_server,
|
||||
neon_binpath=neon_binpath,
|
||||
pg_distrib_dir=pg_distrib_dir,
|
||||
pg_version=pg_version,
|
||||
run_id=run_id,
|
||||
preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
|
||||
test_name=f"{prefix}{request.node.name}",
|
||||
test_output_dir=shared_test_output_dir,
|
||||
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
|
||||
pageserver_aux_file_policy=pageserver_aux_file_policy,
|
||||
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
|
||||
pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
|
||||
shared=True,
|
||||
) as builder:
|
||||
env = builder.init_start()
|
||||
|
||||
yield env
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def neon_endpoint(request: FixtureRequest, neon_shared_env: NeonEnv) -> Endpoint:
|
||||
@@ -1538,6 +1334,7 @@ def neon_endpoint(request: FixtureRequest, neon_shared_env: NeonEnv) -> Endpoint
|
||||
except BaseException:
|
||||
pass
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def neon_env_builder(
|
||||
pytestconfig: Config,
|
||||
@@ -4893,14 +4690,18 @@ def _get_test_dir(request: FixtureRequest, top_output_dir: Path, prefix: str) ->
|
||||
return test_dir
|
||||
|
||||
|
||||
def get_test_output_dir(request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None) -> Path:
|
||||
def get_test_output_dir(
|
||||
request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None
|
||||
) -> Path:
|
||||
"""
|
||||
The working directory for a test.
|
||||
"""
|
||||
return _get_test_dir(request, top_output_dir, prefix or "")
|
||||
|
||||
|
||||
def get_test_overlay_dir(request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None) -> Path:
|
||||
def get_test_overlay_dir(
|
||||
request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None
|
||||
) -> Path:
|
||||
"""
|
||||
Directory that contains `upperdir` and `workdir` for overlayfs mounts
|
||||
that a test creates. See `NeonEnvBuilder.overlay_mount`.
|
||||
@@ -4908,12 +4709,16 @@ def get_test_overlay_dir(request: FixtureRequest, top_output_dir: Path, prefix:
|
||||
return _get_test_dir(request, top_output_dir, f"overlay-{prefix or ''}")
|
||||
|
||||
|
||||
def get_shared_snapshot_dir_path(top_output_dir: Path, snapshot_name: str, prefix: Optional[str] = None) -> Path:
|
||||
def get_shared_snapshot_dir_path(
|
||||
top_output_dir: Path, snapshot_name: str, prefix: Optional[str] = None
|
||||
) -> Path:
|
||||
return top_output_dir / f"{prefix or ''}shared-snapshots" / snapshot_name
|
||||
|
||||
|
||||
def get_test_repo_dir(request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None) -> Path:
|
||||
return get_test_output_dir(request, top_output_dir, prefix or '') / "repo"
|
||||
def get_test_repo_dir(
|
||||
request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None
|
||||
) -> Path:
|
||||
return get_test_output_dir(request, top_output_dir, prefix or "") / "repo"
|
||||
|
||||
|
||||
def pytest_addoption(parser: Parser):
|
||||
@@ -4966,49 +4771,6 @@ def test_output_dir(
|
||||
|
||||
allure_attach_from_dir(test_dir, preserve_database_files)
|
||||
|
||||
# This is autouse, so the test output directory always gets created, even
|
||||
# if a test doesn't put anything there.
|
||||
#
|
||||
# NB: we request the overlay dir fixture so the fixture does its cleanups
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def shared_test_output_dir(
|
||||
request: FixtureRequest,
|
||||
pg_version: PgVersion,
|
||||
build_type: str,
|
||||
top_output_dir: Path,
|
||||
shared_test_overlay_dir: Path
|
||||
) -> Iterator[Path]:
|
||||
"""Create the working directory for shared tests."""
|
||||
|
||||
prefix = f"shared[{build_type}-{pg_version.v_prefixed}]-"
|
||||
|
||||
# one directory per test
|
||||
test_dir = get_test_output_dir(request, top_output_dir, prefix)
|
||||
|
||||
log.info(f"test_output_dir is {test_dir}")
|
||||
shutil.rmtree(test_dir, ignore_errors=True)
|
||||
test_dir.mkdir()
|
||||
|
||||
yield test_dir
|
||||
|
||||
# Allure artifacts creation might involve the creation of `.tar.zst` archives,
|
||||
# which aren't going to be used if Allure results collection is not enabled
|
||||
# (i.e. --alluredir is not set).
|
||||
# Skip `allure_attach_from_dir` in this case
|
||||
if not request.config.getoption("--alluredir"):
|
||||
return
|
||||
|
||||
preserve_database_files = False
|
||||
for k, v in request.node.user_properties:
|
||||
# NB: the neon_env_builder fixture uses this fixture (test_output_dir).
|
||||
# So, neon_env_builder's cleanup runs before here.
|
||||
# The cleanup propagates NeonEnvBuilder.preserve_database_files into this user property.
|
||||
if k == "preserve_database_files":
|
||||
assert isinstance(v, bool)
|
||||
preserve_database_files = v
|
||||
|
||||
allure_attach_from_dir(test_dir, preserve_database_files)
|
||||
|
||||
|
||||
class FileAndThreadLock:
|
||||
def __init__(self, path: Path):
|
||||
@@ -5115,42 +4877,6 @@ def test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Optional[
|
||||
# and on unclean shutdown, this function will take care of it
|
||||
# on the next test run
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def shared_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Optional[Path]:
|
||||
"""
|
||||
Idempotently create a test's overlayfs mount state directory.
|
||||
If the functionality isn't enabled via env var, returns None.
|
||||
|
||||
The procedure cleans up after previous runs that were aborted (e.g. due to Ctrl-C, OOM kills, etc).
|
||||
"""
|
||||
|
||||
if os.getenv("NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS") is None:
|
||||
return None
|
||||
|
||||
overlay_dir = get_test_overlay_dir(request, top_output_dir)
|
||||
log.info(f"test_overlay_dir is {overlay_dir}")
|
||||
|
||||
overlay_dir.mkdir(exist_ok=True)
|
||||
# unmount stale overlayfs mounts which subdirectories of `overlay_dir/*` as the overlayfs `upperdir` and `workdir`
|
||||
for mountpoint in overlayfs.iter_mounts_beneath(get_test_output_dir(request, top_output_dir)):
|
||||
cmd = ["sudo", "umount", str(mountpoint)]
|
||||
log.info(
|
||||
f"Unmounting stale overlayfs mount probably created during earlier test run: {cmd}"
|
||||
)
|
||||
subprocess.run(cmd, capture_output=True, check=True)
|
||||
# the overlayfs `workdir`` is owned by `root`, shutil.rmtree won't work.
|
||||
cmd = ["sudo", "rm", "-rf", str(overlay_dir)]
|
||||
subprocess.run(cmd, capture_output=True, check=True)
|
||||
|
||||
overlay_dir.mkdir()
|
||||
|
||||
return overlay_dir
|
||||
|
||||
# no need to clean up anything: on clean shutdown,
|
||||
# NeonEnvBuilder.overlay_cleanup_teardown takes care of cleanup
|
||||
# and on unclean shutdown, this function will take care of it
|
||||
# on the next test run
|
||||
|
||||
|
||||
SKIP_DIRS = frozenset(
|
||||
(
|
||||
@@ -5357,7 +5083,7 @@ def tenant_get_shards(
|
||||
return [(TenantShardId(tenant_id, 0, 0), override_pageserver or env.pageserver)]
|
||||
|
||||
|
||||
def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint):
|
||||
def wait_replica_caughtup(primary: PgProtocol, secondary: PgProtocol):
|
||||
primary_lsn = Lsn(
|
||||
primary.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()", log_query=False)
|
||||
)
|
||||
|
||||
@@ -13,6 +13,7 @@ from fixtures.utils import AuxFileStore
|
||||
Dynamically parametrize tests by different parameters
|
||||
"""
|
||||
|
||||
|
||||
def get_pgversions():
|
||||
if (v := os.getenv("DEFAULT_PG_VERSION")) is None:
|
||||
pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]
|
||||
|
||||
0
test_runner/fixtures/session/__init__.py
Normal file
0
test_runner/fixtures/session/__init__.py
Normal file
303
test_runner/fixtures/session/neon_storage.py
Normal file
303
test_runner/fixtures/session/neon_storage.py
Normal file
@@ -0,0 +1,303 @@
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Iterator, Optional, cast
|
||||
|
||||
import pytest
|
||||
from _pytest.config import Config
|
||||
from _pytest.fixtures import FixtureRequest
|
||||
|
||||
from fixtures import overlayfs
|
||||
from fixtures.broker import NeonBroker
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_api import NeonAPI
|
||||
from fixtures.neon_fixtures import (
|
||||
DEFAULT_OUTPUT_DIR,
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
get_test_output_dir,
|
||||
get_test_overlay_dir,
|
||||
get_test_repo_dir,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
from fixtures.remote_storage import MockS3Server
|
||||
from fixtures.utils import AuxFileStore, allure_attach_from_dir, get_self_dir
|
||||
|
||||
BASE_PORT: int = 15000
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def base_dir() -> Iterator[Path]:
|
||||
# find the base directory (currently this is the git root)
|
||||
base_dir = get_self_dir().parent.parent
|
||||
log.info(f"base_dir is {base_dir}")
|
||||
|
||||
yield base_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def neon_binpath(base_dir: Path, build_type: str) -> Iterator[Path]:
|
||||
if os.getenv("REMOTE_ENV"):
|
||||
# we are in remote env and do not have neon binaries locally
|
||||
# this is the case for benchmarks run on self-hosted runner
|
||||
return
|
||||
|
||||
# Find the neon binaries.
|
||||
if env_neon_bin := os.environ.get("NEON_BIN"):
|
||||
binpath = Path(env_neon_bin)
|
||||
else:
|
||||
binpath = base_dir / "target" / build_type
|
||||
log.info(f"neon_binpath is {binpath}")
|
||||
|
||||
if not (binpath / "pageserver").exists():
|
||||
raise Exception(f"neon binaries not found at '{binpath}'")
|
||||
|
||||
yield binpath
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def pg_distrib_dir(base_dir: Path) -> Iterator[Path]:
|
||||
if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"):
|
||||
distrib_dir = Path(env_postgres_bin).resolve()
|
||||
else:
|
||||
distrib_dir = base_dir / "pg_install"
|
||||
|
||||
log.info(f"pg_distrib_dir is {distrib_dir}")
|
||||
yield distrib_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def top_output_dir(base_dir: Path) -> Iterator[Path]:
|
||||
# Compute the top-level directory for all tests.
|
||||
if env_test_output := os.environ.get("TEST_OUTPUT"):
|
||||
output_dir = Path(env_test_output).resolve()
|
||||
else:
|
||||
output_dir = base_dir / DEFAULT_OUTPUT_DIR
|
||||
output_dir.mkdir(exist_ok=True)
|
||||
|
||||
log.info(f"top_output_dir is {output_dir}")
|
||||
yield output_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: PgVersion) -> Iterator[Path]:
|
||||
versioned_dir = pg_distrib_dir / pg_version.v_prefixed
|
||||
|
||||
psql_bin_path = versioned_dir / "bin/psql"
|
||||
postgres_bin_path = versioned_dir / "bin/postgres"
|
||||
|
||||
if os.getenv("REMOTE_ENV"):
|
||||
# When testing against a remote server, we only need the client binary.
|
||||
if not psql_bin_path.exists():
|
||||
raise Exception(f"psql not found at '{psql_bin_path}'")
|
||||
else:
|
||||
if not postgres_bin_path.exists():
|
||||
raise Exception(f"postgres not found at '{postgres_bin_path}'")
|
||||
|
||||
log.info(f"versioned_pg_distrib_dir is {versioned_dir}")
|
||||
yield versioned_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def neon_api_key() -> str:
|
||||
api_key = os.getenv("NEON_API_KEY")
|
||||
if not api_key:
|
||||
raise AssertionError("Set the NEON_API_KEY environment variable")
|
||||
|
||||
return api_key
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def neon_api_base_url() -> str:
|
||||
return os.getenv("NEON_API_BASE_URL", "https://console-stage.neon.build/api/v2")
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def neon_api(neon_api_key: str, neon_api_base_url: str) -> NeonAPI:
|
||||
return NeonAPI(neon_api_key, neon_api_base_url)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def worker_port_num():
|
||||
return (32768 - BASE_PORT) // int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1"))
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def worker_seq_no(worker_id: str) -> int:
|
||||
# worker_id is a pytest-xdist fixture
|
||||
# it can be master or gw<number>
|
||||
# parse it to always get a number
|
||||
if worker_id == "master":
|
||||
return 0
|
||||
assert worker_id.startswith("gw")
|
||||
return int(worker_id[2:])
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def worker_base_port(worker_seq_no: int, worker_port_num: int) -> int:
|
||||
# so we divide ports in ranges of ports
|
||||
# so workers have disjoint set of ports for services
|
||||
return BASE_PORT + worker_seq_no * worker_port_num
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistributor:
|
||||
return PortDistributor(base_port=worker_base_port, port_number=worker_port_num)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def shared_broker(
|
||||
port_distributor: PortDistributor,
|
||||
shared_test_output_dir: Path,
|
||||
neon_binpath: Path,
|
||||
) -> Iterator[NeonBroker]:
|
||||
# multiple pytest sessions could get launched in parallel, get them different ports/datadirs
|
||||
client_port = port_distributor.get_port()
|
||||
broker_logfile = shared_test_output_dir / "repo" / "storage_broker.log"
|
||||
|
||||
broker = NeonBroker(logfile=broker_logfile, port=client_port, neon_binpath=neon_binpath)
|
||||
yield broker
|
||||
broker.stop()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def run_id() -> Iterator[uuid.UUID]:
|
||||
yield uuid.uuid4()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def neon_shared_env(
|
||||
pytestconfig: Config,
|
||||
port_distributor: PortDistributor,
|
||||
mock_s3_server: MockS3Server,
|
||||
shared_broker: NeonBroker,
|
||||
run_id: uuid.UUID,
|
||||
top_output_dir: Path,
|
||||
shared_test_output_dir: Path,
|
||||
neon_binpath: Path,
|
||||
build_type: str,
|
||||
pg_distrib_dir: Path,
|
||||
pg_version: PgVersion,
|
||||
pageserver_virtual_file_io_engine: str,
|
||||
pageserver_aux_file_policy: Optional[AuxFileStore],
|
||||
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]],
|
||||
pageserver_io_buffer_alignment: Optional[int],
|
||||
request: FixtureRequest,
|
||||
) -> Iterator[NeonEnv]:
|
||||
"""
|
||||
Simple Neon environment, with no authentication and no safekeepers.
|
||||
|
||||
This fixture will use RemoteStorageKind.LOCAL_FS with pageserver.
|
||||
"""
|
||||
|
||||
prefix = f"shared[{build_type}-{pg_version.v_prefixed}]-"
|
||||
|
||||
# Create the environment in the per-test output directory
|
||||
repo_dir = get_test_repo_dir(request, top_output_dir, prefix)
|
||||
|
||||
with NeonEnvBuilder(
|
||||
top_output_dir=top_output_dir,
|
||||
repo_dir=repo_dir,
|
||||
port_distributor=port_distributor,
|
||||
broker=shared_broker,
|
||||
mock_s3_server=mock_s3_server,
|
||||
neon_binpath=neon_binpath,
|
||||
pg_distrib_dir=pg_distrib_dir,
|
||||
pg_version=pg_version,
|
||||
run_id=run_id,
|
||||
preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
|
||||
test_name=f"{prefix}{request.node.name}",
|
||||
test_output_dir=shared_test_output_dir,
|
||||
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
|
||||
pageserver_aux_file_policy=pageserver_aux_file_policy,
|
||||
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
|
||||
pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
|
||||
shared=True,
|
||||
) as builder:
|
||||
env = builder.init_start()
|
||||
|
||||
yield env
|
||||
|
||||
|
||||
# This is autouse, so the test output directory always gets created, even
|
||||
# if a test doesn't put anything there.
|
||||
#
|
||||
# NB: we request the overlay dir fixture so the fixture does its cleanups
|
||||
@pytest.fixture(scope="session")
|
||||
def shared_test_output_dir(
|
||||
request: FixtureRequest,
|
||||
pg_version: PgVersion,
|
||||
build_type: str,
|
||||
top_output_dir: Path,
|
||||
shared_test_overlay_dir: Path,
|
||||
) -> Iterator[Path]:
|
||||
"""Create the working directory for shared tests."""
|
||||
|
||||
prefix = f"shared[{build_type}-{pg_version.v_prefixed}]-"
|
||||
|
||||
# one directory per test
|
||||
test_dir = get_test_output_dir(request, top_output_dir, prefix)
|
||||
|
||||
log.info(f"test_output_dir is {test_dir}")
|
||||
shutil.rmtree(test_dir, ignore_errors=True)
|
||||
test_dir.mkdir()
|
||||
|
||||
yield test_dir
|
||||
|
||||
# Allure artifacts creation might involve the creation of `.tar.zst` archives,
|
||||
# which aren't going to be used if Allure results collection is not enabled
|
||||
# (i.e. --alluredir is not set).
|
||||
# Skip `allure_attach_from_dir` in this case
|
||||
if not request.config.getoption("--alluredir"):
|
||||
return
|
||||
|
||||
preserve_database_files = False
|
||||
for k, v in request.node.user_properties:
|
||||
# NB: the neon_env_builder fixture uses this fixture (test_output_dir).
|
||||
# So, neon_env_builder's cleanup runs before here.
|
||||
# The cleanup propagates NeonEnvBuilder.preserve_database_files into this user property.
|
||||
if k == "preserve_database_files":
|
||||
assert isinstance(v, bool)
|
||||
preserve_database_files = v
|
||||
|
||||
allure_attach_from_dir(test_dir, preserve_database_files)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def shared_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Optional[Path]:
|
||||
"""
|
||||
Idempotently create a test's overlayfs mount state directory.
|
||||
If the functionality isn't enabled via env var, returns None.
|
||||
|
||||
The procedure cleans up after previous runs that were aborted (e.g. due to Ctrl-C, OOM kills, etc).
|
||||
"""
|
||||
|
||||
if os.getenv("NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS") is None:
|
||||
return None
|
||||
|
||||
overlay_dir = get_test_overlay_dir(request, top_output_dir)
|
||||
log.info(f"test_overlay_dir is {overlay_dir}")
|
||||
|
||||
overlay_dir.mkdir(exist_ok=True)
|
||||
# unmount stale overlayfs mounts which subdirectories of `overlay_dir/*` as the overlayfs `upperdir` and `workdir`
|
||||
for mountpoint in overlayfs.iter_mounts_beneath(get_test_output_dir(request, top_output_dir)):
|
||||
cmd = ["sudo", "umount", str(mountpoint)]
|
||||
log.info(
|
||||
f"Unmounting stale overlayfs mount probably created during earlier test run: {cmd}"
|
||||
)
|
||||
subprocess.run(cmd, capture_output=True, check=True)
|
||||
# the overlayfs `workdir`` is owned by `root`, shutil.rmtree won't work.
|
||||
cmd = ["sudo", "rm", "-rf", str(overlay_dir)]
|
||||
subprocess.run(cmd, capture_output=True, check=True)
|
||||
|
||||
overlay_dir.mkdir()
|
||||
|
||||
return overlay_dir
|
||||
|
||||
# no need to clean up anything: on clean shutdown,
|
||||
# NeonEnvBuilder.overlay_cleanup_teardown takes care of cleanup
|
||||
# and on unclean shutdown, this function will take care of it
|
||||
# on the next test run
|
||||
13
test_runner/fixtures/session/s3.py
Normal file
13
test_runner/fixtures/session/s3.py
Normal file
@@ -0,0 +1,13 @@
|
||||
from typing import Iterator
|
||||
|
||||
import pytest
|
||||
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
from fixtures.remote_storage import MockS3Server
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def mock_s3_server(port_distributor: PortDistributor) -> Iterator[MockS3Server]:
|
||||
mock_s3_server = MockS3Server(port_distributor.get_port())
|
||||
yield mock_s3_server
|
||||
mock_s3_server.kill()
|
||||
@@ -1,12 +1,16 @@
|
||||
from dataclasses import dataclass
|
||||
from functools import partial
|
||||
from pathlib import Path
|
||||
from typing import Optional, List, Dict, Any, Iterator
|
||||
from typing import Any, Optional, cast, List, Dict
|
||||
|
||||
import pytest
|
||||
|
||||
from fixtures.common_types import TimelineId, Lsn
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, PgProtocol, Safekeeper
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.common_types import Lsn, TenantId, TimelineId
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, PgProtocol, DEFAULT_BRANCH_NAME, tenant_get_shards, \
|
||||
check_restored_datadir_content
|
||||
from fixtures.pageserver.utils import wait_for_last_record_lsn
|
||||
from fixtures.safekeeper.utils import are_walreceivers_absent
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
"""
|
||||
In this file most important resources are exposed as function-level fixtures
|
||||
@@ -14,7 +18,7 @@ that depend on session-level resources like pageservers and safekeepers.
|
||||
|
||||
The main rationale here is that we don't need to initialize a new SK/PS/etc
|
||||
every time we want to test something that has branching: we can just as well
|
||||
reuse still-available PageServers from other tests of the same kind.
|
||||
reuse still-available PageServers from other tests of the same kind.
|
||||
"""
|
||||
|
||||
|
||||
@@ -23,120 +27,322 @@ def shared_storage_repo_path(build_type: str, base_dir: Path) -> Path:
|
||||
return base_dir / f"shared[{build_type}]" / "repo"
|
||||
|
||||
|
||||
class TEndpoint(PgProtocol):
|
||||
def clear_shared_buffers(self, cursor: Optional[Any] = None):
|
||||
"""
|
||||
Best-effort way to clear postgres buffers. Pinned buffers will not be 'cleared.'
|
||||
|
||||
class TestEndpoint(PgProtocol):
|
||||
def __init__(self, compute: Endpoint, **kwargs: Any):
|
||||
super().__init__(**kwargs)
|
||||
Might also clear LFC.
|
||||
"""
|
||||
if cursor is not None:
|
||||
cursor.execute("select clear_buffer_cache()")
|
||||
else:
|
||||
self.safe_psql("select clear_buffer_cache()")
|
||||
|
||||
_TEndpoint = Endpoint
|
||||
|
||||
|
||||
class TestTimeline:
|
||||
def __init__(self, name: str, tenant: 'TestTenant'):
|
||||
self.primary = None
|
||||
class TTimeline:
|
||||
__primary: Optional[_TEndpoint]
|
||||
__secondary: Optional[_TEndpoint]
|
||||
|
||||
def start_primary(self) -> Optional[TestEndpoint]:
|
||||
if not self.primary:
|
||||
return None
|
||||
self.primary.start()
|
||||
def __init__(self, timeline_id: TimelineId, tenant: "TTenant", name: str):
|
||||
self.id = timeline_id
|
||||
self.tenant = tenant
|
||||
self.name = name
|
||||
self.__primary = None
|
||||
self.__secondary = None
|
||||
|
||||
return self.primary
|
||||
@property
|
||||
def primary(self) -> TEndpoint:
|
||||
if self.__primary is None:
|
||||
self.__primary = cast(_TEndpoint, self.tenant.create_endpoint(
|
||||
name=self._get_full_endpoint_name("primary"),
|
||||
timeline_id=self.id,
|
||||
primary=True,
|
||||
running=True,
|
||||
))
|
||||
|
||||
return cast(TEndpoint, self.__primary)
|
||||
|
||||
def primary_with_config(self, config_lines: List[str], reboot: bool = False):
|
||||
if self.__primary is None:
|
||||
self.__primary = cast(_TEndpoint, self.tenant.create_endpoint(
|
||||
name=self._get_full_endpoint_name("primary"),
|
||||
timeline_id=self.id,
|
||||
primary=True,
|
||||
running=True,
|
||||
config_lines=config_lines,
|
||||
))
|
||||
else:
|
||||
self.__primary.config(config_lines)
|
||||
if reboot:
|
||||
if self.__primary.is_running():
|
||||
self.__primary.stop()
|
||||
self.__primary.start()
|
||||
|
||||
return cast(TTimeline, self.__primary)
|
||||
|
||||
@property
|
||||
def secondary(self) -> TEndpoint:
|
||||
if self.__secondary is None:
|
||||
self.__secondary = cast(_TEndpoint, self.tenant.create_endpoint(
|
||||
name=self._get_full_endpoint_name("secondary"),
|
||||
timeline_id=self.id,
|
||||
primary=False,
|
||||
running=True,
|
||||
))
|
||||
|
||||
return cast(TEndpoint, self.__secondary)
|
||||
|
||||
def secondary_with_config(self, config_lines: List[str], reboot: bool = False) -> TEndpoint:
|
||||
if self.__secondary is None:
|
||||
self.__secondary = cast(_TEndpoint, self.tenant.create_endpoint(
|
||||
name=self._get_full_endpoint_name("secondary"),
|
||||
timeline_id=self.id,
|
||||
primary=False,
|
||||
running=True,
|
||||
config_lines=config_lines,
|
||||
))
|
||||
else:
|
||||
self.__secondary.config(config_lines)
|
||||
if reboot:
|
||||
if self.__secondary.is_running():
|
||||
self.__secondary.stop()
|
||||
self.__secondary.start()
|
||||
|
||||
return cast(TEndpoint, self.__secondary)
|
||||
|
||||
def _get_full_endpoint_name(self, name) -> str:
|
||||
return f"{self.name}.{name}"
|
||||
|
||||
def create_branch(self, name: str, lsn: Optional[Lsn]) -> "TTimeline":
|
||||
return self.tenant.create_timeline(
|
||||
new_name=name,
|
||||
parent_name=self.name,
|
||||
branch_at=lsn,
|
||||
)
|
||||
|
||||
def stop_and_flush(self, endpoint: TEndpoint):
|
||||
self.tenant.stop_endpoint(endpoint)
|
||||
self.tenant.flush_timeline_data(self)
|
||||
|
||||
def checkpoint(self, **kwargs):
|
||||
self.tenant.checkpoint_timeline(self, **kwargs)
|
||||
|
||||
|
||||
class TestPageserver:
|
||||
"""
|
||||
We only need one pageserver for every build_type.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
build_type: str,
|
||||
bin_path: Path,
|
||||
repo_path: Path,
|
||||
port: int
|
||||
):
|
||||
self.build_type = build_type
|
||||
self.bin_path = bin_path
|
||||
self.repo_path = repo_path
|
||||
self.port = port
|
||||
self.started = False
|
||||
|
||||
def _start(self):
|
||||
pass
|
||||
|
||||
def _stop(self):
|
||||
pass
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def shared_pageserver() -> Iterator[TestPageserver]:
|
||||
yield None
|
||||
|
||||
|
||||
class TestSafekeeper:
|
||||
"""
|
||||
We only need one safekeeper for every build_type.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
build_type: str,
|
||||
bin_path: Path,
|
||||
repo_path: Path,
|
||||
port: int
|
||||
):
|
||||
self.build_type = build_type
|
||||
self.bin_path = bin_path
|
||||
self.repo_path = repo_path
|
||||
self.port = port
|
||||
self.started = False
|
||||
|
||||
def _start(self):
|
||||
pass
|
||||
|
||||
def _stop(self):
|
||||
pass
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def shared_safekeeper(
|
||||
request,
|
||||
port: int,
|
||||
build_type: str,
|
||||
neon_binpath: Path,
|
||||
shared_storage_repo_path: Path
|
||||
) -> Iterator[TestSafekeeper]:
|
||||
sk = TestSafekeeper(
|
||||
build_type=build_type,
|
||||
bin_path=(neon_binpath / "safekeeper"),
|
||||
port=port,
|
||||
repo_path=shared_storage_repo_path,
|
||||
)
|
||||
sk.start()
|
||||
|
||||
yield sk
|
||||
sk.stop()
|
||||
|
||||
|
||||
class TestTenant:
|
||||
class TTenant:
|
||||
"""
|
||||
An object representing a single test case on a shared pageserver.
|
||||
All operations here are safe practically safe.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
pageserver: TestPageserver,
|
||||
safekeeper: TestSafekeeper,
|
||||
self,
|
||||
env: NeonEnv,
|
||||
name: str,
|
||||
config: Optional[Dict[str, Any]] = None,
|
||||
):
|
||||
self.pageserver = pageserver
|
||||
self.id = TenantId.generate()
|
||||
self.timelines = []
|
||||
self.timeline_names = {}
|
||||
self.timeline_ids = {}
|
||||
self.name = name
|
||||
|
||||
# publicly inaccessible stuff, used during shutdown
|
||||
self.__endpoints: List[Endpoint] = []
|
||||
self.__env: NeonEnv = env
|
||||
|
||||
env.neon_cli.create_tenant(
|
||||
tenant_id=self.id,
|
||||
set_default=False,
|
||||
conf=config
|
||||
)
|
||||
|
||||
self.first_timeline_id = env.neon_cli.create_branch(
|
||||
new_branch_name=f"{self.name}:{DEFAULT_BRANCH_NAME}",
|
||||
ancestor_branch_name=DEFAULT_BRANCH_NAME,
|
||||
tenant_id=self.id,
|
||||
)
|
||||
|
||||
self._add_timeline(
|
||||
TTimeline(
|
||||
timeline_id=self.first_timeline_id,
|
||||
tenant=self,
|
||||
name=DEFAULT_BRANCH_NAME,
|
||||
)
|
||||
)
|
||||
|
||||
def _add_timeline(self, timeline: TTimeline):
|
||||
assert timeline.tenant == self
|
||||
assert timeline not in self.timelines
|
||||
assert timeline.id is not None
|
||||
|
||||
self.timelines.append(timeline)
|
||||
self.timeline_ids[timeline.id] = timeline
|
||||
|
||||
if timeline.name is not None:
|
||||
self.timeline_names[timeline.name] = timeline
|
||||
|
||||
def create_timeline(
|
||||
self,
|
||||
name: str,
|
||||
parent_name: Optional[str],
|
||||
branch_at: Optional[Lsn],
|
||||
) -> TestTimeline:
|
||||
pass
|
||||
self,
|
||||
new_name: str,
|
||||
parent_name: Optional[str] = None,
|
||||
parent_id: Optional[TimelineId] = None,
|
||||
branch_at: Optional[Lsn] = None,
|
||||
) -> TTimeline:
|
||||
if parent_name is not None:
|
||||
pass
|
||||
elif parent_id is not None:
|
||||
assert parent_name is None
|
||||
parent = self.timeline_ids[parent_id]
|
||||
parent_name = parent.name
|
||||
else:
|
||||
raise LookupError("Timeline creation requires parent by either ID or name")
|
||||
|
||||
def stop(self):
|
||||
for it in self.active_computes:
|
||||
it.stop()
|
||||
assert parent_name is not None
|
||||
|
||||
new_id = self.__env.neon_cli.create_branch(
|
||||
new_branch_name=f"{self.name}:{new_name}",
|
||||
ancestor_branch_name=f"{self.name}:{parent_name}",
|
||||
tenant_id=self.id,
|
||||
ancestor_start_lsn=branch_at,
|
||||
)
|
||||
|
||||
new_tl = TTimeline(
|
||||
timeline_id=new_id,
|
||||
tenant=self,
|
||||
name=new_name,
|
||||
)
|
||||
self._add_timeline(new_tl)
|
||||
|
||||
return new_tl
|
||||
|
||||
@property
|
||||
def default_timeline(self) -> TTimeline:
|
||||
return self.timeline_ids.get(self.first_timeline_id)
|
||||
|
||||
def start_endpoint(self, ep: TEndpoint):
|
||||
if ep not in self.__endpoints:
|
||||
return
|
||||
|
||||
ep = cast(Endpoint, ep)
|
||||
|
||||
if not ep.is_running():
|
||||
ep.start()
|
||||
|
||||
def stop_endpoint(self, ep: TEndpoint, mode: str = "fast"):
|
||||
if ep not in self.__endpoints:
|
||||
return
|
||||
|
||||
ep = cast(Endpoint, ep)
|
||||
|
||||
if ep.is_running():
|
||||
ep.stop(mode=mode)
|
||||
|
||||
def create_endpoint(
|
||||
self,
|
||||
name: str,
|
||||
timeline_id: TimelineId,
|
||||
primary: bool = True,
|
||||
running: bool = False,
|
||||
port: Optional[int] = None,
|
||||
http_port: Optional[int] = None,
|
||||
lsn: Optional[Lsn] = None,
|
||||
config_lines: Optional[List[str]] = None,
|
||||
) -> TEndpoint:
|
||||
endpoint: _TEndpoint
|
||||
|
||||
if port is None:
|
||||
port = self.__env.port_distributor.get_port()
|
||||
|
||||
if http_port is None:
|
||||
http_port = self.__env.port_distributor.get_port()
|
||||
|
||||
if running:
|
||||
endpoint = self.__env.endpoints.create_start(
|
||||
branch_name=self.timeline_ids[timeline_id].name,
|
||||
endpoint_id=f"{self.name}.{name}",
|
||||
tenant_id=self.id,
|
||||
lsn=lsn,
|
||||
hot_standby=not primary,
|
||||
config_lines=config_lines,
|
||||
)
|
||||
else:
|
||||
endpoint = self.__env.endpoints.create(
|
||||
branch_name=self.timeline_ids[timeline_id].name,
|
||||
endpoint_id=f"{self.name}.{name}",
|
||||
tenant_id=self.id,
|
||||
lsn=lsn,
|
||||
hot_standby=not primary,
|
||||
config_lines=config_lines,
|
||||
)
|
||||
|
||||
self.__endpoints.append(endpoint)
|
||||
|
||||
return endpoint
|
||||
|
||||
def shutdown_resources(self):
|
||||
for ep in self.__endpoints:
|
||||
try:
|
||||
ep.stop_and_destroy("fast")
|
||||
except BaseException as e:
|
||||
log.error("Error encountered while shutting down endpoint %s", ep.endpoint_id, exc_info=e)
|
||||
|
||||
def reconfigure(self, endpoint: TEndpoint, lines: List[str], restart: bool):
|
||||
if endpoint not in self.__endpoints:
|
||||
return
|
||||
|
||||
endpoint = cast(_TEndpoint, endpoint)
|
||||
was_running = endpoint.is_running()
|
||||
if restart and was_running:
|
||||
endpoint.stop()
|
||||
|
||||
endpoint.config(lines)
|
||||
|
||||
if restart:
|
||||
endpoint.start()
|
||||
|
||||
def flush_timeline_data(self, timeline: TTimeline) -> Lsn:
|
||||
commit_lsn: Lsn = Lsn(0)
|
||||
# In principle in the absense of failures polling single sk would be enough.
|
||||
for sk in self.__env.safekeepers:
|
||||
cli = sk.http_client()
|
||||
# wait until compute connections are gone
|
||||
wait_until(30, 0.5, partial(are_walreceivers_absent, cli, self.id, timeline.id))
|
||||
commit_lsn = max(cli.get_commit_lsn(self.id, timeline.id), commit_lsn)
|
||||
|
||||
# Note: depending on WAL filtering implementation, probably most shards
|
||||
# won't be able to reach commit_lsn (unless gaps are also ack'ed), so this
|
||||
# is broken in sharded case.
|
||||
shards = tenant_get_shards(env=self.__env, tenant_id=self.id)
|
||||
for tenant_shard_id, pageserver in shards:
|
||||
log.info(
|
||||
f"flush_ep_to_pageserver: waiting for {commit_lsn} on shard {tenant_shard_id} on pageserver {pageserver.id})"
|
||||
)
|
||||
waited = wait_for_last_record_lsn(
|
||||
pageserver.http_client(), tenant_shard_id, timeline.id, commit_lsn
|
||||
)
|
||||
|
||||
assert waited >= commit_lsn
|
||||
|
||||
return commit_lsn
|
||||
|
||||
def checkpoint_timeline(self, timeline: TTimeline, **kwargs):
|
||||
self.__env.pageserver.http_client().timeline_checkpoint(
|
||||
tenant_id=self.id,
|
||||
timeline_id=timeline.id,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def pgdatadir(self, endpoint: TEndpoint):
|
||||
if endpoint not in self.__endpoints:
|
||||
return None
|
||||
|
||||
return cast(Endpoint, endpoint).pgdata_dir
|
||||
|
||||
def check_restored_datadir_content(self, path, endpoint: TEndpoint, *args, **kwargs):
|
||||
if endpoint not in self.__endpoints:
|
||||
return
|
||||
|
||||
check_restored_datadir_content(path, self.__env, cast(Endpoint, endpoint), *args, **kwargs)
|
||||
|
||||
Reference in New Issue
Block a user