WIP: Share the storage env between some tests

This introduces a new 'neon_tenant: NeonTestTenant' fixture, which can
be used instead of a NeonEnv. All tests using 'neon_tenant' share the
same NeonEnv. NeonTestTenant provides many of the same members as
NeonEnv, like 'create_branch', 'endpoints' etc. But not the ones that
deal with storage, nor create_tenant.

I converted some existing tests to use the new fixture. More could be
converted, if we add more features to NeonTestTenant:

- Many tests grab the pageserver HTTP client and call GC / compact
  functions on the pageserver. They are per-tenant operations, so they
  could safely be done one a shared pageserver too.

- Some tests use failpoints to add delays or pauses to various operations
  in the pageserver. If the failpoints were tenant- or timeline-scoped,
  they could be used on a shared pageserver too.

- Some tests print intoduce errors in the pageserver logs. They set
  allowlists for those error messages, because any unexpected errors
  in the logs cause the test to fail. If the allowlist was tenant- or
  timleine-scoped, we could allow those tests to share the env.

- Some tests use helper functions like check_restored_datadir_content or
  fork_at_current_lsn, which take a NeonEnv as argument. They could use
  NeonTestTenant instead (or become methods in NeonTestTenant).

- Some tests want to use extra tenant config for the initial tenant. We
  could expand the neon_tenant fixture to allow that. (Perhaps introduce
  NeonTestTenantBuilder which allows setting config before creating the
  initial tenant)

- Some tests create multiple tenants. That could be allowed in a
  shared env, as long as we track which tenants belong to the test.

See https://github.com/neondatabase/neon/issues/9193
This commit is contained in:
Heikki Linnakangas
2024-10-04 17:15:49 +03:00
parent eae4470bb6
commit b33ca57839
19 changed files with 441 additions and 102 deletions

View File

