Compare commits

...

4 Commits

Author SHA1 Message Date
Matthias van de Meent
209fd611ac Some work on reducing test setup overhead
With the introduction of test-local tenants and timelines, we can now
allow tests to share a pageserver and/or safekeeper, removing the per-test
overhead of setting up those components.

Future work can add wrappers for Pageserver, Safekeepers, and other APIs
that expose tenant-level configuration options that shouldn't impact
concurrent or sequential tests that reuse the same PS/SK resources.

Important note: This doesn't yet work consistently for all updated tests.

[skip ci]
2024-09-19 09:59:24 +02:00
Matthias van de Meent
31192ee655 Intermediate state for process-local Endpoints when accessing shared state 2024-09-13 02:20:31 +01:00
Matthias van de Meent
8ecbe975cd Intermediate state for process-local Endpoints when accessing shared state 2024-09-12 23:34:20 +01:00
Matthias van de Meent
1e62eb9ba0 Great Success! with shared fixtures
Not everything is done yet, but a good amount of it is done.
2024-09-12 22:05:05 +01:00
23 changed files with 895 additions and 315 deletions

View File

@@ -10,4 +10,8 @@ pytest_plugins = (
"fixtures.compare_fixtures",
"fixtures.slow",
"fixtures.flaky",
"fixtures.shared_fixtures",
"fixtures.function.neon_storage",
"fixtures.session.neon_storage",
"fixtures.session.s3",
)

View File

@@ -0,0 +1,48 @@
from typing import Any, Dict, Iterator

import pytest
from _pytest.fixtures import FixtureRequest
from fixtures.neon_fixtures import NeonEnv
from fixtures.shared_fixtures import TTenant, TTimeline


@pytest.fixture(scope="function")
def tenant_config() -> Dict[str, Any]:
    return dict()


@pytest.fixture(scope="function")
def tenant(
    neon_shared_env: NeonEnv,
    request: FixtureRequest,
    tenant_config: Dict[str, Any],
) -> Iterator[TTenant]:
    tenant = TTenant(env=neon_shared_env, name=request.node.name, config=tenant_config)
    yield tenant
    tenant.shutdown_resources()


@pytest.fixture(scope="function")
def timeline(tenant: TTenant) -> Iterator[TTimeline]:
    yield tenant.default_timeline


@pytest.fixture(scope="function")
def exclusive_tenant(
    neon_env: NeonEnv,
    request: FixtureRequest,
    tenant_config: Dict[str, Any],
) -> Iterator[TTenant]:
    tenant = TTenant(env=neon_env, name=request.node.name, config=tenant_config)
    yield tenant
    tenant.shutdown_resources()


@pytest.fixture(scope="function")
def exclusive_timeline(exclusive_tenant: TTenant) -> Iterator[TTimeline]:
    yield exclusive_tenant.default_timeline
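
For illustration, a minimal usage sketch (hypothetical test, not part of this change); `safe_psql` and `safe_psql_scalar` are the existing PgProtocol helpers:

def test_shared_env_sketch(timeline: TTimeline):
    # `timeline` lives in a test-local tenant on the shared pageserver,
    # so this test pays no pageserver/safekeeper startup cost.
    endpoint = timeline.primary
    endpoint.safe_psql("CREATE TABLE t (x int)")
    endpoint.safe_psql("INSERT INTO t SELECT generate_series(1, 10)")
    assert endpoint.safe_psql_scalar("SELECT count(*) FROM t") == 10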

View File

@@ -93,7 +93,6 @@ from fixtures.utils import (
allure_add_grafana_links,
allure_attach_from_dir,
assert_no_errors,
get_self_dir,
print_gc_result,
subprocess_capture,
wait_until,
@@ -126,122 +125,6 @@ Env = Dict[str, str]
DEFAULT_OUTPUT_DIR: str = "test_output"
DEFAULT_BRANCH_NAME: str = "main"
BASE_PORT: int = 15000
@pytest.fixture(scope="session")
def base_dir() -> Iterator[Path]:
# find the base directory (currently this is the git root)
base_dir = get_self_dir().parent.parent
log.info(f"base_dir is {base_dir}")
yield base_dir
@pytest.fixture(scope="function")
def neon_binpath(base_dir: Path, build_type: str) -> Iterator[Path]:
if os.getenv("REMOTE_ENV"):
# we are in remote env and do not have neon binaries locally
# this is the case for benchmarks run on self-hosted runner
return
# Find the neon binaries.
if env_neon_bin := os.environ.get("NEON_BIN"):
binpath = Path(env_neon_bin)
else:
binpath = base_dir / "target" / build_type
log.info(f"neon_binpath is {binpath}")
if not (binpath / "pageserver").exists():
raise Exception(f"neon binaries not found at '{binpath}'")
yield binpath
@pytest.fixture(scope="function")
def pg_distrib_dir(base_dir: Path) -> Iterator[Path]:
if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"):
distrib_dir = Path(env_postgres_bin).resolve()
else:
distrib_dir = base_dir / "pg_install"
log.info(f"pg_distrib_dir is {distrib_dir}")
yield distrib_dir
@pytest.fixture(scope="session")
def top_output_dir(base_dir: Path) -> Iterator[Path]:
# Compute the top-level directory for all tests.
if env_test_output := os.environ.get("TEST_OUTPUT"):
output_dir = Path(env_test_output).resolve()
else:
output_dir = base_dir / DEFAULT_OUTPUT_DIR
output_dir.mkdir(exist_ok=True)
log.info(f"top_output_dir is {output_dir}")
yield output_dir
@pytest.fixture(scope="function")
def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: PgVersion) -> Iterator[Path]:
versioned_dir = pg_distrib_dir / pg_version.v_prefixed
psql_bin_path = versioned_dir / "bin/psql"
postgres_bin_path = versioned_dir / "bin/postgres"
if os.getenv("REMOTE_ENV"):
# When testing against a remote server, we only need the client binary.
if not psql_bin_path.exists():
raise Exception(f"psql not found at '{psql_bin_path}'")
else:
if not postgres_bin_path.exists():
raise Exception(f"postgres not found at '{postgres_bin_path}'")
log.info(f"versioned_pg_distrib_dir is {versioned_dir}")
yield versioned_dir
@pytest.fixture(scope="session")
def neon_api_key() -> str:
api_key = os.getenv("NEON_API_KEY")
if not api_key:
raise AssertionError("Set the NEON_API_KEY environment variable")
return api_key
@pytest.fixture(scope="session")
def neon_api_base_url() -> str:
return os.getenv("NEON_API_BASE_URL", "https://console-stage.neon.build/api/v2")
@pytest.fixture(scope="session")
def neon_api(neon_api_key: str, neon_api_base_url: str) -> NeonAPI:
return NeonAPI(neon_api_key, neon_api_base_url)
@pytest.fixture(scope="session")
def worker_port_num():
return (32768 - BASE_PORT) // int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1"))
@pytest.fixture(scope="session")
def worker_seq_no(worker_id: str) -> int:
# worker_id is a pytest-xdist fixture
# it can be master or gw<number>
# parse it to always get a number
if worker_id == "master":
return 0
assert worker_id.startswith("gw")
return int(worker_id[2:])
@pytest.fixture(scope="session")
def worker_base_port(worker_seq_no: int, worker_port_num: int) -> int:
# so we divide ports in ranges of ports
# so workers have disjoint set of ports for services
return BASE_PORT + worker_seq_no * worker_port_num
def get_dir_size(path: str) -> int:
"""Return size in bytes."""
@@ -253,11 +136,6 @@ def get_dir_size(path: str) -> int:
return totalbytes
@pytest.fixture(scope="session")
def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistributor:
return PortDistributor(base_port=worker_base_port, port_number=worker_port_num)
@pytest.fixture(scope="function")
def default_broker(
port_distributor: PortDistributor,
@@ -273,18 +151,6 @@ def default_broker(
broker.stop()
@pytest.fixture(scope="session")
def run_id() -> Iterator[uuid.UUID]:
yield uuid.uuid4()
@pytest.fixture(scope="session")
def mock_s3_server(port_distributor: PortDistributor) -> Iterator[MockS3Server]:
mock_s3_server = MockS3Server(port_distributor.get_port())
yield mock_s3_server
mock_s3_server.kill()
class PgProtocol:
"""Reusable connection logic"""
@@ -484,6 +350,7 @@ class NeonEnvBuilder:
safekeeper_extra_opts: Optional[list[str]] = None,
storage_controller_port_override: Optional[int] = None,
pageserver_io_buffer_alignment: Optional[int] = None,
shared: Optional[bool] = False,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -540,8 +407,8 @@ class NeonEnvBuilder:
self.pageserver_io_buffer_alignment = pageserver_io_buffer_alignment
assert test_name.startswith(
"test_"
assert (
test_name.startswith("test_") or shared
), "Unexpectedly instantiated from outside a test function"
self.test_name = test_name
@@ -1453,6 +1320,21 @@ def neon_simple_env(
yield env
@pytest.fixture(scope="function")
def neon_endpoint(request: FixtureRequest, neon_shared_env: NeonEnv) -> Iterator[Endpoint]:
neon_shared_env.neon_cli.create_branch(request.node.name)
ep = neon_shared_env.endpoints.create_start(request.node.name)
try:
yield ep
finally:
if ep.is_running():
try:
ep.stop()
except BaseException:
pass
@pytest.fixture(scope="function")
def neon_env_builder(
pytestconfig: Config,
@@ -4808,27 +4690,35 @@ def _get_test_dir(request: FixtureRequest, top_output_dir: Path, prefix: str) ->
return test_dir
def get_test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
def get_test_output_dir(
request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None
) -> Path:
"""
The working directory for a test.
"""
return _get_test_dir(request, top_output_dir, "")
return _get_test_dir(request, top_output_dir, prefix or "")
def get_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
def get_test_overlay_dir(
request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None
) -> Path:
"""
Directory that contains `upperdir` and `workdir` for overlayfs mounts
that a test creates. See `NeonEnvBuilder.overlay_mount`.
"""
return _get_test_dir(request, top_output_dir, "overlay-")
return _get_test_dir(request, top_output_dir, f"overlay-{prefix or ''}")
def get_shared_snapshot_dir_path(top_output_dir: Path, snapshot_name: str) -> Path:
return top_output_dir / "shared-snapshots" / snapshot_name
def get_shared_snapshot_dir_path(
top_output_dir: Path, snapshot_name: str, prefix: Optional[str] = None
) -> Path:
return top_output_dir / f"{prefix or ''}shared-snapshots" / snapshot_name
def get_test_repo_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
return get_test_output_dir(request, top_output_dir) / "repo"
def get_test_repo_dir(
request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None
) -> Path:
return get_test_output_dir(request, top_output_dir, prefix or "") / "repo"
def pytest_addoption(parser: Parser):
@@ -5193,7 +5083,7 @@ def tenant_get_shards(
return [(TenantShardId(tenant_id, 0, 0), override_pageserver or env.pageserver)]
def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint):
def wait_replica_caughtup(primary: PgProtocol, secondary: PgProtocol):
primary_lsn = Lsn(
primary.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()", log_query=False)
)
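
The diff truncates wait_replica_caughtup here. A sketch of how the rest of the check presumably proceeds, assuming the secondary reports pg_last_wal_replay_lsn() as stock PostgreSQL does (wait_until is the retry helper this module already imports):

def wait_replica_caughtup_sketch(primary: PgProtocol, secondary: PgProtocol):
    # Widening the parameters from Endpoint to PgProtocol means anything with
    # safe_psql_scalar() qualifies, including the new shared-fixture endpoints.
    primary_lsn = Lsn(
        primary.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()", log_query=False)
    )

    def caught_up():
        replay_lsn = Lsn(
            secondary.safe_psql_scalar("SELECT pg_last_wal_replay_lsn()", log_query=False)
        )
        assert replay_lsn >= primary_lsn

    wait_until(30, 0.5, caught_up)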

View File

@@ -14,32 +14,60 @@ Dynamically parametrize tests by different parameters
"""
@pytest.fixture(scope="function", autouse=True)
def pg_version() -> Optional[PgVersion]:
return None
def get_pgversions():
if (v := os.getenv("DEFAULT_PG_VERSION")) is None:
pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]
else:
pg_versions = [PgVersion(v)]
return pg_versions
@pytest.fixture(scope="function", autouse=True)
def build_type() -> Optional[str]:
return None
@pytest.fixture(
scope="session",
autouse=True,
params=get_pgversions(),
ids=lambda v: f"pg{v}",
)
def pg_version(request) -> Optional[PgVersion]:
return request.param
@pytest.fixture(scope="function", autouse=True)
def get_buildtypes():
if (bt := os.getenv("BUILD_TYPE")) is None:
build_types = ["debug", "release"]
else:
build_types = [bt.lower()]
return build_types
@pytest.fixture(
scope="session",
autouse=True,
params=get_buildtypes(),
ids=lambda t: f"{t}",
)
def build_type(request) -> Optional[str]:
return request.param
@pytest.fixture(scope="session", autouse=True)
def platform() -> Optional[str]:
return None
@pytest.fixture(scope="function", autouse=True)
@pytest.fixture(scope="session", autouse=True)
def pageserver_virtual_file_io_engine() -> Optional[str]:
return os.getenv("PAGESERVER_VIRTUAL_FILE_IO_ENGINE")
@pytest.fixture(scope="function", autouse=True)
@pytest.fixture(scope="session", autouse=True)
def pageserver_io_buffer_alignment() -> Optional[int]:
return None
@pytest.fixture(scope="function", autouse=True)
@pytest.fixture(scope="session", autouse=True)
def pageserver_aux_file_policy() -> Optional[AuxFileStore]:
return None
@@ -53,26 +81,12 @@ def get_pageserver_default_tenant_config_compaction_algorithm() -> Optional[Dict
return v
@pytest.fixture(scope="function", autouse=True)
@pytest.fixture(scope="session", autouse=True)
def pageserver_default_tenant_config_compaction_algorithm() -> Optional[Dict[str, Any]]:
return get_pageserver_default_tenant_config_compaction_algorithm()
def pytest_generate_tests(metafunc: Metafunc):
if (bt := os.getenv("BUILD_TYPE")) is None:
build_types = ["debug", "release"]
else:
build_types = [bt.lower()]
metafunc.parametrize("build_type", build_types)
if (v := os.getenv("DEFAULT_PG_VERSION")) is None:
pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]
else:
pg_versions = [PgVersion(v)]
metafunc.parametrize("pg_version", pg_versions, ids=map(lambda v: f"pg{v}", pg_versions))
# A hacky way to parametrize tests only for `pageserver_virtual_file_io_engine=std-fs`
# And do not change test name for default `pageserver_virtual_file_io_engine=tokio-epoll-uring` to keep tests statistics
if (io_engine := os.getenv("PAGESERVER_VIRTUAL_FILE_IO_ENGINE", "")) not in (
@@ -89,6 +103,7 @@ def pytest_generate_tests(metafunc: Metafunc):
"pageserver_default_tenant_config_compaction_algorithm",
[explicit_default],
ids=[explicit_default["kind"]],
scope="session",
)
# For performance tests, parametrize also by platform
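
The reason parametrization moves out of pytest_generate_tests: metafunc.parametrize defaults to function scope, so session-scoped fixtures such as neon_shared_env could not depend on build_type or pg_version. A standalone sketch of the replacement pattern (illustrative fixture name and values):

import pytest

@pytest.fixture(scope="session", autouse=True, params=["debug", "release"])
def build_type_sketch(request) -> str:
    # One value per session; session-scoped fixtures downstream
    # (e.g. a shared environment) may now depend on the parameter.
    return request.param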

View File

View File

@@ -0,0 +1,303 @@
import os
import shutil
import subprocess
import uuid
from pathlib import Path
from typing import Any, Dict, Iterator, Optional, cast
import pytest
from _pytest.config import Config
from _pytest.fixtures import FixtureRequest
from fixtures import overlayfs
from fixtures.broker import NeonBroker
from fixtures.log_helper import log
from fixtures.neon_api import NeonAPI
from fixtures.neon_fixtures import (
DEFAULT_OUTPUT_DIR,
NeonEnv,
NeonEnvBuilder,
get_test_output_dir,
get_test_overlay_dir,
get_test_repo_dir,
)
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import MockS3Server
from fixtures.utils import AuxFileStore, allure_attach_from_dir, get_self_dir
BASE_PORT: int = 15000
@pytest.fixture(scope="session")
def base_dir() -> Iterator[Path]:
# find the base directory (currently this is the git root)
base_dir = get_self_dir().parent.parent
log.info(f"base_dir is {base_dir}")
yield base_dir
@pytest.fixture(scope="session")
def neon_binpath(base_dir: Path, build_type: str) -> Iterator[Path]:
if os.getenv("REMOTE_ENV"):
# we are in remote env and do not have neon binaries locally
# this is the case for benchmarks run on self-hosted runner
return
# Find the neon binaries.
if env_neon_bin := os.environ.get("NEON_BIN"):
binpath = Path(env_neon_bin)
else:
binpath = base_dir / "target" / build_type
log.info(f"neon_binpath is {binpath}")
if not (binpath / "pageserver").exists():
raise Exception(f"neon binaries not found at '{binpath}'")
yield binpath
@pytest.fixture(scope="session")
def pg_distrib_dir(base_dir: Path) -> Iterator[Path]:
if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"):
distrib_dir = Path(env_postgres_bin).resolve()
else:
distrib_dir = base_dir / "pg_install"
log.info(f"pg_distrib_dir is {distrib_dir}")
yield distrib_dir
@pytest.fixture(scope="session")
def top_output_dir(base_dir: Path) -> Iterator[Path]:
# Compute the top-level directory for all tests.
if env_test_output := os.environ.get("TEST_OUTPUT"):
output_dir = Path(env_test_output).resolve()
else:
output_dir = base_dir / DEFAULT_OUTPUT_DIR
output_dir.mkdir(exist_ok=True)
log.info(f"top_output_dir is {output_dir}")
yield output_dir
@pytest.fixture(scope="session")
def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: PgVersion) -> Iterator[Path]:
versioned_dir = pg_distrib_dir / pg_version.v_prefixed
psql_bin_path = versioned_dir / "bin/psql"
postgres_bin_path = versioned_dir / "bin/postgres"
if os.getenv("REMOTE_ENV"):
# When testing against a remote server, we only need the client binary.
if not psql_bin_path.exists():
raise Exception(f"psql not found at '{psql_bin_path}'")
else:
if not postgres_bin_path.exists():
raise Exception(f"postgres not found at '{postgres_bin_path}'")
log.info(f"versioned_pg_distrib_dir is {versioned_dir}")
yield versioned_dir
@pytest.fixture(scope="session")
def neon_api_key() -> str:
api_key = os.getenv("NEON_API_KEY")
if not api_key:
raise AssertionError("Set the NEON_API_KEY environment variable")
return api_key
@pytest.fixture(scope="session")
def neon_api_base_url() -> str:
return os.getenv("NEON_API_BASE_URL", "https://console-stage.neon.build/api/v2")
@pytest.fixture(scope="session")
def neon_api(neon_api_key: str, neon_api_base_url: str) -> NeonAPI:
return NeonAPI(neon_api_key, neon_api_base_url)
@pytest.fixture(scope="session")
def worker_port_num():
return (32768 - BASE_PORT) // int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1"))
@pytest.fixture(scope="session")
def worker_seq_no(worker_id: str) -> int:
# worker_id is a pytest-xdist fixture
# it can be master or gw<number>
# parse it to always get a number
if worker_id == "master":
return 0
assert worker_id.startswith("gw")
return int(worker_id[2:])
@pytest.fixture(scope="session")
def worker_base_port(worker_seq_no: int, worker_port_num: int) -> int:
# divide the available port space into equal per-worker ranges,
# so that workers have disjoint sets of ports for their services
return BASE_PORT + worker_seq_no * worker_port_num
@pytest.fixture(scope="session")
def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistributor:
return PortDistributor(base_port=worker_base_port, port_number=worker_port_num)
@pytest.fixture(scope="session")
def shared_broker(
port_distributor: PortDistributor,
shared_test_output_dir: Path,
neon_binpath: Path,
) -> Iterator[NeonBroker]:
# multiple pytest sessions could get launched in parallel; give them different ports/datadirs
client_port = port_distributor.get_port()
broker_logfile = shared_test_output_dir / "repo" / "storage_broker.log"
broker = NeonBroker(logfile=broker_logfile, port=client_port, neon_binpath=neon_binpath)
yield broker
broker.stop()
@pytest.fixture(scope="session")
def run_id() -> Iterator[uuid.UUID]:
yield uuid.uuid4()
@pytest.fixture(scope="session", autouse=True)
def neon_shared_env(
pytestconfig: Config,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
shared_broker: NeonBroker,
run_id: uuid.UUID,
top_output_dir: Path,
shared_test_output_dir: Path,
neon_binpath: Path,
build_type: str,
pg_distrib_dir: Path,
pg_version: PgVersion,
pageserver_virtual_file_io_engine: str,
pageserver_aux_file_policy: Optional[AuxFileStore],
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]],
pageserver_io_buffer_alignment: Optional[int],
request: FixtureRequest,
) -> Iterator[NeonEnv]:
"""
Shared Neon environment, with no authentication and no safekeepers.
This fixture will use RemoteStorageKind.LOCAL_FS with pageserver.
"""
prefix = f"shared[{build_type}-{pg_version.v_prefixed}]-"
# Create the environment in the per-test output directory
repo_dir = get_test_repo_dir(request, top_output_dir, prefix)
with NeonEnvBuilder(
top_output_dir=top_output_dir,
repo_dir=repo_dir,
port_distributor=port_distributor,
broker=shared_broker,
mock_s3_server=mock_s3_server,
neon_binpath=neon_binpath,
pg_distrib_dir=pg_distrib_dir,
pg_version=pg_version,
run_id=run_id,
preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
test_name=f"{prefix}{request.node.name}",
test_output_dir=shared_test_output_dir,
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
pageserver_aux_file_policy=pageserver_aux_file_policy,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
shared=True,
) as builder:
env = builder.init_start()
yield env
# This is autouse, so the test output directory always gets created, even
# if a test doesn't put anything there.
#
# NB: we request the overlay dir fixture so the fixture does its cleanups
@pytest.fixture(scope="session")
def shared_test_output_dir(
request: FixtureRequest,
pg_version: PgVersion,
build_type: str,
top_output_dir: Path,
shared_test_overlay_dir: Path,
) -> Iterator[Path]:
"""Create the working directory for shared tests."""
prefix = f"shared[{build_type}-{pg_version.v_prefixed}]-"
# one directory per test
test_dir = get_test_output_dir(request, top_output_dir, prefix)
log.info(f"test_output_dir is {test_dir}")
shutil.rmtree(test_dir, ignore_errors=True)
test_dir.mkdir()
yield test_dir
# Allure artifacts creation might involve the creation of `.tar.zst` archives,
# which aren't going to be used if Allure results collection is not enabled
# (i.e. --alluredir is not set).
# Skip `allure_attach_from_dir` in this case
if not request.config.getoption("--alluredir"):
return
preserve_database_files = False
for k, v in request.node.user_properties:
# NB: the neon_env_builder fixture uses this fixture (test_output_dir).
# So, neon_env_builder's cleanup runs before here.
# The cleanup propagates NeonEnvBuilder.preserve_database_files into this user property.
if k == "preserve_database_files":
assert isinstance(v, bool)
preserve_database_files = v
allure_attach_from_dir(test_dir, preserve_database_files)
@pytest.fixture(scope="session")
def shared_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Optional[Path]:
"""
Idempotently create a test's overlayfs mount state directory.
If the functionality isn't enabled via env var, returns None.
The procedure cleans up after previous runs that were aborted (e.g. due to Ctrl-C, OOM kills, etc).
"""
if os.getenv("NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS") is None:
return None
overlay_dir = get_test_overlay_dir(request, top_output_dir)
log.info(f"test_overlay_dir is {overlay_dir}")
overlay_dir.mkdir(exist_ok=True)
# unmount stale overlayfs mounts that use subdirectories of `overlay_dir/*` as the overlayfs `upperdir` and `workdir`
for mountpoint in overlayfs.iter_mounts_beneath(get_test_output_dir(request, top_output_dir)):
cmd = ["sudo", "umount", str(mountpoint)]
log.info(
f"Unmounting stale overlayfs mount probably created during earlier test run: {cmd}"
)
subprocess.run(cmd, capture_output=True, check=True)
# the overlayfs `workdir` is owned by `root`, shutil.rmtree won't work.
cmd = ["sudo", "rm", "-rf", str(overlay_dir)]
subprocess.run(cmd, capture_output=True, check=True)
overlay_dir.mkdir()
return overlay_dir
# no need to clean up anything: on clean shutdown,
# NeonEnvBuilder.overlay_cleanup_teardown takes care of cleanup
# and on unclean shutdown, this function will take care of it
# on the next test run
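
A worked example of the port partitioning implemented by worker_port_num and worker_base_port above, assuming PYTEST_XDIST_WORKER_COUNT=4:

# worker_port_num = (32768 - 15000) // 4 == 4442
# gw0 -> [15000, 19442)   gw1 -> [19442, 23884)
# gw2 -> [23884, 28326)   gw3 -> [28326, 32768)
# Each worker's PortDistributor hands out ports only from its own range,
# so parallel workers never collide.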

View File

@@ -0,0 +1,13 @@
from typing import Iterator

import pytest
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import MockS3Server


@pytest.fixture(scope="session")
def mock_s3_server(port_distributor: PortDistributor) -> Iterator[MockS3Server]:
    mock_s3_server = MockS3Server(port_distributor.get_port())
    yield mock_s3_server
    mock_s3_server.kill()

View File

@@ -0,0 +1,348 @@
from functools import partial
from pathlib import Path
from typing import Any, Optional, cast, List, Dict
import pytest
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    DEFAULT_BRANCH_NAME,
    Endpoint,
    NeonEnv,
    PgProtocol,
    check_restored_datadir_content,
    tenant_get_shards,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn
from fixtures.safekeeper.utils import are_walreceivers_absent
from fixtures.utils import wait_until
"""
In this file most important resources are exposed as function-level fixtures
that depend on session-level resources like pageservers and safekeepers.
The main rationale here is that we don't need to initialize a new SK/PS/etc
every time we want to test something that has branching: we can just as well
reuse still-available PageServers from other tests of the same kind.
"""
@pytest.fixture(scope="session")
def shared_storage_repo_path(build_type: str, base_dir: Path) -> Path:
return base_dir / f"shared[{build_type}]" / "repo"
class TEndpoint(PgProtocol):
def clear_shared_buffers(self, cursor: Optional[Any] = None):
"""
Best-effort way to clear postgres buffers. Pinned buffers will not be cleared.
Might also clear LFC.
"""
if cursor is not None:
cursor.execute("select clear_buffer_cache()")
else:
self.safe_psql("select clear_buffer_cache()")
_TEndpoint = Endpoint
class TTimeline:
__primary: Optional[_TEndpoint]
__secondary: Optional[_TEndpoint]
def __init__(self, timeline_id: TimelineId, tenant: "TTenant", name: str):
self.id = timeline_id
self.tenant = tenant
self.name = name
self.__primary = None
self.__secondary = None
@property
def primary(self) -> TEndpoint:
if self.__primary is None:
self.__primary = cast(_TEndpoint, self.tenant.create_endpoint(
name=self._get_full_endpoint_name("primary"),
timeline_id=self.id,
primary=True,
running=True,
))
return cast(TEndpoint, self.__primary)
def primary_with_config(self, config_lines: List[str], reboot: bool = False) -> TEndpoint:
if self.__primary is None:
self.__primary = cast(_TEndpoint, self.tenant.create_endpoint(
name=self._get_full_endpoint_name("primary"),
timeline_id=self.id,
primary=True,
running=True,
config_lines=config_lines,
))
else:
self.__primary.config(config_lines)
if reboot:
if self.__primary.is_running():
self.__primary.stop()
self.__primary.start()
return cast(TEndpoint, self.__primary)
@property
def secondary(self) -> TEndpoint:
if self.__secondary is None:
self.__secondary = cast(_TEndpoint, self.tenant.create_endpoint(
name=self._get_full_endpoint_name("secondary"),
timeline_id=self.id,
primary=False,
running=True,
))
return cast(TEndpoint, self.__secondary)
def secondary_with_config(self, config_lines: List[str], reboot: bool = False) -> TEndpoint:
if self.__secondary is None:
self.__secondary = cast(_TEndpoint, self.tenant.create_endpoint(
name=self._get_full_endpoint_name("secondary"),
timeline_id=self.id,
primary=False,
running=True,
config_lines=config_lines,
))
else:
self.__secondary.config(config_lines)
if reboot:
if self.__secondary.is_running():
self.__secondary.stop()
self.__secondary.start()
return cast(TEndpoint, self.__secondary)
def _get_full_endpoint_name(self, name) -> str:
return f"{self.name}.{name}"
def create_branch(self, name: str, lsn: Optional[Lsn]) -> "TTimeline":
return self.tenant.create_timeline(
new_name=name,
parent_name=self.name,
branch_at=lsn,
)
def stop_and_flush(self, endpoint: TEndpoint):
self.tenant.stop_endpoint(endpoint)
self.tenant.flush_timeline_data(self)
def checkpoint(self, **kwargs):
self.tenant.checkpoint_timeline(self, **kwargs)
class TTenant:
"""
An object representing a single test case on a shared pageserver.
All operations here are practically safe.
"""
def __init__(
self,
env: NeonEnv,
name: str,
config: Optional[Dict[str, Any]] = None,
):
self.id = TenantId.generate()
self.timelines = []
self.timeline_names = {}
self.timeline_ids = {}
self.name = name
# publicly inaccessible stuff, used during shutdown
self.__endpoints: List[Endpoint] = []
self.__env: NeonEnv = env
env.neon_cli.create_tenant(
tenant_id=self.id,
set_default=False,
conf=config
)
self.first_timeline_id = env.neon_cli.create_branch(
new_branch_name=f"{self.name}:{DEFAULT_BRANCH_NAME}",
ancestor_branch_name=DEFAULT_BRANCH_NAME,
tenant_id=self.id,
)
self._add_timeline(
TTimeline(
timeline_id=self.first_timeline_id,
tenant=self,
name=DEFAULT_BRANCH_NAME,
)
)
def _add_timeline(self, timeline: TTimeline):
assert timeline.tenant == self
assert timeline not in self.timelines
assert timeline.id is not None
self.timelines.append(timeline)
self.timeline_ids[timeline.id] = timeline
if timeline.name is not None:
self.timeline_names[timeline.name] = timeline
def create_timeline(
self,
new_name: str,
parent_name: Optional[str] = None,
parent_id: Optional[TimelineId] = None,
branch_at: Optional[Lsn] = None,
) -> TTimeline:
if parent_name is not None:
pass
elif parent_id is not None:
assert parent_name is None
parent = self.timeline_ids[parent_id]
parent_name = parent.name
else:
raise LookupError("Timeline creation requires parent by either ID or name")
assert parent_name is not None
new_id = self.__env.neon_cli.create_branch(
new_branch_name=f"{self.name}:{new_name}",
ancestor_branch_name=f"{self.name}:{parent_name}",
tenant_id=self.id,
ancestor_start_lsn=branch_at,
)
new_tl = TTimeline(
timeline_id=new_id,
tenant=self,
name=new_name,
)
self._add_timeline(new_tl)
return new_tl
@property
def default_timeline(self) -> TTimeline:
return self.timeline_ids[self.first_timeline_id]
def start_endpoint(self, ep: TEndpoint):
if ep not in self.__endpoints:
return
ep = cast(Endpoint, ep)
if not ep.is_running():
ep.start()
def stop_endpoint(self, ep: TEndpoint, mode: str = "fast"):
if ep not in self.__endpoints:
return
ep = cast(Endpoint, ep)
if ep.is_running():
ep.stop(mode=mode)
def create_endpoint(
self,
name: str,
timeline_id: TimelineId,
primary: bool = True,
running: bool = False,
port: Optional[int] = None,
http_port: Optional[int] = None,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
) -> TEndpoint:
endpoint: _TEndpoint
if port is None:
port = self.__env.port_distributor.get_port()
if http_port is None:
http_port = self.__env.port_distributor.get_port()
if running:
endpoint = self.__env.endpoints.create_start(
branch_name=self.timeline_ids[timeline_id].name,
endpoint_id=f"{self.name}.{name}",
tenant_id=self.id,
lsn=lsn,
hot_standby=not primary,
config_lines=config_lines,
)
else:
endpoint = self.__env.endpoints.create(
branch_name=self.timeline_ids[timeline_id].name,
endpoint_id=f"{self.name}.{name}",
tenant_id=self.id,
lsn=lsn,
hot_standby=not primary,
config_lines=config_lines,
)
self.__endpoints.append(endpoint)
return endpoint
def shutdown_resources(self):
for ep in self.__endpoints:
try:
ep.stop_and_destroy("fast")
except BaseException as e:
log.error("Error encountered while shutting down endpoint %s", ep.endpoint_id, exc_info=e)
def reconfigure(self, endpoint: TEndpoint, lines: List[str], restart: bool):
if endpoint not in self.__endpoints:
return
endpoint = cast(_TEndpoint, endpoint)
was_running = endpoint.is_running()
if restart and was_running:
endpoint.stop()
endpoint.config(lines)
if restart:
endpoint.start()
def flush_timeline_data(self, timeline: TTimeline) -> Lsn:
commit_lsn: Lsn = Lsn(0)
# In principle, in the absence of failures, polling a single safekeeper would be enough.
for sk in self.__env.safekeepers:
cli = sk.http_client()
# wait until compute connections are gone
wait_until(30, 0.5, partial(are_walreceivers_absent, cli, self.id, timeline.id))
commit_lsn = max(cli.get_commit_lsn(self.id, timeline.id), commit_lsn)
# Note: depending on WAL filtering implementation, probably most shards
# won't be able to reach commit_lsn (unless gaps are also ack'ed), so this
# is broken in sharded case.
shards = tenant_get_shards(env=self.__env, tenant_id=self.id)
for tenant_shard_id, pageserver in shards:
log.info(
f"flush_ep_to_pageserver: waiting for {commit_lsn} on shard {tenant_shard_id} on pageserver {pageserver.id})"
)
waited = wait_for_last_record_lsn(
pageserver.http_client(), tenant_shard_id, timeline.id, commit_lsn
)
assert waited >= commit_lsn
return commit_lsn
def checkpoint_timeline(self, timeline: TTimeline, **kwargs):
self.__env.pageserver.http_client().timeline_checkpoint(
tenant_id=self.id,
timeline_id=timeline.id,
**kwargs,
)
def pgdatadir(self, endpoint: TEndpoint):
if endpoint not in self.__endpoints:
return None
return cast(Endpoint, endpoint).pgdata_dir
def check_restored_datadir_content(self, path, endpoint: TEndpoint, *args, **kwargs):
if endpoint not in self.__endpoints:
return
check_restored_datadir_content(path, self.__env, cast(Endpoint, endpoint), *args, **kwargs)
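
A hypothetical end-to-end sketch of the TTenant/TTimeline API defined above (the test body is illustrative only):

def test_branch_at_lsn_sketch(tenant: TTenant):
    tl = tenant.default_timeline
    primary = tl.primary
    primary.safe_psql("CREATE TABLE t AS SELECT generate_series(1, 100) AS g")
    lsn = Lsn(primary.safe_psql_scalar("SELECT pg_current_wal_insert_lsn()"))
    child = tl.create_branch("child", lsn)
    # The child timeline sees the data as of `lsn`, on the same shared pageserver.
    assert child.primary.safe_psql_scalar("SELECT count(*) FROM t") == 100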

View File

@@ -15,6 +15,7 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import wait_until_tenant_active
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix
from requests import RequestException
@@ -115,13 +116,10 @@ def test_branching_with_pgbench(
# This test checks if the pageserver is able to handle an "unnormalized" starting LSN.
#
# Related: see discussion in https://github.com/neondatabase/neon/pull/2143#issuecomment-1209092186
def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBin):
def test_branching_unnormalized_start_lsn(timeline: TTimeline, pg_bin: PgBin):
XLOG_BLCKSZ = 8192
env = neon_simple_env
env.neon_cli.create_branch("b0")
endpoint0 = env.endpoints.create_start("b0")
endpoint0 = timeline.primary
pg_bin.run_capture(["pgbench", "-i", endpoint0.connstr()])
@@ -133,8 +131,8 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi
start_lsn = Lsn((int(curr_lsn) - XLOG_BLCKSZ) // XLOG_BLCKSZ * XLOG_BLCKSZ)
log.info(f"Branching b1 from b0 starting at lsn {start_lsn}...")
env.neon_cli.create_branch("b1", "b0", ancestor_start_lsn=start_lsn)
endpoint1 = env.endpoints.create_start("b1")
tl2 = timeline.create_branch("branch", start_lsn)
endpoint1 = tl2.primary
pg_bin.run_capture(["pgbench", "-i", endpoint1.connstr()])

View File

@@ -1,14 +1,13 @@
from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver
from pathlib import Path
from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver, test_output_dir
from fixtures.shared_fixtures import TTimeline
def do_combocid_op(neon_env_builder: NeonEnvBuilder, op):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"shared_buffers='1MB'",
],
)
def do_combocid_op(timeline: TTimeline, op):
endpoint = timeline.primary_with_config(config_lines=[
"shared_buffers='1MB'",
])
conn = endpoint.connect()
cur = conn.cursor()
@@ -43,32 +42,26 @@ def do_combocid_op(neon_env_builder: NeonEnvBuilder, op):
assert len(rows) == 500
cur.execute("rollback")
flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline)
env.pageserver.http_client().timeline_checkpoint(
env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True
)
timeline.stop_and_flush(endpoint)
timeline.checkpoint(compact=False, wait_until_uploaded=True)
def test_combocid_delete(neon_env_builder: NeonEnvBuilder):
do_combocid_op(neon_env_builder, "delete from t")
def test_combocid_delete(timeline: TTimeline):
do_combocid_op(timeline, "delete from t")
def test_combocid_update(neon_env_builder: NeonEnvBuilder):
do_combocid_op(neon_env_builder, "update t set val=val+1")
def test_combocid_update(timeline: TTimeline):
do_combocid_op(timeline, "update t set val=val+1")
def test_combocid_lock(neon_env_builder: NeonEnvBuilder):
do_combocid_op(neon_env_builder, "select * from t for update")
def test_combocid_lock(timeline: TTimeline):
do_combocid_op(timeline, "select * from t for update")
def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"shared_buffers='1MB'",
],
)
def test_combocid_multi_insert(timeline: TTimeline, test_output_dir: Path):
endpoint = timeline.primary_with_config(config_lines=[
"shared_buffers='1MB'",
])
conn = endpoint.connect()
cur = conn.cursor()
@@ -77,7 +70,7 @@ def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
cur.execute("CREATE EXTENSION neon_test_utils")
cur.execute("create table t(id integer, val integer)")
file_path = f"{endpoint.pg_data_dir_path()}/t.csv"
file_path = str(test_output_dir / "t.csv")
cur.execute(f"insert into t select g, 0 from generate_series(1,{n_records}) g")
cur.execute(f"copy t to '{file_path}'")
cur.execute("truncate table t")
@@ -106,15 +99,12 @@ def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
cur.execute("rollback")
flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline)
env.pageserver.http_client().timeline_checkpoint(
env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True
)
timeline.stop_and_flush(endpoint)
timeline.checkpoint(compact=False, wait_until_uploaded=True)
def test_combocid(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
def test_combocid(timeline: TTimeline):
endpoint = timeline.primary
conn = endpoint.connect()
cur = conn.cursor()
@@ -147,7 +137,5 @@ def test_combocid(neon_env_builder: NeonEnvBuilder):
cur.execute("rollback")
flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline)
env.pageserver.http_client().timeline_checkpoint(
env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True
)
timeline.stop_and_flush(endpoint)
timeline.checkpoint(compact=False, wait_until_uploaded=True)

View File

@@ -5,6 +5,7 @@ import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
from fixtures.pg_version import PgVersion
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import query_scalar
@@ -12,18 +13,17 @@ from fixtures.utils import query_scalar
# Test CREATE DATABASE when there have been relmapper changes
#
@pytest.mark.parametrize("strategy", ["file_copy", "wal_log"])
def test_createdb(neon_simple_env: NeonEnv, strategy: str):
env = neon_simple_env
if env.pg_version == PgVersion.V14 and strategy == "wal_log":
def test_createdb(timeline: TTimeline, pg_version: PgVersion, strategy: str):
if pg_version == PgVersion.V14 and strategy == "wal_log":
pytest.skip("wal_log strategy not supported on PostgreSQL 14")
endpoint = env.endpoints.create_start("main")
endpoint = timeline.primary
with endpoint.cursor() as cur:
# Cause a 'relmapper' change in the original branch
cur.execute("VACUUM FULL pg_class")
if env.pg_version == PgVersion.V14:
if pg_version == PgVersion.V14:
cur.execute("CREATE DATABASE foodb")
else:
cur.execute(f"CREATE DATABASE foodb STRATEGY={strategy}")
@@ -31,8 +31,8 @@ def test_createdb(neon_simple_env: NeonEnv, strategy: str):
lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
# Create a branch
env.neon_cli.create_branch("test_createdb2", "main", ancestor_start_lsn=lsn)
endpoint2 = env.endpoints.create_start("test_createdb2")
tl2 = timeline.create_branch("createdb2", lsn=lsn)
endpoint2 = tl2.primary
# Test that you can connect to the new database on both branches
for db in (endpoint, endpoint2):
@@ -58,9 +58,8 @@ def test_createdb(neon_simple_env: NeonEnv, strategy: str):
#
# Test DROP DATABASE
#
def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
def test_dropdb(timeline: TTimeline, test_output_dir):
endpoint = timeline.primary
with endpoint.cursor() as cur:
cur.execute("CREATE DATABASE foodb")
@@ -77,28 +76,28 @@ def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
lsn_after_drop = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
# Create two branches before and after database drop.
env.neon_cli.create_branch("test_before_dropdb", "main", ancestor_start_lsn=lsn_before_drop)
endpoint_before = env.endpoints.create_start("test_before_dropdb")
tl_before = timeline.create_branch("test_before_dropdb", lsn=lsn_before_drop)
endpoint_before = tl_before.primary
env.neon_cli.create_branch("test_after_dropdb", "main", ancestor_start_lsn=lsn_after_drop)
endpoint_after = env.endpoints.create_start("test_after_dropdb")
tl_after = timeline.create_branch("test_after_dropdb", lsn=lsn_after_drop)
endpoint_after = tl_after.primary
# Test that database exists on the branch before drop
endpoint_before.connect(dbname="foodb").close()
# Test that database subdir exists on the branch before drop
assert endpoint_before.pgdata_dir
dbpath = pathlib.Path(endpoint_before.pgdata_dir) / "base" / str(dboid)
assert timeline.tenant.pgdatadir(endpoint_before)
dbpath = pathlib.Path(timeline.tenant.pgdatadir(endpoint_before)) / "base" / str(dboid)
log.info(dbpath)
assert os.path.isdir(dbpath) is True
# Test that database subdir doesn't exist on the branch after drop
assert endpoint_after.pgdata_dir
dbpath = pathlib.Path(endpoint_after.pgdata_dir) / "base" / str(dboid)
assert timeline.tenant.pgdatadir(endpoint_after)
dbpath = pathlib.Path(timeline.tenant.pgdatadir(endpoint_after)) / "base" / str(dboid)
log.info(dbpath)
assert os.path.isdir(dbpath) is False
# Check that we restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, env, endpoint)
timeline.tenant.check_restored_datadir_content(test_output_dir, endpoint)

View File

@@ -1,13 +1,13 @@
from fixtures.neon_fixtures import NeonEnv
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import query_scalar
#
# Test CREATE USER to check shared catalog restore
#
def test_createuser(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
def test_createuser(timeline: TTimeline):
endpoint = timeline.primary
with endpoint.cursor() as cur:
# Cause a 'relmapper' change in the original branch
@@ -18,8 +18,8 @@ def test_createuser(neon_simple_env: NeonEnv):
lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
# Create a branch
env.neon_cli.create_branch("test_createuser2", "main", ancestor_start_lsn=lsn)
endpoint2 = env.endpoints.create_start("test_createuser2")
branch = timeline.create_branch("test_createuser2", lsn=lsn)
endpoint2 = branch.primary
# Test that you can connect to new branch as a new user
assert endpoint2.safe_psql("select current_user", user="testuser") == [("testuser",)]

View File

@@ -1,5 +1,6 @@
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_fixtures import Endpoint
from fixtures.shared_fixtures import TTimeline
@pytest.mark.parametrize(
@@ -10,14 +11,11 @@ from fixtures.neon_fixtures import NeonEnvBuilder
"💣", # calls `trigger_segfault` internally
],
)
def test_endpoint_crash(neon_env_builder: NeonEnvBuilder, sql_func: str):
def test_endpoint_crash(timeline: TTimeline, sql_func: str):
"""
Test that triggering crash from neon_test_utils crashes the endpoint
"""
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_endpoint_crash")
endpoint = env.endpoints.create_start("test_endpoint_crash")
endpoint = timeline.primary
endpoint.safe_psql("CREATE EXTENSION neon_test_utils;")
with pytest.raises(Exception, match="This probably means the server terminated abnormally"):
endpoint.safe_psql(f"SELECT {sql_func}();")

View File

@@ -1,10 +1,8 @@
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.shared_fixtures import TTimeline
def test_fsm_truncate(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_fsm_truncate")
endpoint = env.endpoints.create_start("test_fsm_truncate")
def test_fsm_truncate(timeline: TTimeline):
endpoint = timeline.primary
endpoint.safe_psql(
"CREATE TABLE t1(key int); CREATE TABLE t2(key int); TRUNCATE TABLE t1; TRUNCATE TABLE t2;"
)

View File

@@ -1,17 +1,15 @@
import time
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
from fixtures.shared_fixtures import TTimeline
#
# Test that redo of XLOG_GIN_VACUUM_PAGE doesn't produce error
#
def test_gin_redo(neon_simple_env: NeonEnv):
env = neon_simple_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
time.sleep(1)
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
def test_gin_redo(timeline: TTimeline):
primary = timeline.primary
secondary = timeline.secondary
con = primary.connect()
cur = con.cursor()
cur.execute("create table gin_test_tbl(id integer, i int4[])")

View File

@@ -14,6 +14,7 @@ from fixtures.neon_fixtures import (
tenant_get_shards,
wait_replica_caughtup,
)
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import wait_until
@@ -221,29 +222,20 @@ def pgbench_accounts_initialized(ep):
#
# Without hs feedback enabled we'd see 'User query might have needed to see row
# versions that must be removed.' errors.
def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env = neon_env_builder.init_start()
agressive_vacuum_conf = [
def test_hot_standby_feedback(timeline: TTimeline, pg_bin: PgBin):
with timeline.primary_with_config(config_lines=[
"log_autovacuum_min_duration = 0",
"autovacuum_naptime = 10s",
"autovacuum_vacuum_threshold = 25",
"autovacuum_vacuum_scale_factor = 0.1",
"autovacuum_vacuum_cost_delay = -1",
]
with env.endpoints.create_start(
branch_name="main", endpoint_id="primary", config_lines=agressive_vacuum_conf
) as primary:
]) as primary:
# It would be great to have more strict max_standby_streaming_delay=0s here, but then sometimes it fails with
# 'User was holding shared buffer pin for too long.'.
with env.endpoints.new_replica_start(
origin=primary,
endpoint_id="secondary",
config_lines=[
"max_standby_streaming_delay=2s",
"neon.protocol_version=2",
"hot_standby_feedback=true",
],
) as secondary:
with timeline.secondary_with_config(config_lines=[
"max_standby_streaming_delay=2s",
"hot_standby_feedback=true",
]) as secondary:
log.info(
f"primary connstr is {primary.connstr()}, secondary connstr {secondary.connstr()}"
)
@@ -286,20 +278,15 @@ def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
# Test race condition between WAL replay and backends performing queries
# https://github.com/neondatabase/neon/issues/7791
def test_replica_query_race(neon_simple_env: NeonEnv):
env = neon_simple_env
primary_ep = env.endpoints.create_start(
branch_name="main",
endpoint_id="primary",
)
def test_replica_query_race(timeline: TTimeline):
primary_ep = timeline.primary
with primary_ep.connect() as p_con:
with p_con.cursor() as p_cur:
p_cur.execute("CREATE EXTENSION neon_test_utils")
p_cur.execute("CREATE TABLE test AS SELECT 0 AS counter")
standby_ep = env.endpoints.new_replica_start(origin=primary_ep, endpoint_id="standby")
standby_ep = timeline.secondary
wait_replica_caughtup(primary_ep, standby_ep)
# In primary, run a lot of UPDATEs on a single page

View File

@@ -8,22 +8,19 @@ import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin
from fixtures.shared_fixtures import TTimeline
#
# Test resizing of the local file cache while a workload is running
#
@pytest.mark.timeout(600)
def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
env = neon_simple_env
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"neon.file_cache_path='file.cache'",
"neon.max_file_cache_size=512MB",
"neon.file_cache_size_limit=512MB",
],
)
def test_lfc_resize(timeline: TTimeline, pg_bin: PgBin):
endpoint = timeline.primary_with_config(config_lines=[
"neon.file_cache_path='file.cache'",
"neon.max_file_cache_size=512MB",
"neon.file_cache_size_limit=512MB",
])
n_resize = 10
scale = 100
@@ -49,7 +46,7 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
thread.join()
lfc_file_path = f"{endpoint.pg_data_dir_path()}/file.cache"
lfc_file_path = timeline.tenant.pgdatadir(endpoint) / "file.cache"
lfc_file_size = os.path.getsize(lfc_file_path)
res = subprocess.run(["ls", "-sk", lfc_file_path], check=True, text=True, capture_output=True)
lfc_file_blocks = re.findall("([0-9A-F]+)", res.stdout)[0]

View File

@@ -3,6 +3,7 @@ from pathlib import Path
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import query_scalar
@@ -73,18 +74,14 @@ WITH (fillfactor='100');
assert blocks < 10
def test_sliding_working_set_approximation(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_sliding_working_set_approximation(timeline: TTimeline):
endpoint = timeline.primary_with_config(config_lines=[
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=256MB",
"neon.file_cache_size_limit=245MB",
])
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=256MB",
"neon.file_cache_size_limit=245MB",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon")

View File

@@ -1,4 +1,5 @@
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import query_scalar
@@ -12,9 +13,8 @@ from fixtures.utils import query_scalar
# is enough to verify that the WAL records are handled correctly
# in the pageserver.
#
def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
def test_multixact(timeline: TTimeline, test_output_dir):
endpoint = timeline.primary
cur = endpoint.connect().cursor()
cur.execute(
@@ -72,10 +72,8 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
assert int(next_multixact_id) > int(next_multixact_id_old)
# Branch at this point
env.neon_cli.create_branch(
"test_multixact_new", ancestor_branch_name="main", ancestor_start_lsn=lsn
)
endpoint_new = env.endpoints.create_start("test_multixact_new")
branch = timeline.create_branch("test_multixact_new", lsn=lsn)
endpoint_new = branch.primary
next_multixact_id_new = endpoint_new.safe_psql(
"SELECT next_multixact_id FROM pg_control_checkpoint()"
@@ -85,4 +83,4 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
assert next_multixact_id_new == next_multixact_id
# Check that we can restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, env, endpoint)
timeline.tenant.check_restored_datadir_content(test_output_dir, endpoint)

View File

@@ -2,7 +2,7 @@ import time
from contextlib import closing
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, fork_at_current_lsn
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.utils import query_scalar
@@ -57,7 +57,7 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
cur.execute("UPDATE vmtest_cold_update2 SET id = 5000, filler=repeat('x', 200) WHERE id = 1")
# Branch at this point, to test that later
fork_at_current_lsn(env, endpoint, "test_vm_bit_clear_new", "main")
# fork_at_current_lsn(env, endpoint, "test_vm_bit_clear_new", "main")
# Clear the buffer cache, to force the VM page to be re-fetched from
# the page server
@@ -91,6 +91,7 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
# a dirty VM page is evicted. If the VM bit was not correctly cleared by the
# earlier WAL record, the full-page image hides the problem. Starting a new
# server at the right point-in-time avoids that full-page image.
endpoint_new = env.endpoints.create_start("test_vm_bit_clear_new")
pg_new_conn = endpoint_new.connect()