mirror of https://github.com/neondatabase/neon.git (synced 2026-01-17 10:22:56 +00:00)

Compare commits: 1 commit on share-test
Author: amasteerov, SHA1: b33ca57839
@@ -5,6 +5,7 @@ pytest_plugins = (
     "fixtures.compute_reconfigure",
     "fixtures.storage_controller_proxy",
     "fixtures.neon_fixtures",
+    "fixtures.neon_tenant",
     "fixtures.benchmark_fixture",
     "fixtures.pg_stats",
     "fixtures.compare_fixtures",
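For context: pytest imports each module named in pytest_plugins and registers the fixtures defined there, so this one-line addition is what makes the new neon_tenant and shared_env fixtures (added below) visible to every test in the suite. A minimal sketch of the mechanism, with hypothetical module and fixture names:

# conftest.py: pytest imports the listed module at startup
pytest_plugins = ("fixtures.my_plugin",)

# fixtures/my_plugin.py
import pytest

@pytest.fixture
def my_fixture() -> str:
    # any test in the suite can now declare a `my_fixture` parameter
    return "hello"

# some_test.py
def test_uses_plugin_fixture(my_fixture: str):
    assert my_fixture == "hello"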
@@ -454,9 +454,10 @@ class NeonEnvBuilder:

         self.pageserver_io_buffer_alignment = pageserver_io_buffer_alignment

-        assert test_name.startswith(
-            "test_"
-        ), "Unexpectedly instantiated from outside a test function"
+        # FIXME
+        # assert test_name.startswith(
+        #     "test_"
+        # ), "Unexpectedly instantiated from outside a test function"
         self.test_name = test_name

     def init_configs(self, default_remote_storage_if_missing: bool = True) -> NeonEnv:
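The assertion above is presumably disabled because the shared-environment code introduced below constructs NeonEnvBuilder with test_name="shared", that is, from outside any test function, which the old check would reject; the # FIXME marks this as a temporary workaround.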
test_runner/fixtures/neon_tenant.py (new file, 352 lines)
@@ -0,0 +1,352 @@
from __future__ import annotations

import random
import string
import threading
import uuid
from dataclasses import dataclass
from pathlib import Path
from types import TracebackType
from typing import (
    Any,
    Dict,
    Iterator,
    List,
    Optional,
    Type,
    cast,
)

import pytest
from _pytest.config import Config
from _pytest.fixtures import FixtureRequest

from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import (
    MockS3Server,
)
from fixtures.utils import AuxFileStore

DEFAULT_BRANCH_NAME: str = "main"


@pytest.fixture(scope="function")
def neon_tenant(shared_env: NeonEnv) -> Iterator[NeonTestTenant]:
    tenant = NeonTestTenant(shared_env)
    tenant.create()
    yield tenant
    # TODO: clean up the tenant


@dataclass(frozen=True)
class NeonEnvDiscriminants:
    """The options that define which environments can be shared"""

    neon_binpath: Path
    pageserver_virtual_file_io_engine: str
    pageserver_aux_file_policy: Optional[AuxFileStore]
    pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]]
    pageserver_io_buffer_alignment: Optional[int]


class NeonSharedEnvs:
    def __init__(
        self,
        port_distributor: PortDistributor,
        run_id: uuid.UUID,
        mock_s3_server: MockS3Server,
        pg_distrib_dir: Path,
        top_output_dir: Path,
        preserve_database_files: bool,
    ):
        self.port_distributor = port_distributor
        self.run_id = run_id
        self.mock_s3_server = mock_s3_server
        self.pg_distrib_dir = pg_distrib_dir
        self.top_output_dir = top_output_dir
        self.preserve_database_files = preserve_database_files

        self.lock = threading.Lock()
        self.envs: Dict[NeonEnvDiscriminants, NeonEnv] = {}

        self.builders: List[NeonEnvBuilder] = []

    def get_repo_dir(self, disc: NeonEnvDiscriminants) -> Path:
        # FIXME use discriminants
        randstr = "".join(
            random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(10)
        )
        s = Path(f"shared-{randstr}")
        return self.top_output_dir / s

    def get_or_create(self, disc: NeonEnvDiscriminants) -> NeonEnv:
        with self.lock:
            env = self.envs.get(disc)
            if env is None:
                builder = NeonEnvBuilder(
                    repo_dir=self.get_repo_dir(disc),
                    port_distributor=self.port_distributor,
                    run_id=self.run_id,
                    mock_s3_server=self.mock_s3_server,
                    pg_distrib_dir=self.pg_distrib_dir,
                    preserve_database_files=self.preserve_database_files,
                    pg_version=PgVersion("17"),  # FIXME: this should go unused. Pass None?
                    test_name="shared",  # FIXME
                    test_output_dir=Path("shared"),  # FIXME
                    test_overlay_dir=None,
                    top_output_dir=self.top_output_dir,
                    neon_binpath=disc.neon_binpath,
                    pageserver_virtual_file_io_engine=disc.pageserver_virtual_file_io_engine,
                    pageserver_aux_file_policy=disc.pageserver_aux_file_policy,
                    pageserver_default_tenant_config_compaction_algorithm=disc.pageserver_default_tenant_config_compaction_algorithm,
                    # FIXME: only support defaults for these currently
                    # pageserver_remote_storage
                    # pageserver_config_override
                    # num_safekeepers
                    # num_pageservers
                    # safekeepers_id_start
                    # safekeepers_enable_fsync
                    # auth_enabled
                    # rust_log_override
                    # default_branch_name
                    initial_tenant=None,  # FIXME should go unused
                    initial_timeline=None,  # FIXME should go unused
                    # safekeeper_extra_opts: Optional[list[str]] = None,
                    # storage_controller_port_override: Optional[int] = None,
                    # pageserver_io_buffer_alignment: Optional[int] = None,
                )
                env = builder.init_start()

                self.envs[disc] = env
            return env

    def __enter__(self) -> "NeonSharedEnvs":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ):
        for env in self.envs.values():
            env.stop(immediate=True)


@pytest.fixture(scope="session")
def shared_environments(
    # session fixtures
    port_distributor: PortDistributor,
    run_id: uuid.UUID,
    mock_s3_server: MockS3Server,
    pg_distrib_dir: Path,
    top_output_dir: Path,
    pytestconfig: Config,
) -> Iterator[NeonSharedEnvs]:
    with NeonSharedEnvs(
        port_distributor=port_distributor,
        run_id=run_id,
        mock_s3_server=mock_s3_server,
        pg_distrib_dir=pg_distrib_dir,
        top_output_dir=top_output_dir,
        # rust_log_override=rust_log_override, # FIXME
        preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
    ) as envs:
        yield envs


@pytest.fixture(scope="function")
def shared_env(
    request: FixtureRequest,
    # session fixture holding all the envs
    shared_environments: NeonSharedEnvs,
    # other session fixtures
    port_distributor: PortDistributor,
    mock_s3_server: MockS3Server,
    run_id: uuid.UUID,
    top_output_dir: Path,
    pg_distrib_dir: Path,
    # these define the env to use
    neon_binpath: Path,
    pageserver_virtual_file_io_engine: str,
    pageserver_aux_file_policy: Optional[AuxFileStore],
    pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]],
    pageserver_io_buffer_alignment: Optional[int],
) -> NeonEnv:
    disc = NeonEnvDiscriminants(
        neon_binpath=neon_binpath,
        # FIXME: There's no difference in e.g. having pageserver_virtual_file_io_engine=None, and
        # explicitly specifying whatever the default is. We could share those envs.
        pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
        pageserver_aux_file_policy=pageserver_aux_file_policy,
        pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
        pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
    )
    return shared_environments.get_or_create(disc)


class NeonTestTenant:
    """
    An object representing a single Neon tenant, in a shared environment

    Notable functions and fields:

    endpoints - A factory object for creating postgres compute nodes.

    tenant_id - tenant ID of the initial tenant created in the repository

    initial_timeline - timeline ID of the "main" branch

    create_branch() - branch a new timeline from an existing one, returns
        the new timeline id

    create_timeline() - initializes a new timeline by running initdb, returns
        the new timeline id
    """

    def __init__(self, env: NeonEnv):
        self.tenant_id = TenantId.generate()
        self.initial_timeline = TimelineId.generate()
        self.created = False

        self.endpoints = TenantEndpointFactory(self)
        self.env = env

    def create(
        self,
        conf: Optional[Dict[str, Any]] = None,
        shard_count: Optional[int] = None,
        shard_stripe_size: Optional[int] = None,
        placement_policy: Optional[str] = None,
        aux_file_policy: Optional[AuxFileStore] = None,
    ):
        assert not self.created
        self.env.create_tenant(
            tenant_id=self.tenant_id,
            timeline_id=self.initial_timeline,
            conf=conf,
            shard_count=shard_count,
            shard_stripe_size=shard_stripe_size,
            placement_policy=placement_policy,
            set_default=False,
            aux_file_policy=aux_file_policy,
        )
        self.created = True

    # TODO: this could be implemented
    # def config_tenant(self, conf: Dict[str, str]):

    def create_branch(
        self,
        new_branch_name: str = DEFAULT_BRANCH_NAME,
        ancestor_branch_name: Optional[str] = None,
        ancestor_start_lsn: Optional[Lsn] = None,
        new_timeline_id: Optional[TimelineId] = None,
    ) -> TimelineId:
        return self.env.create_branch(
            new_branch_name=new_branch_name,
            tenant_id=self.tenant_id,
            ancestor_branch_name=ancestor_branch_name,
            ancestor_start_lsn=ancestor_start_lsn,
            new_timeline_id=new_timeline_id,
        )

    def create_timeline(
        self,
        new_branch_name: str,
        timeline_id: Optional[TimelineId] = None,
    ) -> TimelineId:
        return self.env.create_timeline(new_branch_name=new_branch_name, timeline_id=timeline_id)


class TenantEndpointFactory:
    """An object representing multiple compute endpoints of a single tenant."""

    def __init__(self, tenant: NeonTestTenant):
        self.tenant = tenant
        self.num_instances: int = 0
        self.endpoints: List[Endpoint] = []

    def create(
        self,
        branch_name: str,
        endpoint_id: Optional[str] = None,
        lsn: Optional[Lsn] = None,
        hot_standby: bool = False,
        config_lines: Optional[List[str]] = None,
    ) -> Endpoint:
        ep = Endpoint(
            self.tenant.env,
            tenant_id=self.tenant.tenant_id,
            pg_port=self.tenant.env.port_distributor.get_port(),
            http_port=self.tenant.env.port_distributor.get_port(),
        )

        endpoint_id = endpoint_id or self.tenant.env.generate_endpoint_id()

        self.num_instances += 1
        self.endpoints.append(ep)

        return ep.create(
            branch_name=branch_name,
            endpoint_id=endpoint_id,
            lsn=lsn,
            hot_standby=hot_standby,
            config_lines=config_lines,
        )

    # FIXME: extra args for start
    # remote_ext_config: Optional[str] = None,
    # basebackup_request_tries: Optional[int] = None,
    def create_start(self, *args, **kwargs):
        ep = self.create(*args, **kwargs)
        ep.start()
        return ep

    def stop_all(self, fail_on_error=True) -> "TenantEndpointFactory":
        exception = None
        for ep in self.endpoints:
            try:
                ep.stop()
            except Exception as e:
                log.error(f"Failed to stop endpoint {ep.endpoint_id}: {e}")
                exception = e

        if fail_on_error and exception is not None:
            raise exception

        return self

    def new_replica(
        self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]] = None
    ):
        branch_name = origin.branch_name
        assert origin in self.endpoints
        assert branch_name is not None

        return self.create(
            branch_name=branch_name,
            endpoint_id=endpoint_id,
            lsn=None,
            hot_standby=True,
            config_lines=config_lines,
        )

    def new_replica_start(
        self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]] = None
    ):
        branch_name = origin.branch_name
        assert origin in self.endpoints
        assert branch_name is not None

        return self.create_start(
            branch_name=branch_name,
            endpoint_id=endpoint_id,
            lsn=None,
            hot_standby=True,
            config_lines=config_lines,
        )
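Taken together, the module gives each test its own tenant inside a session-shared NeonEnv: shared_environments keeps one environment per NeonEnvDiscriminants value, shared_env looks up or lazily boots the matching one, and neon_tenant creates a fresh tenant in it for every test. A minimal sketch of a test written against these fixtures (hypothetical, not part of this commit):

from fixtures.neon_tenant import NeonTestTenant


def test_shared_env_example(neon_tenant: NeonTestTenant):
    # Branches and endpoints are scoped to this test's tenant, so they do not
    # collide with other tests sharing the same pageserver and safekeepers.
    neon_tenant.create_branch("example", ancestor_branch_name="main")
    endpoint = neon_tenant.endpoints.create_start("example")
    assert endpoint.safe_psql("SELECT 1") == [(1,)]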
@@ -2,16 +2,14 @@ import os
 import time

 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnv
+from fixtures.neon_tenant import NeonTestTenant
 from fixtures.utils import query_scalar


 #
 # Test compute node start after clog truncation
 #
-def test_clog_truncate(neon_simple_env: NeonEnv):
-    env = neon_simple_env
-
+def test_clog_truncate(neon_tenant: NeonTestTenant):
     # set aggressive autovacuum to make sure that truncation will happen
     config = [
         "autovacuum_max_workers=10",
@@ -23,7 +21,7 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
         "autovacuum_freeze_max_age=100000",
     ]

-    endpoint = env.endpoints.create_start("main", config_lines=config)
+    endpoint = neon_tenant.endpoints.create_start("main", config_lines=config)

     # Install extension containing function needed for test
     endpoint.safe_psql("CREATE EXTENSION neon_test_utils")
@@ -56,12 +54,12 @@ def test_clog_truncate(neon_simple_env: NeonEnv):

     # create new branch after clog truncation and start a compute node on it
     log.info(f"create branch at lsn_after_truncation {lsn_after_truncation}")
-    env.create_branch(
+    neon_tenant.create_branch(
         "test_clog_truncate_new",
         ancestor_branch_name="main",
         ancestor_start_lsn=lsn_after_truncation,
     )
-    endpoint2 = env.endpoints.create_start("test_clog_truncate_new")
+    endpoint2 = neon_tenant.endpoints.create_start("test_clog_truncate_new")

     # check that new node doesn't contain truncated segment
     pg_xact_0000_path_new = os.path.join(endpoint2.pg_xact_dir_path(), "0000")
@@ -1,11 +1,9 @@
 import requests
-from fixtures.neon_fixtures import NeonEnv
+from fixtures.neon_tenant import NeonTestTenant


-def test_compute_catalog(neon_simple_env: NeonEnv):
-    env = neon_simple_env
-
-    endpoint = env.endpoints.create_start("main", config_lines=["log_min_messages=debug1"])
+def test_compute_catalog(neon_tenant: NeonTestTenant):
+    endpoint = neon_tenant.endpoints.create_start("main", config_lines=["log_min_messages=debug1"])
     client = endpoint.http_client()

     objects = client.dbs_and_roles()
@@ -4,6 +4,7 @@ import pathlib
 import pytest
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
+from fixtures.neon_tenant import NeonTestTenant
 from fixtures.pg_version import PgVersion
 from fixtures.utils import query_scalar
@@ -12,18 +13,17 @@ from fixtures.utils import query_scalar
 # Test CREATE DATABASE when there have been relmapper changes
 #
 @pytest.mark.parametrize("strategy", ["file_copy", "wal_log"])
-def test_createdb(neon_simple_env: NeonEnv, strategy: str):
-    env = neon_simple_env
-    if env.pg_version == PgVersion.V14 and strategy == "wal_log":
+def test_createdb(neon_tenant: NeonTestTenant, strategy: str, pg_version: PgVersion):
+    if pg_version == PgVersion.V14 and strategy == "wal_log":
         pytest.skip("wal_log strategy not supported on PostgreSQL 14")

-    endpoint = env.endpoints.create_start("main")
+    endpoint = neon_tenant.endpoints.create_start("main")

     with endpoint.cursor() as cur:
         # Cause a 'relmapper' change in the original branch
         cur.execute("VACUUM FULL pg_class")

-        if env.pg_version == PgVersion.V14:
+        if pg_version == PgVersion.V14:
             cur.execute("CREATE DATABASE foodb")
         else:
             cur.execute(f"CREATE DATABASE foodb STRATEGY={strategy}")
@@ -31,8 +31,8 @@ def test_createdb(neon_simple_env: NeonEnv, strategy: str):
         lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")

     # Create a branch
-    env.create_branch("test_createdb2", ancestor_branch_name="main", ancestor_start_lsn=lsn)
-    endpoint2 = env.endpoints.create_start("test_createdb2")
+    neon_tenant.create_branch("test_createdb2", ancestor_branch_name="main", ancestor_start_lsn=lsn)
+    endpoint2 = neon_tenant.endpoints.create_start("test_createdb2")

     # Test that you can connect to the new database on both branches
     for db in (endpoint, endpoint2):
@@ -1,13 +1,12 @@
-from fixtures.neon_fixtures import NeonEnv
+from fixtures.neon_tenant import NeonTestTenant
 from fixtures.utils import query_scalar


 #
 # Test CREATE USER to check shared catalog restore
 #
-def test_createuser(neon_simple_env: NeonEnv):
-    env = neon_simple_env
-    endpoint = env.endpoints.create_start("main")
+def test_createuser(neon_tenant: NeonTestTenant):
+    endpoint = neon_tenant.endpoints.create_start("main")

     with endpoint.cursor() as cur:
         # Cause a 'relmapper' change in the original branch
@@ -18,8 +17,10 @@ def test_createuser(neon_simple_env: NeonEnv):
         lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")

     # Create a branch
-    env.create_branch("test_createuser2", ancestor_branch_name="main", ancestor_start_lsn=lsn)
-    endpoint2 = env.endpoints.create_start("test_createuser2")
+    neon_tenant.create_branch(
+        "test_createuser2", ancestor_branch_name="main", ancestor_start_lsn=lsn
+    )
+    endpoint2 = neon_tenant.endpoints.create_start("test_createuser2")

     # Test that you can connect to new branch as a new user
     assert endpoint2.safe_psql("select current_user", user="testuser") == [("testuser",)]
@@ -4,7 +4,8 @@ from typing import Any, Dict, List, Optional, Tuple, Type
 import psycopg2
 import pytest
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnv, VanillaPostgres
+from fixtures.neon_fixtures import VanillaPostgres
+from fixtures.neon_tenant import NeonTestTenant
 from pytest_httpserver import HTTPServer
 from werkzeug.wrappers.request import Request
 from werkzeug.wrappers.response import Response
@@ -288,9 +289,8 @@ def assert_db_connlimit(endpoint: Any, db_name: str, connlimit: int, msg: str):
 # 2. User can ignore, then compute_ctl will drop invalid databases
 # automatically during full configuration
 # Here we test the latter. The first one is tested in test_ddl_forwarding
-def test_ddl_forwarding_invalid_db(neon_simple_env: NeonEnv):
-    env = neon_simple_env
-    endpoint = env.endpoints.create_start(
+def test_ddl_forwarding_invalid_db(neon_tenant: NeonTestTenant):
+    endpoint = neon_tenant.endpoints.create_start(
         "main",
         # Some non-existent url
         config_lines=["neon.console_url=http://localhost:9999/unknown/api/v0/roles_and_databases"],
@@ -1,5 +1,5 @@
 import pytest
-from fixtures.neon_fixtures import NeonEnvBuilder
+from fixtures.neon_tenant import NeonTestTenant


 @pytest.mark.parametrize(
@@ -10,13 +10,12 @@ from fixtures.neon_fixtures import NeonEnvBuilder
         "💣",  # calls `trigger_segfault` internally
     ],
 )
-def test_endpoint_crash(neon_env_builder: NeonEnvBuilder, sql_func: str):
+def test_endpoint_crash(neon_tenant: NeonTestTenant, sql_func: str):
     """
     Test that triggering crash from neon_test_utils crashes the endpoint
     """
-    env = neon_env_builder.init_start()
-    env.create_branch("test_endpoint_crash")
-    endpoint = env.endpoints.create_start("test_endpoint_crash")
+    neon_tenant.create_branch("test_endpoint_crash")
+    endpoint = neon_tenant.endpoints.create_start("test_endpoint_crash")

     endpoint.safe_psql("CREATE EXTENSION neon_test_utils;")
     with pytest.raises(Exception, match="This probably means the server terminated abnormally"):
@@ -1,10 +1,9 @@
-from fixtures.neon_fixtures import NeonEnvBuilder
+from fixtures.neon_tenant import NeonTestTenant


-def test_fsm_truncate(neon_env_builder: NeonEnvBuilder):
-    env = neon_env_builder.init_start()
-    env.create_branch("test_fsm_truncate")
-    endpoint = env.endpoints.create_start("test_fsm_truncate")
+def test_fsm_truncate(neon_tenant: NeonTestTenant):
+    neon_tenant.create_branch("test_fsm_truncate")
+    endpoint = neon_tenant.endpoints.create_start("test_fsm_truncate")
     endpoint.safe_psql(
         "CREATE TABLE t1(key int); CREATE TABLE t2(key int); TRUNCATE TABLE t1; TRUNCATE TABLE t2;"
     )
@@ -1,17 +1,16 @@
 import time

-from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
+from fixtures.neon_fixtures import wait_replica_caughtup
+from fixtures.neon_tenant import NeonTestTenant


 #
 # Test that redo of XLOG_GIN_VACUUM_PAGE doesn't produce error
 #
-def test_gin_redo(neon_simple_env: NeonEnv):
-    env = neon_simple_env
-
-    primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
+def test_gin_redo(neon_tenant: NeonTestTenant):
+    primary = neon_tenant.endpoints.create_start(branch_name="main", endpoint_id="primary")
     time.sleep(1)
-    secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
+    secondary = neon_tenant.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
     con = primary.connect()
     cur = con.cursor()
     cur.execute("create table gin_test_tbl(id integer, i int4[])")
@@ -7,16 +7,16 @@ import time

 import pytest
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnv, PgBin
+from fixtures.neon_fixtures import PgBin
+from fixtures.neon_tenant import NeonTestTenant


 @pytest.mark.timeout(600)
-def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
+def test_lfc_resize(neon_tenant: NeonTestTenant, pg_bin: PgBin):
     """
     Test resizing the Local File Cache
     """
-    env = neon_simple_env
-    endpoint = env.endpoints.create_start(
+    endpoint = neon_tenant.endpoints.create_start(
         "main",
         config_lines=[
             "neon.file_cache_path='file.cache'",
@@ -3,19 +3,18 @@ import queue
 import random
 import threading
 import time
 from pathlib import Path
 from typing import List

-from fixtures.neon_fixtures import NeonEnvBuilder
+from fixtures.neon_tenant import NeonTestTenant
 from fixtures.utils import query_scalar


-def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):
-    env = neon_env_builder.init_start()
-
-    cache_dir = os.path.join(env.repo_dir, "file_cache")
+def test_local_file_cache_unlink(neon_tenant: NeonTestTenant, test_output_dir: Path):
+    cache_dir = test_output_dir / Path("file_cache")
     os.mkdir(cache_dir)

-    endpoint = env.endpoints.create_start(
+    endpoint = neon_tenant.endpoints.create_start(
         "main",
         config_lines=[
             "shared_buffers='1MB'",
@@ -66,7 +65,7 @@ def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):
     time.sleep(5)

     # unlink, this is what we're actually testing
-    new_cache_dir = os.path.join(env.repo_dir, "file_cache_new")
+    new_cache_dir = test_output_dir / Path("file_cache_new")
    os.rename(cache_dir, new_cache_dir)

     time.sleep(10)
@@ -14,6 +14,7 @@ from fixtures.neon_fixtures import (
     logical_replication_sync,
     wait_for_last_flush_lsn,
 )
+from fixtures.neon_tenant import NeonTestTenant
 from fixtures.utils import wait_until
@@ -394,11 +395,9 @@ def test_restart_endpoint(neon_simple_env: NeonEnv, vanilla_pg):
 # records passed ot the WAL redo process are never large enough to hit
 # the bug.
 @pytest.mark.parametrize("pageserver_aux_file_policy", [AuxFileStore.CrossValidation])
-def test_large_records(neon_simple_env: NeonEnv, vanilla_pg):
-    env = neon_simple_env
-
-    env.create_branch("init")
-    endpoint = env.endpoints.create_start("init")
+def test_large_records(neon_tenant: NeonTestTenant, vanilla_pg):
+    neon_tenant.create_branch("init")
+    endpoint = neon_tenant.endpoints.create_start("init")

     cur = endpoint.connect().cursor()
     cur.execute("CREATE TABLE reptbl(id int, largeval text);")
@@ -466,14 +465,13 @@ def test_slots_and_branching(neon_simple_env: NeonEnv):


 @pytest.mark.parametrize("pageserver_aux_file_policy", [AuxFileStore.CrossValidation])
-def test_replication_shutdown(neon_simple_env: NeonEnv):
+def test_replication_shutdown(neon_tenant: NeonTestTenant):
     # Ensure Postgres can exit without stuck when a replication job is active + neon extension installed
-    env = neon_simple_env
-    env.create_branch("test_replication_shutdown_publisher", ancestor_branch_name="main")
-    pub = env.endpoints.create("test_replication_shutdown_publisher")
+    neon_tenant.create_branch("test_replication_shutdown_publisher", ancestor_branch_name="main")
+    pub = neon_tenant.endpoints.create("test_replication_shutdown_publisher")

-    env.create_branch("test_replication_shutdown_subscriber")
-    sub = env.endpoints.create("test_replication_shutdown_subscriber")
+    neon_tenant.create_branch("test_replication_shutdown_subscriber")
+    sub = neon_tenant.endpoints.create("test_replication_shutdown_subscriber")

     pub.respec(skip_pg_catalog_updates=False)
     pub.start()
@@ -2,15 +2,14 @@ import time
 from contextlib import closing

 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnvBuilder
+from fixtures.neon_tenant import NeonTestTenant


 # Verify that the neon extension is installed and has the correct version.
-def test_neon_extension(neon_env_builder: NeonEnvBuilder):
-    env = neon_env_builder.init_start()
-    env.create_branch("test_create_extension_neon")
+def test_neon_extension(neon_tenant: NeonTestTenant):
+    neon_tenant.create_branch("test_create_extension_neon")

-    endpoint_main = env.endpoints.create("test_create_extension_neon")
+    endpoint_main = neon_tenant.endpoints.create("test_create_extension_neon")
     # don't skip pg_catalog updates - it runs CREATE EXTENSION neon
     endpoint_main.respec(skip_pg_catalog_updates=False)
     endpoint_main.start()
@@ -33,11 +32,10 @@ def test_neon_extension(neon_env_builder: NeonEnvBuilder):


 # Verify that the neon extension can be upgraded/downgraded.
-def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
-    env = neon_env_builder.init_start()
-    env.create_branch("test_neon_extension_compatibility")
+def test_neon_extension_compatibility(neon_tenant: NeonTestTenant):
+    neon_tenant.create_branch("test_neon_extension_compatibility")

-    endpoint_main = env.endpoints.create("test_neon_extension_compatibility")
+    endpoint_main = neon_tenant.endpoints.create("test_neon_extension_compatibility")
     # don't skip pg_catalog updates - it runs CREATE EXTENSION neon
     endpoint_main.respec(skip_pg_catalog_updates=False)
     endpoint_main.start()
@@ -70,11 +68,10 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):


 # Verify that the neon extension can be auto-upgraded to the latest version.
-def test_neon_extension_auto_upgrade(neon_env_builder: NeonEnvBuilder):
-    env = neon_env_builder.init_start()
-    env.create_branch("test_neon_extension_auto_upgrade")
+def test_neon_extension_auto_upgrade(neon_tenant: NeonTestTenant):
+    neon_tenant.create_branch("test_neon_extension_auto_upgrade")

-    endpoint_main = env.endpoints.create("test_neon_extension_auto_upgrade")
+    endpoint_main = neon_tenant.endpoints.create("test_neon_extension_auto_upgrade")
     # don't skip pg_catalog updates - it runs CREATE EXTENSION neon
     endpoint_main.respec(skip_pg_catalog_updates=False)
     endpoint_main.start()
@@ -1,16 +1,15 @@
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnv
+from fixtures.neon_tenant import NeonTestTenant
 from fixtures.pg_version import PgVersion
 from fixtures.utils import wait_until


-def test_neon_superuser(neon_simple_env: NeonEnv, pg_version: PgVersion):
-    env = neon_simple_env
-    env.create_branch("test_neon_superuser_publisher", ancestor_branch_name="main")
-    pub = env.endpoints.create("test_neon_superuser_publisher")
+def test_neon_superuser(neon_tenant: NeonTestTenant, pg_version: PgVersion):
+    neon_tenant.create_branch("test_neon_superuser_publisher", ancestor_branch_name="main")
+    pub = neon_tenant.endpoints.create("test_neon_superuser_publisher")

-    env.create_branch("test_neon_superuser_subscriber")
-    sub = env.endpoints.create("test_neon_superuser_subscriber")
+    neon_tenant.create_branch("test_neon_superuser_subscriber")
+    sub = neon_tenant.endpoints.create("test_neon_superuser_subscriber")

     pub.respec(skip_pg_catalog_updates=False)
     pub.start()
@@ -1,11 +1,9 @@
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnvBuilder
+from fixtures.neon_tenant import NeonTestTenant


-def test_oid_overflow(neon_env_builder: NeonEnvBuilder):
-    env = neon_env_builder.init_start()
-
-    endpoint = env.endpoints.create_start("main")
+def test_oid_overflow(neon_tenant: NeonTestTenant):
+    endpoint = neon_tenant.endpoints.create_start("main")

     conn = endpoint.connect()
     cur = conn.cursor()
@@ -1,7 +1,8 @@
 import asyncio
 from io import BytesIO

-from fixtures.neon_fixtures import Endpoint, NeonEnv
+from fixtures.neon_fixtures import Endpoint
+from fixtures.neon_tenant import NeonTestTenant


 async def repeat_bytes(buf, repetitions: int):
@@ -39,9 +40,8 @@ async def parallel_load_same_table(endpoint: Endpoint, n_parallel: int):


 # Load data into one table with COPY TO from 5 parallel connections
-def test_parallel_copy(neon_simple_env: NeonEnv, n_parallel=5):
-    env = neon_simple_env
-    endpoint = env.endpoints.create_start("main")
+def test_parallel_copy(neon_tenant: NeonTestTenant, n_parallel=5):
+    endpoint = neon_tenant.endpoints.create_start("main")

     # Create test table
     conn = endpoint.connect()
@@ -3,6 +3,7 @@ from contextlib import closing

 from fixtures.log_helper import log
 from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, fork_at_current_lsn
+from fixtures.neon_tenant import NeonTestTenant
 from fixtures.utils import query_scalar
@@ -114,14 +115,13 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
     assert cur_new.fetchall() == []


-def test_vm_bit_clear_on_heap_lock_whitebox(neon_env_builder: NeonEnvBuilder):
+def test_vm_bit_clear_on_heap_lock_whitebox(neon_tenant: NeonTestTenant):
     """
     Test that the ALL_FROZEN VM bit is cleared correctly at a HEAP_LOCK record.

     This is a repro for the bug fixed in commit 66fa176cc8.
     """
-    env = neon_env_builder.init_start()
-    endpoint = env.endpoints.create_start(
+    endpoint = neon_tenant.endpoints.create_start(
         "main",
         config_lines=[
             # If auto-analyze runs at the same time that we run VACUUM FREEZE, it