@@ -5,6 +5,7 @@ pytest_plugins = (
"fixtures.compute_reconfigure",
"fixtures.storage_controller_proxy",
"fixtures.neon_fixtures",
"fixtures.neon_tenant",
"fixtures.benchmark_fixture",
"fixtures.pg_stats",
"fixtures.compare_fixtures",

View File

@@ -454,9 +454,10 @@ class NeonEnvBuilder:
self.pageserver_io_buffer_alignment = pageserver_io_buffer_alignment
assert test_name.startswith(
"test_"
), "Unexpectedly instantiated from outside a test function"
# FIXME
# assert test_name.startswith(
# "test_"
# ), "Unexpectedly instantiated from outside a test function"
self.test_name = test_name
def init_configs(self, default_remote_storage_if_missing: bool = True) -> NeonEnv:

View File

@@ -0,0 +1,352 @@
from __future__ import annotations
import random
import string
import threading
import uuid
from dataclasses import dataclass
from pathlib import Path
from types import TracebackType
from typing import (
Any,
Dict,
Iterator,
List,
Optional,
Type,
cast,
)
import pytest
from _pytest.config import Config
from _pytest.fixtures import FixtureRequest
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import (
MockS3Server,
)
from fixtures.utils import AuxFileStore
DEFAULT_BRANCH_NAME: str = "main"
@pytest.fixture(scope="function")
def neon_tenant(shared_env: NeonEnv) -> Iterator[NeonTestTenant]:
tenant = NeonTestTenant(shared_env)
tenant.create()
yield tenant
# TODO: clean up the tenant
@dataclass(frozen=True)
class NeonEnvDiscriminants:
"""The options that define which environments can be shared"""
neon_binpath: Path
pageserver_virtual_file_io_engine: str
pageserver_aux_file_policy: Optional[AuxFileStore]
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]]
pageserver_io_buffer_alignment: Optional[int]
class NeonSharedEnvs:
def __init__(
self,
port_distributor: PortDistributor,
run_id: uuid.UUID,
mock_s3_server: MockS3Server,
pg_distrib_dir: Path,
top_output_dir: Path,
preserve_database_files: bool,
):
self.port_distributor = port_distributor
self.run_id = run_id
self.mock_s3_server = mock_s3_server
self.pg_distrib_dir = pg_distrib_dir
self.top_output_dir = top_output_dir
self.preserve_database_files = preserve_database_files
self.lock = threading.Lock()
self.envs: Dict[NeonEnvDiscriminants, NeonEnv] = {}
self.builders: List[NeonEnvBuilder] = []
def get_repo_dir(self, disc: NeonEnvDiscriminants) -> Path:
# FIXME use discriminants
randstr = "".join(
random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(10)
)
s = Path(f"shared-{randstr}")
return self.top_output_dir / s
def get_or_create(self, disc: NeonEnvDiscriminants) -> NeonEnv:
with self.lock:
env = self.envs.get(disc)
if env is None:
builder = NeonEnvBuilder(
repo_dir=self.get_repo_dir(disc),
port_distributor=self.port_distributor,
run_id=self.run_id,
mock_s3_server=self.mock_s3_server,
pg_distrib_dir=self.pg_distrib_dir,
preserve_database_files=self.preserve_database_files,
pg_version=PgVersion("17"), # FIXME: this should go unused. Pass None?
test_name="shared", # FIXME
test_output_dir=Path("shared"), # FIXME
test_overlay_dir=None,
top_output_dir=self.top_output_dir,
neon_binpath=disc.neon_binpath,
pageserver_virtual_file_io_engine=disc.pageserver_virtual_file_io_engine,
pageserver_aux_file_policy=disc.pageserver_aux_file_policy,
pageserver_default_tenant_config_compaction_algorithm=disc.pageserver_default_tenant_config_compaction_algorithm,
# FIXME: only support defaults for these currently
# pageserver_remote_storage
# pageserver_config_override
# num_safekeepers
# num_pageservers
# safekeepers_id_start
# safekeepers_enable_fsync
# auth_enabled
# rust_log_override
# default_branch_name
initial_tenant=None, # FIXME should go unused
initial_timeline=None, # FIXME should go unused
# safekeeper_extra_opts: Optional[list[str]] = None,
# storage_controller_port_override: Optional[int] = None,
# pageserver_io_buffer_alignment: Optional[int] = None,
)
env = builder.init_start()
self.envs[disc] = env
return env
def __enter__(self) -> "NeonSharedEnvs":
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
):
for env in self.envs.values():
env.stop(immediate=True)
@pytest.fixture(scope="session")
def shared_environments(
# session fixtures
port_distributor: PortDistributor,
run_id: uuid.UUID,
mock_s3_server: MockS3Server,
pg_distrib_dir: Path,
top_output_dir: Path,
pytestconfig: Config,
) -> Iterator[NeonSharedEnvs]:
with NeonSharedEnvs(
port_distributor=port_distributor,
run_id=run_id,
mock_s3_server=mock_s3_server,
pg_distrib_dir=pg_distrib_dir,
top_output_dir=top_output_dir,
# rust_log_override=rust_log_override, # FIXME
preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
) as envs:
yield envs
@pytest.fixture(scope="function")
def shared_env(
request: FixtureRequest,
# session fixture holding all the envs
shared_environments: NeonSharedEnvs,
# other session fixtures
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
run_id: uuid.UUID,
top_output_dir: Path,
pg_distrib_dir: Path,
# these define the env to use
neon_binpath: Path,
pageserver_virtual_file_io_engine: str,
pageserver_aux_file_policy: Optional[AuxFileStore],
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]],
pageserver_io_buffer_alignment: Optional[int],
) -> NeonEnv:
disc = NeonEnvDiscriminants(
neon_binpath=neon_binpath,
# FIXME: There's no difference in e.g. having pageserver_virtual_file_io_engine=None, and
# explicitly specifying whatever the default is. We could share those envs.
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
pageserver_aux_file_policy=pageserver_aux_file_policy,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
)
return shared_environments.get_or_create(disc)
class NeonTestTenant:
"""
An object representing a single Neon tenant, in a shared environment
Notable functions and fields:
endpoints - A factory object for creating postgres compute nodes.
tenant_id - tenant ID of the initial tenant created in the repository
initial_timeline - timeline ID of the "main" branch
create_branch() - branch a new timeline from an existing one, returns
the new timeline id
create_timeline() - initializes a new timeline by running initdb, returns
the new timeline id
"""
def __init__(self, env: NeonEnv):
self.tenant_id = TenantId.generate()
self.initial_timeline = TimelineId.generate()
self.created = False
self.endpoints = TenantEndpointFactory(self)
self.env = env
def create(
self,
conf: Optional[Dict[str, Any]] = None,
shard_count: Optional[int] = None,
shard_stripe_size: Optional[int] = None,
placement_policy: Optional[str] = None,
aux_file_policy: Optional[AuxFileStore] = None,
):
assert not self.created
self.env.create_tenant(
tenant_id=self.tenant_id,
timeline_id=self.initial_timeline,
conf=conf,
shard_count=shard_count,
shard_stripe_size=shard_stripe_size,
placement_policy=placement_policy,
set_default=False,
aux_file_policy=aux_file_policy,
)
self.created = True
# Todo: this could be imeplemented
# def config_tenant(self, conf: Dict[str, str]):
def create_branch(
self,
new_branch_name: str = DEFAULT_BRANCH_NAME,
ancestor_branch_name: Optional[str] = None,
ancestor_start_lsn: Optional[Lsn] = None,
new_timeline_id: Optional[TimelineId] = None,
) -> TimelineId:
return self.env.create_branch(
new_branch_name=new_branch_name,
tenant_id=self.tenant_id,
ancestor_branch_name=ancestor_branch_name,
ancestor_start_lsn=ancestor_start_lsn,
new_timeline_id=new_timeline_id,
)
def create_timeline(
self,
new_branch_name: str,
timeline_id: Optional[TimelineId] = None,
) -> TimelineId:
return self.env.create_timeline(new_branch_name=new_branch_name, timeline_id=timeline_id)
class TenantEndpointFactory:
"""An object representing multiple compute endpoints of a single tenant."""
def __init__(self, tenant: NeonTestTenant):
self.tenant = tenant
self.num_instances: int = 0
self.endpoints: List[Endpoint] = []
def create(
self,
branch_name: str,
endpoint_id: Optional[str] = None,
lsn: Optional[Lsn] = None,
hot_standby: bool = False,
config_lines: Optional[List[str]] = None,
) -> Endpoint:
ep = Endpoint(
self.tenant.env,
tenant_id=self.tenant.tenant_id,
pg_port=self.tenant.env.port_distributor.get_port(),
http_port=self.tenant.env.port_distributor.get_port(),
)
endpoint_id = endpoint_id or self.tenant.env.generate_endpoint_id()
self.num_instances += 1
self.endpoints.append(ep)
return ep.create(
branch_name=branch_name,
endpoint_id=endpoint_id,
lsn=lsn,
hot_standby=hot_standby,
config_lines=config_lines,
)
# FIXME: extra args for start
# remote_ext_config: Optional[str] = None,
# basebackup_request_tries: Optional[int] = None,
def create_start(self, *args, **kwargs):
ep = self.create(*args, **kwargs)
ep.start()
return ep
def stop_all(self, fail_on_error=True) -> "TenantEndpointFactory":
exception = None
for ep in self.endpoints:
try:
ep.stop()
except Exception as e:
log.error(f"Failed to stop endpoint {ep.endpoint_id}: {e}")
exception = e
if fail_on_error and exception is not None:
raise exception
return self
def new_replica(
self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]] = None
):
branch_name = origin.branch_name
assert origin in self.endpoints
assert branch_name is not None
return self.create(
branch_name=branch_name,
endpoint_id=endpoint_id,
lsn=None,
hot_standby=True,
config_lines=config_lines,
)
def new_replica_start(
self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]] = None
):
branch_name = origin.branch_name
assert origin in self.endpoints
assert branch_name is not None
return self.create_start(
branch_name=branch_name,
endpoint_id=endpoint_id,
lsn=None,
hot_standby=True,
config_lines=config_lines,
)

View File

@@ -2,16 +2,14 @@ import os
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.neon_tenant import NeonTestTenant
from fixtures.utils import query_scalar
#
# Test compute node start after clog truncation
#
def test_clog_truncate(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_clog_truncate(neon_tenant: NeonTestTenant):
# set aggressive autovacuum to make sure that truncation will happen
config = [
"autovacuum_max_workers=10",
@@ -23,7 +21,7 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
"autovacuum_freeze_max_age=100000",
]
endpoint = env.endpoints.create_start("main", config_lines=config)
endpoint = neon_tenant.endpoints.create_start("main", config_lines=config)
# Install extension containing function needed for test
endpoint.safe_psql("CREATE EXTENSION neon_test_utils")
@@ -56,12 +54,12 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
# create new branch after clog truncation and start a compute node on it
log.info(f"create branch at lsn_after_truncation {lsn_after_truncation}")
env.create_branch(
neon_tenant.create_branch(
"test_clog_truncate_new",
ancestor_branch_name="main",
ancestor_start_lsn=lsn_after_truncation,
)
endpoint2 = env.endpoints.create_start("test_clog_truncate_new")
endpoint2 = neon_tenant.endpoints.create_start("test_clog_truncate_new")
# check that new node doesn't contain truncated segment
pg_xact_0000_path_new = os.path.join(endpoint2.pg_xact_dir_path(), "0000")

View File

@@ -1,11 +1,9 @@
import requests
from fixtures.neon_fixtures import NeonEnv
from fixtures.neon_tenant import NeonTestTenant
def test_compute_catalog(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start("main", config_lines=["log_min_messages=debug1"])
def test_compute_catalog(neon_tenant: NeonTestTenant):
endpoint = neon_tenant.endpoints.create_start("main", config_lines=["log_min_messages=debug1"])
client = endpoint.http_client()
objects = client.dbs_and_roles()

View File

@@ -4,6 +4,7 @@ import pathlib
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
from fixtures.neon_tenant import NeonTestTenant
from fixtures.pg_version import PgVersion
from fixtures.utils import query_scalar
@@ -12,18 +13,17 @@ from fixtures.utils import query_scalar
# Test CREATE DATABASE when there have been relmapper changes
#
@pytest.mark.parametrize("strategy", ["file_copy", "wal_log"])
def test_createdb(neon_simple_env: NeonEnv, strategy: str):
env = neon_simple_env
if env.pg_version == PgVersion.V14 and strategy == "wal_log":
def test_createdb(neon_tenant: NeonTestTenant, strategy: str, pg_version: PgVersion):
if pg_version == PgVersion.V14 and strategy == "wal_log":
pytest.skip("wal_log strategy not supported on PostgreSQL 14")
endpoint = env.endpoints.create_start("main")
endpoint = neon_tenant.endpoints.create_start("main")
with endpoint.cursor() as cur:
# Cause a 'relmapper' change in the original branch
cur.execute("VACUUM FULL pg_class")
if env.pg_version == PgVersion.V14:
if pg_version == PgVersion.V14:
cur.execute("CREATE DATABASE foodb")
else:
cur.execute(f"CREATE DATABASE foodb STRATEGY={strategy}")
@@ -31,8 +31,8 @@ def test_createdb(neon_simple_env: NeonEnv, strategy: str):
lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
# Create a branch
env.create_branch("test_createdb2", ancestor_branch_name="main", ancestor_start_lsn=lsn)
endpoint2 = env.endpoints.create_start("test_createdb2")
neon_tenant.create_branch("test_createdb2", ancestor_branch_name="main", ancestor_start_lsn=lsn)
endpoint2 = neon_tenant.endpoints.create_start("test_createdb2")
# Test that you can connect to the new database on both branches
for db in (endpoint, endpoint2):

View File

@@ -1,13 +1,12 @@
from fixtures.neon_fixtures import NeonEnv
from fixtures.neon_tenant import NeonTestTenant
from fixtures.utils import query_scalar
#
# Test CREATE USER to check shared catalog restore
#
def test_createuser(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
def test_createuser(neon_tenant: NeonTestTenant):
endpoint = neon_tenant.endpoints.create_start("main")
with endpoint.cursor() as cur:
# Cause a 'relmapper' change in the original branch
@@ -18,8 +17,10 @@ def test_createuser(neon_simple_env: NeonEnv):
lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
# Create a branch
env.create_branch("test_createuser2", ancestor_branch_name="main", ancestor_start_lsn=lsn)
endpoint2 = env.endpoints.create_start("test_createuser2")
neon_tenant.create_branch(
"test_createuser2", ancestor_branch_name="main", ancestor_start_lsn=lsn
)
endpoint2 = neon_tenant.endpoints.create_start("test_createuser2")
# Test that you can connect to new branch as a new user
assert endpoint2.safe_psql("select current_user", user="testuser") == [("testuser",)]

View File

@@ -4,7 +4,8 @@ from typing import Any, Dict, List, Optional, Tuple, Type
import psycopg2
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, VanillaPostgres
from fixtures.neon_fixtures import VanillaPostgres
from fixtures.neon_tenant import NeonTestTenant
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
@@ -288,9 +289,8 @@ def assert_db_connlimit(endpoint: Any, db_name: str, connlimit: int, msg: str):
# 2. User can ignore, then compute_ctl will drop invalid databases
# automatically during full configuration
# Here we test the latter. The first one is tested in test_ddl_forwarding
def test_ddl_forwarding_invalid_db(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start(
def test_ddl_forwarding_invalid_db(neon_tenant: NeonTestTenant):
endpoint = neon_tenant.endpoints.create_start(
"main",
# Some non-existent url
config_lines=["neon.console_url=http://localhost:9999/unknown/api/v0/roles_and_databases"],

View File

@@ -1,5 +1,5 @@
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_tenant import NeonTestTenant
@pytest.mark.parametrize(
@@ -10,13 +10,12 @@ from fixtures.neon_fixtures import NeonEnvBuilder
"💣", # calls `trigger_segfault` internally
],
)
def test_endpoint_crash(neon_env_builder: NeonEnvBuilder, sql_func: str):
def test_endpoint_crash(neon_tenant: NeonTestTenant, sql_func: str):
"""
Test that triggering crash from neon_test_utils crashes the endpoint
"""
env = neon_env_builder.init_start()
env.create_branch("test_endpoint_crash")
endpoint = env.endpoints.create_start("test_endpoint_crash")
neon_tenant.create_branch("test_endpoint_crash")
endpoint = neon_tenant.endpoints.create_start("test_endpoint_crash")
endpoint.safe_psql("CREATE EXTENSION neon_test_utils;")
with pytest.raises(Exception, match="This probably means the server terminated abnormally"):

View File

@@ -1,10 +1,9 @@
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_tenant import NeonTestTenant
def test_fsm_truncate(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.create_branch("test_fsm_truncate")
endpoint = env.endpoints.create_start("test_fsm_truncate")
def test_fsm_truncate(neon_tenant: NeonTestTenant):
neon_tenant.create_branch("test_fsm_truncate")
endpoint = neon_tenant.endpoints.create_start("test_fsm_truncate")
endpoint.safe_psql(
"CREATE TABLE t1(key int); CREATE TABLE t2(key int); TRUNCATE TABLE t1; TRUNCATE TABLE t2;"
)

View File

@@ -1,17 +1,16 @@
import time
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
from fixtures.neon_fixtures import wait_replica_caughtup
from fixtures.neon_tenant import NeonTestTenant
#
# Test that redo of XLOG_GIN_VACUUM_PAGE doesn't produce error
#
def test_gin_redo(neon_simple_env: NeonEnv):
env = neon_simple_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
def test_gin_redo(neon_tenant: NeonTestTenant):
primary = neon_tenant.endpoints.create_start(branch_name="main", endpoint_id="primary")
time.sleep(1)
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
secondary = neon_tenant.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
con = primary.connect()
cur = con.cursor()
cur.execute("create table gin_test_tbl(id integer, i int4[])")

View File

@@ -7,16 +7,16 @@ import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin
from fixtures.neon_fixtures import PgBin
from fixtures.neon_tenant import NeonTestTenant
@pytest.mark.timeout(600)
def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
def test_lfc_resize(neon_tenant: NeonTestTenant, pg_bin: PgBin):
"""
Test resizing the Local File Cache
"""
env = neon_simple_env
endpoint = env.endpoints.create_start(
endpoint = neon_tenant.endpoints.create_start(
"main",
config_lines=[
"neon.file_cache_path='file.cache'",

View File

@@ -3,19 +3,18 @@ import queue
import random
import threading
import time
from pathlib import Path
from typing import List
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_tenant import NeonTestTenant
from fixtures.utils import query_scalar
def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
cache_dir = os.path.join(env.repo_dir, "file_cache")
def test_local_file_cache_unlink(neon_tenant: NeonTestTenant, test_output_dir: Path):
cache_dir = test_output_dir / Path("file_cache")
os.mkdir(cache_dir)
endpoint = env.endpoints.create_start(
endpoint = neon_tenant.endpoints.create_start(
"main",
config_lines=[
"shared_buffers='1MB'",
@@ -66,7 +65,7 @@ def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):
time.sleep(5)
# unlink, this is what we're actually testing
new_cache_dir = os.path.join(env.repo_dir, "file_cache_new")
new_cache_dir = test_output_dir / Path("file_cache_new")
os.rename(cache_dir, new_cache_dir)
time.sleep(10)

View File

@@ -14,6 +14,7 @@ from fixtures.neon_fixtures import (
logical_replication_sync,
wait_for_last_flush_lsn,
)
from fixtures.neon_tenant import NeonTestTenant
from fixtures.utils import wait_until
@@ -394,11 +395,9 @@ def test_restart_endpoint(neon_simple_env: NeonEnv, vanilla_pg):
# records passed ot the WAL redo process are never large enough to hit
# the bug.
@pytest.mark.parametrize("pageserver_aux_file_policy", [AuxFileStore.CrossValidation])
def test_large_records(neon_simple_env: NeonEnv, vanilla_pg):
env = neon_simple_env
env.create_branch("init")
endpoint = env.endpoints.create_start("init")
def test_large_records(neon_tenant: NeonTestTenant, vanilla_pg):
neon_tenant.create_branch("init")
endpoint = neon_tenant.endpoints.create_start("init")
cur = endpoint.connect().cursor()
cur.execute("CREATE TABLE reptbl(id int, largeval text);")
@@ -466,14 +465,13 @@ def test_slots_and_branching(neon_simple_env: NeonEnv):
@pytest.mark.parametrize("pageserver_aux_file_policy", [AuxFileStore.CrossValidation])
def test_replication_shutdown(neon_simple_env: NeonEnv):
def test_replication_shutdown(neon_tenant: NeonTestTenant):
# Ensure Postgres can exit without stuck when a replication job is active + neon extension installed
env = neon_simple_env
env.create_branch("test_replication_shutdown_publisher", ancestor_branch_name="main")
pub = env.endpoints.create("test_replication_shutdown_publisher")
neon_tenant.create_branch("test_replication_shutdown_publisher", ancestor_branch_name="main")
pub = neon_tenant.endpoints.create("test_replication_shutdown_publisher")
env.create_branch("test_replication_shutdown_subscriber")
sub = env.endpoints.create("test_replication_shutdown_subscriber")
neon_tenant.create_branch("test_replication_shutdown_subscriber")
sub = neon_tenant.endpoints.create("test_replication_shutdown_subscriber")
pub.respec(skip_pg_catalog_updates=False)
pub.start()

View File

@@ -2,15 +2,14 @@ import time
from contextlib import closing
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_tenant import NeonTestTenant
# Verify that the neon extension is installed and has the correct version.
def test_neon_extension(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.create_branch("test_create_extension_neon")
def test_neon_extension(neon_tenant: NeonTestTenant):
neon_tenant.create_branch("test_create_extension_neon")
endpoint_main = env.endpoints.create("test_create_extension_neon")
endpoint_main = neon_tenant.endpoints.create("test_create_extension_neon")
# don't skip pg_catalog updates - it runs CREATE EXTENSION neon
endpoint_main.respec(skip_pg_catalog_updates=False)
endpoint_main.start()
@@ -33,11 +32,10 @@ def test_neon_extension(neon_env_builder: NeonEnvBuilder):
# Verify that the neon extension can be upgraded/downgraded.
def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.create_branch("test_neon_extension_compatibility")
def test_neon_extension_compatibility(neon_tenant: NeonTestTenant):
neon_tenant.create_branch("test_neon_extension_compatibility")
endpoint_main = env.endpoints.create("test_neon_extension_compatibility")
endpoint_main = neon_tenant.endpoints.create("test_neon_extension_compatibility")
# don't skip pg_catalog updates - it runs CREATE EXTENSION neon
endpoint_main.respec(skip_pg_catalog_updates=False)
endpoint_main.start()
@@ -70,11 +68,10 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
# Verify that the neon extension can be auto-upgraded to the latest version.
def test_neon_extension_auto_upgrade(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.create_branch("test_neon_extension_auto_upgrade")
def test_neon_extension_auto_upgrade(neon_tenant: NeonTestTenant):
neon_tenant.create_branch("test_neon_extension_auto_upgrade")
endpoint_main = env.endpoints.create("test_neon_extension_auto_upgrade")
endpoint_main = neon_tenant.endpoints.create("test_neon_extension_auto_upgrade")
# don't skip pg_catalog updates - it runs CREATE EXTENSION neon
endpoint_main.respec(skip_pg_catalog_updates=False)
endpoint_main.start()

View File

@@ -1,16 +1,15 @@
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.neon_tenant import NeonTestTenant
from fixtures.pg_version import PgVersion
from fixtures.utils import wait_until
def test_neon_superuser(neon_simple_env: NeonEnv, pg_version: PgVersion):
env = neon_simple_env
env.create_branch("test_neon_superuser_publisher", ancestor_branch_name="main")
pub = env.endpoints.create("test_neon_superuser_publisher")
def test_neon_superuser(neon_tenant: NeonTestTenant, pg_version: PgVersion):
neon_tenant.create_branch("test_neon_superuser_publisher", ancestor_branch_name="main")
pub = neon_tenant.endpoints.create("test_neon_superuser_publisher")
env.create_branch("test_neon_superuser_subscriber")
sub = env.endpoints.create("test_neon_superuser_subscriber")
neon_tenant.create_branch("test_neon_superuser_subscriber")
sub = neon_tenant.endpoints.create("test_neon_superuser_subscriber")
pub.respec(skip_pg_catalog_updates=False)
pub.start()

View File

@@ -1,11 +1,9 @@
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_tenant import NeonTestTenant
def test_oid_overflow(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
def test_oid_overflow(neon_tenant: NeonTestTenant):
endpoint = neon_tenant.endpoints.create_start("main")
conn = endpoint.connect()
cur = conn.cursor()

View File

@@ -1,7 +1,8 @@
import asyncio
from io import BytesIO
from fixtures.neon_fixtures import Endpoint, NeonEnv
from fixtures.neon_fixtures import Endpoint
from fixtures.neon_tenant import NeonTestTenant
async def repeat_bytes(buf, repetitions: int):
@@ -39,9 +40,8 @@ async def parallel_load_same_table(endpoint: Endpoint, n_parallel: int):
# Load data into one table with COPY TO from 5 parallel connections
def test_parallel_copy(neon_simple_env: NeonEnv, n_parallel=5):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
def test_parallel_copy(neon_tenant: NeonTestTenant, n_parallel=5):
endpoint = neon_tenant.endpoints.create_start("main")
# Create test table
conn = endpoint.connect()

View File

@@ -3,6 +3,7 @@ from contextlib import closing
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, fork_at_current_lsn
from fixtures.neon_tenant import NeonTestTenant
from fixtures.utils import query_scalar
@@ -114,14 +115,13 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
assert cur_new.fetchall() == []
def test_vm_bit_clear_on_heap_lock_whitebox(neon_env_builder: NeonEnvBuilder):
def test_vm_bit_clear_on_heap_lock_whitebox(neon_tenant: NeonTestTenant):
"""
Test that the ALL_FROZEN VM bit is cleared correctly at a HEAP_LOCK record.
This is a repro for the bug fixed in commit 66fa176cc8.
"""
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start(
endpoint = neon_tenant.endpoints.create_start(
"main",
config_lines=[
# If auto-analyze runs at the same time that we run VACUUM FREEZE, it