Merge remote-tracking branch 'origin/main' into allow-tenant_create-with-tenant-token

(Minor) Conflicts:
    pageserver/src/http/routes.rs
    test_runner/regress/test_auth.py
This commit is contained in:
Christian Schwarz
2023-06-12 10:49:29 +02:00
176 changed files with 10102 additions and 4120 deletions

View File

@@ -312,6 +312,6 @@ def neon_with_baseline(request: FixtureRequest) -> PgCompare:
implementation-specific logic is widely useful across multiple tests, it might
make sense to add methods to the PgCompare class.
"""
fixture = request.getfixturevalue(request.param) # type: ignore
fixture = request.getfixturevalue(request.param)
assert isinstance(fixture, PgCompare), f"test error: fixture {fixture} is not PgCompare"
return fixture

View File

@@ -65,12 +65,19 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
"pageserver_getpage_reconstruct_seconds_bucket",
"pageserver_getpage_reconstruct_seconds_count",
"pageserver_getpage_reconstruct_seconds_sum",
"pageserver_getpage_get_reconstruct_data_seconds_bucket",
"pageserver_getpage_get_reconstruct_data_seconds_count",
"pageserver_getpage_get_reconstruct_data_seconds_sum",
"pageserver_io_operations_bytes_total",
"pageserver_io_operations_seconds_bucket",
"pageserver_io_operations_seconds_count",
"pageserver_io_operations_seconds_sum",
"pageserver_last_record_lsn",
"pageserver_materialized_cache_hits_total",
"pageserver_materialized_cache_hits_direct_total",
"pageserver_read_num_fs_layers_bucket",
"pageserver_read_num_fs_layers_count",
"pageserver_read_num_fs_layers_sum",
"pageserver_smgr_query_seconds_bucket",
"pageserver_smgr_query_seconds_count",
"pageserver_smgr_query_seconds_sum",

View File

@@ -26,7 +26,7 @@ from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, Union, cast
from urllib.parse import urlparse
import asyncpg
import backoff # type: ignore
import backoff
import boto3
import jwt
import psycopg2
@@ -354,7 +354,7 @@ class PgProtocol:
Returns psycopg2's connection object.
This method passes all extra params to connstr.
"""
conn = psycopg2.connect(**self.conn_options(**kwargs))
conn: PgConnection = psycopg2.connect(**self.conn_options(**kwargs))
# WARNING: this setting affects *all* tests!
conn.autocommit = autocommit
@@ -629,7 +629,7 @@ class NeonEnvBuilder:
assert self.env is not None, "environment is not already initialized, call init() first"
self.env.start()
def init_start(self) -> NeonEnv:
def init_start(self, initial_tenant_conf: Optional[Dict[str, str]] = None) -> NeonEnv:
env = self.init_configs()
self.start()
@@ -638,7 +638,9 @@ class NeonEnvBuilder:
log.info(
f"Services started, creating initial tenant {env.initial_tenant} and its initial timeline"
)
initial_tenant, initial_timeline = env.neon_cli.create_tenant(tenant_id=env.initial_tenant)
initial_tenant, initial_timeline = env.neon_cli.create_tenant(
tenant_id=env.initial_tenant, conf=initial_tenant_conf
)
env.initial_timeline = initial_timeline
log.info(f"Initial timeline {initial_tenant}/{initial_timeline} created successfully")
@@ -661,6 +663,8 @@ class NeonEnvBuilder:
else:
raise RuntimeError(f"Unknown storage type: {remote_storage_kind}")
self.remote_storage_kind = remote_storage_kind
def enable_local_fs_remote_storage(self, force_enable: bool = True):
"""
Sets up the pageserver to use the local fs at the `test_dir/local_fs_remote_storage` path.
@@ -1444,11 +1448,12 @@ class NeonCli(AbstractNeonCli):
def endpoint_create(
self,
branch_name: str,
pg_port: int,
http_port: int,
endpoint_id: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
hot_standby: bool = False,
lsn: Optional[Lsn] = None,
port: Optional[int] = None,
) -> "subprocess.CompletedProcess[str]":
args = [
"endpoint",
@@ -1462,8 +1467,10 @@ class NeonCli(AbstractNeonCli):
]
if lsn is not None:
args.extend(["--lsn", str(lsn)])
if port is not None:
args.extend(["--port", str(port)])
if pg_port is not None:
args.extend(["--pg-port", str(pg_port)])
if http_port is not None:
args.extend(["--http-port", str(http_port)])
if endpoint_id is not None:
args.append(endpoint_id)
if hot_standby:
@@ -1476,9 +1483,11 @@ class NeonCli(AbstractNeonCli):
def endpoint_start(
self,
endpoint_id: str,
pg_port: int,
http_port: int,
safekeepers: Optional[List[int]] = None,
tenant_id: Optional[TenantId] = None,
lsn: Optional[Lsn] = None,
port: Optional[int] = None,
) -> "subprocess.CompletedProcess[str]":
args = [
"endpoint",
@@ -1490,8 +1499,10 @@ class NeonCli(AbstractNeonCli):
]
if lsn is not None:
args.append(f"--lsn={lsn}")
if port is not None:
args.append(f"--port={port}")
args.extend(["--pg-port", str(pg_port)])
args.extend(["--http-port", str(http_port)])
if safekeepers is not None:
args.extend(["--safekeepers", (",".join(map(str, safekeepers)))])
if endpoint_id is not None:
args.append(endpoint_id)
@@ -1583,13 +1594,11 @@ class NeonPageserver(PgProtocol):
".*serving compute connection task.*exited with error: Postgres connection error.*",
".*serving compute connection task.*exited with error: Connection reset by peer.*",
".*serving compute connection task.*exited with error: Postgres query error.*",
".*Connection aborted: connection error: error communicating with the server: Broken pipe.*",
".*Connection aborted: connection error: error communicating with the server: Transport endpoint is not connected.*",
".*Connection aborted: connection error: error communicating with the server: Connection reset by peer.*",
".*Connection aborted: error communicating with the server: Transport endpoint is not connected.*",
# FIXME: replication patch for tokio_postgres regards any but CopyDone/CopyData message in CopyBoth stream as unexpected
".*Connection aborted: connection error: unexpected message from server*",
".*Connection aborted: unexpected message from server*",
".*kill_and_wait_impl.*: wait successful.*",
".*Replication stream finished: db error:.*ending streaming to Some*",
".*: db error:.*ending streaming to Some.*",
".*query handler for 'pagestream.*failed: Broken pipe.*", # pageserver notices compute shut down
".*query handler for 'pagestream.*failed: Connection reset by peer.*", # pageserver notices compute shut down
# safekeeper connection can fail with this, in the window between timeline creation
@@ -1603,24 +1612,25 @@ class NeonPageserver(PgProtocol):
# https://github.com/neondatabase/neon/issues/2442
".*could not remove ephemeral file.*No such file or directory.*",
# FIXME: These need investigation
".*gc_loop.*Failed to get a tenant .* Tenant .* not found.*",
".*compaction_loop.*Failed to get a tenant .* Tenant .* not found.*",
".*manual_gc.*is_shutdown_requested\\(\\) called in an unexpected task or thread.*",
".*tenant_list: timeline is not found in remote index while it is present in the tenants registry.*",
".*Removing intermediate uninit mark file.*",
# FIXME: known race condition in TaskHandle: https://github.com/neondatabase/neon/issues/2885
".*sender is dropped while join handle is still alive.*",
# Tenant::delete_timeline() can cause any of the four following errors.
# FIXME: we shouldn't be considering it an error: https://github.com/neondatabase/neon/issues/2946
".*could not flush frozen layer.*queue is in state Stopped", # when schedule layer upload fails because queued got closed before compaction got killed
".*wait for layer upload ops to complete.*", # .*Caused by:.*wait_completion aborted because upload queue was stopped
".*gc_loop.*Gc failed, retrying in.*timeline is Stopping", # When gc checks timeline state after acquiring layer_removal_cs
".*gc_loop.*Gc failed, retrying in.*: Cannot run GC iteration on inactive tenant", # Tenant::gc precondition
".*compaction_loop.*Compaction failed, retrying in.*timeline is Stopping", # When compaction checks timeline state after acquiring layer_removal_cs
".*query handler for 'pagestream.*failed: Timeline .* was not found", # postgres reconnects while timeline_delete doesn't hold the tenant's timelines.lock()
".*query handler for 'pagestream.*failed: Timeline .* is not active", # timeline delete in progress
".*task iteration took longer than the configured period.*",
# this is until #3501
".*Compaction failed, retrying in [^:]+: Cannot run compaction iteration on inactive tenant",
# these can happen anytime we do compactions from background task and shutdown pageserver
r".*ERROR.*ancestor timeline \S+ is being stopped",
# this is expected given our collaborative shutdown approach for the UploadQueue
".*Compaction failed, retrying in .*: queue is in state Stopped.*",
]
def start(
@@ -1688,6 +1698,9 @@ class NeonPageserver(PgProtocol):
else:
errors.append(line)
for error in errors:
log.info(f"not allowed error: {error.strip()}")
assert not errors
def log_contains(self, pattern: str) -> Optional[str]:
@@ -2280,17 +2293,24 @@ class Endpoint(PgProtocol):
"""An object representing a Postgres compute endpoint managed by the control plane."""
def __init__(
self, env: NeonEnv, tenant_id: TenantId, port: int, check_stop_result: bool = True
self,
env: NeonEnv,
tenant_id: TenantId,
pg_port: int,
http_port: int,
check_stop_result: bool = True,
):
super().__init__(host="localhost", port=port, user="cloud_admin", dbname="postgres")
super().__init__(host="localhost", port=pg_port, user="cloud_admin", dbname="postgres")
self.env = env
self.running = False
self.branch_name: Optional[str] = None # dubious
self.endpoint_id: Optional[str] = None # dubious, see asserts below
self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA
self.tenant_id = tenant_id
self.port = port
self.pg_port = pg_port
self.http_port = http_port
self.check_stop_result = check_stop_result
self.active_safekeepers: List[int] = list(map(lambda sk: sk.id, env.safekeepers))
# path to conf is <repo_dir>/endpoints/<endpoint_id>/pgdata/postgresql.conf
def create(
@@ -2320,7 +2340,8 @@ class Endpoint(PgProtocol):
tenant_id=self.tenant_id,
lsn=lsn,
hot_standby=hot_standby,
port=self.port,
pg_port=self.pg_port,
http_port=self.http_port,
)
path = Path("endpoints") / self.endpoint_id / "pgdata"
self.pgdata_dir = os.path.join(self.env.repo_dir, path)
@@ -2345,7 +2366,13 @@ class Endpoint(PgProtocol):
log.info(f"Starting postgres endpoint {self.endpoint_id}")
self.env.neon_cli.endpoint_start(self.endpoint_id, tenant_id=self.tenant_id, port=self.port)
self.env.neon_cli.endpoint_start(
self.endpoint_id,
pg_port=self.pg_port,
http_port=self.http_port,
tenant_id=self.tenant_id,
safekeepers=self.active_safekeepers,
)
self.running = True
return self
@@ -2369,32 +2396,8 @@ class Endpoint(PgProtocol):
return os.path.join(self.pg_data_dir_path(), "pg_twophase")
def config_file_path(self) -> str:
"""Path to postgresql.conf"""
return os.path.join(self.pg_data_dir_path(), "postgresql.conf")
def adjust_for_safekeepers(self, safekeepers: str) -> "Endpoint":
"""
Adjust instance config for working with wal acceptors instead of
pageserver (pre-configured by CLI) directly.
"""
# TODO: reuse config()
with open(self.config_file_path(), "r") as f:
cfg_lines = f.readlines()
with open(self.config_file_path(), "w") as f:
for cfg_line in cfg_lines:
# walproposer uses different application_name
if (
"synchronous_standby_names" in cfg_line
or
# don't repeat safekeepers/wal_acceptors multiple times
"neon.safekeepers" in cfg_line
):
continue
f.write(cfg_line)
f.write("synchronous_standby_names = 'walproposer'\n")
f.write("neon.safekeepers = '{}'\n".format(safekeepers))
return self
"""Path to the postgresql.conf in the endpoint directory (not the one in pgdata)"""
return os.path.join(self.endpoint_path(), "postgresql.conf")
def config(self, lines: List[str]) -> "Endpoint":
"""
@@ -2499,7 +2502,8 @@ class EndpointFactory:
ep = Endpoint(
self.env,
tenant_id=tenant_id or self.env.initial_tenant,
port=self.env.port_distributor.get_port(),
pg_port=self.env.port_distributor.get_port(),
http_port=self.env.port_distributor.get_port(),
)
self.num_instances += 1
self.endpoints.append(ep)
@@ -2524,7 +2528,8 @@ class EndpointFactory:
ep = Endpoint(
self.env,
tenant_id=tenant_id or self.env.initial_tenant,
port=self.env.port_distributor.get_port(),
pg_port=self.env.port_distributor.get_port(),
http_port=self.env.port_distributor.get_port(),
)
if endpoint_id is None:
@@ -2907,6 +2912,7 @@ SKIP_FILES = frozenset(
"pg_internal.init",
"pg.log",
"zenith.signal",
"pg_hba.conf",
"postgresql.conf",
"postmaster.opts",
"postmaster.pid",

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import json
import time
from collections import defaultdict
from dataclasses import dataclass
@@ -109,6 +110,10 @@ class PageserverHttpClient(requests.Session):
if auth_token is not None:
self.headers["Authorization"] = f"Bearer {auth_token}"
@property
def base_url(self) -> str:
return f"http://localhost:{self.port}"
def verbose_error(self, res: requests.Response):
try:
res.raise_for_status()
@@ -157,7 +162,7 @@ class PageserverHttpClient(requests.Session):
res = self.post(
f"http://localhost:{self.port}/v1/tenant",
json={
"new_tenant_id": str(new_tenant_id) if new_tenant_id else None,
"new_tenant_id": str(new_tenant_id),
**(conf or {}),
},
)
@@ -168,8 +173,22 @@ class PageserverHttpClient(requests.Session):
assert isinstance(new_tenant_id, str)
return TenantId(new_tenant_id)
def tenant_attach(self, tenant_id: TenantId):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/attach")
def tenant_attach(
self, tenant_id: TenantId, config: None | Dict[str, Any] = None, config_null: bool = False
):
if config_null:
assert config is None
body = "null"
else:
# null-config is prohibited by the API
if config is None:
config = {}
body = json.dumps({"config": config})
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/attach",
data=body,
headers={"Content-Type": "application/json"},
)
self.verbose_error(res)
def tenant_detach(self, tenant_id: TenantId, detach_ignored=False):
@@ -274,13 +293,13 @@ class PageserverHttpClient(requests.Session):
self,
pg_version: PgVersion,
tenant_id: TenantId,
new_timeline_id: Optional[TimelineId] = None,
new_timeline_id: TimelineId,
ancestor_timeline_id: Optional[TimelineId] = None,
ancestor_start_lsn: Optional[Lsn] = None,
**kwargs,
) -> Dict[Any, Any]:
body: Dict[str, Any] = {
"new_timeline_id": str(new_timeline_id) if new_timeline_id else None,
"new_timeline_id": str(new_timeline_id),
"ancestor_start_lsn": str(ancestor_start_lsn) if ancestor_start_lsn else None,
"ancestor_timeline_id": str(ancestor_timeline_id) if ancestor_timeline_id else None,
}

View File

@@ -1,8 +1,8 @@
import time
from typing import Optional
from typing import Any, Dict, Optional
from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.types import Lsn, TenantId, TimelineId
@@ -72,7 +72,7 @@ def wait_until_tenant_state(
expected_state: str,
iterations: int,
period: float = 1.0,
) -> bool:
) -> Dict[str, Any]:
"""
Does not use `wait_until` for debugging purposes
"""
@@ -81,7 +81,7 @@ def wait_until_tenant_state(
tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
log.debug(f"Tenant {tenant_id} data: {tenant}")
if tenant["state"]["slug"] == expected_state:
return True
return tenant
except Exception as e:
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
@@ -92,6 +92,41 @@ def wait_until_tenant_state(
)
def wait_until_timeline_state(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
timeline_id: TimelineId,
expected_state: str,
iterations: int,
period: float = 1.0,
) -> Dict[str, Any]:
"""
Does not use `wait_until` for debugging purposes
"""
for i in range(iterations):
try:
timeline = pageserver_http.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)
log.debug(f"Timeline {tenant_id}/{timeline_id} data: {timeline}")
if isinstance(timeline["state"], str):
if timeline["state"] == expected_state:
return timeline
elif isinstance(timeline, Dict):
if timeline["state"].get(expected_state):
return timeline
except Exception as e:
log.debug(f"Timeline {tenant_id}/{timeline_id} state retrieval failure: {e}")
if i == iterations - 1:
# do not sleep last time, we already know that we failed
break
time.sleep(period)
raise Exception(
f"Timeline {tenant_id}/{timeline_id} did not become {expected_state} within {iterations * period} seconds"
)
def wait_until_tenant_active(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
@@ -156,3 +191,21 @@ def wait_for_upload_queue_empty(
if all(m.value == 0 for m in tl):
return
time.sleep(0.2)
def assert_timeline_detail_404(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
timeline_id: TimelineId,
):
"""Asserts that timeline_detail returns 404, or dumps the detail."""
try:
data = pageserver_http.timeline_detail(tenant_id, timeline_id)
log.error(f"detail {data}")
except PageserverApiException as e:
log.error(e)
if e.status_code == 404:
return
else:
raise
raise Exception("detail succeeded (it should return 404)")

View File

@@ -27,6 +27,10 @@ class PgVersion(str, enum.Enum):
def __repr__(self) -> str:
return f"'{self.value}'"
# Make this explicit for Python 3.11 compatibility, which changes the behavior of enums
def __str__(self) -> str:
return self.value
# In GitHub workflows we use Postgres version with v-prefix (e.g. v14 instead of just 14),
# sometime we need to do so in tests.
@property
@@ -78,11 +82,11 @@ def pytest_addoption(parser: Parser):
@pytest.fixture(scope="session")
def pg_version(request: FixtureRequest) -> Iterator[PgVersion]:
if v := request.config.getoption("--pg-version"):
version, source = v, "from --pg-version commad-line argument"
version, source = v, "from --pg-version command-line argument"
elif v := os.environ.get("DEFAULT_PG_VERSION"):
version, source = PgVersion(v), "from DEFAULT_PG_VERSION environment variable"
else:
version, source = DEFAULT_VERSION, "default verson"
version, source = DEFAULT_VERSION, "default version"
log.info(f"pg_version is {version} ({source})")
yield version

View File

@@ -2,7 +2,7 @@ from contextlib import closing
import pytest
from fixtures.compare_fixtures import PgCompare
from pytest_lazyfixture import lazy_fixture # type: ignore
from pytest_lazyfixture import lazy_fixture
@pytest.mark.parametrize(

View File

@@ -0,0 +1,76 @@
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
@pytest.mark.timeout(10000)
def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
"""
Test that GC is able to collect all old layers even if them are forming
"stairs" and there are not three delta layers since last image layer.
Information about image layers needed to collect old layers should
be propagated by GC to compaction task which should take in in account
when make a decision which new image layers needs to be created.
"""
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
tenant_id, _ = env.neon_cli.create_tenant(
conf={
# disable default GC and compaction
"gc_period": "1000 m",
"compaction_period": "0 s",
"gc_horizon": f"{1024 ** 2}",
"checkpoint_distance": f"{1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
# set PITR interval to be small, so we can do GC
"pitr_interval": "10 s",
# "compaction_threshold": "3",
# "image_creation_threshold": "2",
}
)
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
n_steps = 10
n_update_iters = 100
step_size = 10000
with endpoint.cursor() as cur:
cur.execute("SET statement_timeout='1000s'")
cur.execute(
"CREATE TABLE t(step bigint, count bigint default 0, payload text default repeat(' ', 100)) with (fillfactor=50)"
)
cur.execute("CREATE INDEX ON t(step)")
# In each step, we insert 'step_size' new rows, and update the newly inserted rows
# 'n_update_iters' times. This creates a lot of churn and generates lots of WAL at the end of the table,
# without modifying the earlier parts of the table.
for step in range(n_steps):
cur.execute(f"INSERT INTO t (step) SELECT {step} FROM generate_series(1, {step_size})")
for i in range(n_update_iters):
cur.execute(f"UPDATE t set count=count+1 where step = {step}")
cur.execute("vacuum t")
# cur.execute("select pg_table_size('t')")
# logical_size = cur.fetchone()[0]
logical_size = client.timeline_detail(tenant_id, timeline_id)["current_logical_size"]
log.info(f"Logical storage size {logical_size}")
client.timeline_checkpoint(tenant_id, timeline_id)
# Do compaction and GC
client.timeline_gc(tenant_id, timeline_id, 0)
client.timeline_compact(tenant_id, timeline_id)
# One more iteration to check that no excessive image layers are generated
client.timeline_gc(tenant_id, timeline_id, 0)
client.timeline_compact(tenant_id, timeline_id)
physical_size = client.timeline_detail(tenant_id, timeline_id)["current_physical_size"]
log.info(f"Physical storage size {physical_size}")
MB = 1024 * 1024
zenbenchmark.record("logical_size", logical_size // MB, "Mb", MetricReport.LOWER_IS_BETTER)
zenbenchmark.record("physical_size", physical_size // MB, "Mb", MetricReport.LOWER_IS_BETTER)
zenbenchmark.record(
"physical/logical ratio", physical_size / logical_size, "", MetricReport.LOWER_IS_BETTER
)

View File

@@ -2,7 +2,7 @@ from contextlib import closing
import pytest
from fixtures.compare_fixtures import PgCompare
from pytest_lazyfixture import lazy_fixture # type: ignore
from pytest_lazyfixture import lazy_fixture
@pytest.mark.parametrize(

View File

@@ -2,7 +2,7 @@ from contextlib import closing
import pytest
from fixtures.compare_fixtures import PgCompare
from pytest_lazyfixture import lazy_fixture # type: ignore
from pytest_lazyfixture import lazy_fixture
@pytest.mark.parametrize(

View File

@@ -6,7 +6,7 @@ import pytest
from fixtures.benchmark_fixture import MetricReport
from fixtures.compare_fixtures import PgCompare
from fixtures.log_helper import log
from pytest_lazyfixture import lazy_fixture # type: ignore
from pytest_lazyfixture import lazy_fixture
@pytest.mark.parametrize(

View File

@@ -1,10 +1,63 @@
from contextlib import closing
import pytest
from fixtures.benchmark_fixture import NeonBenchmarker
import requests
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.neon_fixtures import NeonEnvBuilder
# Just start and measure duration.
#
# This test runs pretty quickly and can be informative when used in combination
# with emulated network delay. Some useful delay commands:
#
# 1. Add 2msec delay to all localhost traffic
# `sudo tc qdisc add dev lo root handle 1:0 netem delay 2msec`
#
# 2. Test that it works (you should see 4ms ping)
# `ping localhost`
#
# 3. Revert back to normal
# `sudo tc qdisc del dev lo root netem`
#
# NOTE this test might not represent the real startup time because the basebackup
# for a large database might be larger if there's a lof of transaction metadata,
# or safekeepers might need more syncing, or there might be more operations to
# apply during config step, like more users, databases, or extensions. By default
# we load extensions 'neon,pg_stat_statements,timescaledb,pg_cron', but in this
# test we only load neon.
def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_startup")
# We do two iterations so we can see if the second startup is faster. It should
# be because the compute node should already be configured with roles, databases,
# extensions, etc from the first run.
for i in range(2):
# Start
with zenbenchmark.record_duration(f"{i}_start_and_select"):
endpoint = env.endpoints.create_start("test_startup")
endpoint.safe_psql("select 1;")
# Get metrics
metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json()
durations = {
"wait_for_spec_ms": f"{i}_wait_for_spec",
"sync_safekeepers_ms": f"{i}_sync_safekeepers",
"basebackup_ms": f"{i}_basebackup",
"config_ms": f"{i}_config",
"total_startup_ms": f"{i}_total_startup",
}
for key, name in durations.items():
value = metrics[key]
zenbenchmark.record(name, value, "ms", report=MetricReport.LOWER_IS_BETTER)
# Stop so we can restart
endpoint.stop()
# This test sometimes runs for longer than the global 5 minute timeout.
@pytest.mark.timeout(600)
def test_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):

View File

@@ -0,0 +1,206 @@
from dataclasses import dataclass
from typing import Generator, Optional
import pytest
from fixtures.neon_fixtures import (
LocalFsStorage,
NeonEnv,
NeonEnvBuilder,
RemoteStorageKind,
)
from fixtures.pageserver.http import PageserverApiException, TenantConfig
from fixtures.types import TenantId
from fixtures.utils import wait_until
@pytest.fixture
def positive_env(neon_env_builder: NeonEnvBuilder) -> NeonEnv:
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.LOCAL_FS,
test_name="test_attach_tenant_config",
)
env = neon_env_builder.init_start()
# eviction might be the first one after an attach to access the layers
env.pageserver.allowed_errors.append(
".*unexpectedly on-demand downloading remote layer remote.* for task kind Eviction"
)
assert isinstance(env.remote_storage, LocalFsStorage)
return env
@dataclass
class NegativeTests:
neon_env: NeonEnv
tenant_id: TenantId
config_pre_detach: TenantConfig
@pytest.fixture
def negative_env(neon_env_builder: NeonEnvBuilder) -> Generator[NegativeTests, None, None]:
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.LOCAL_FS,
test_name="test_attach_tenant_config",
)
env = neon_env_builder.init_start()
assert isinstance(env.remote_storage, LocalFsStorage)
ps_http = env.pageserver.http_client()
(tenant_id, _) = env.neon_cli.create_tenant()
assert ps_http.tenant_config(tenant_id).tenant_specific_overrides == {}
config_pre_detach = ps_http.tenant_config(tenant_id)
assert tenant_id in [TenantId(t["id"]) for t in ps_http.tenant_list()]
ps_http.tenant_detach(tenant_id)
assert tenant_id not in [TenantId(t["id"]) for t in ps_http.tenant_list()]
yield NegativeTests(env, tenant_id, config_pre_detach)
assert tenant_id not in [
TenantId(t["id"]) for t in ps_http.tenant_list()
], "tenant should not be attached after negative test"
env.pageserver.allowed_errors.append(".*Error processing HTTP request: Bad request")
def log_contains_bad_request():
env.pageserver.log_contains(".*Error processing HTTP request: Bad request")
wait_until(50, 0.1, log_contains_bad_request)
def test_null_body(negative_env: NegativeTests):
"""
If we send `null` in the body, the request should be rejected with status 400.
"""
env = negative_env.neon_env
tenant_id = negative_env.tenant_id
ps_http = env.pageserver.http_client()
res = ps_http.post(
f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach",
data=b"null",
headers={"Content-Type": "application/json"},
)
assert res.status_code == 400
def test_null_config(negative_env: NegativeTests):
"""
If the `config` field is `null`, the request should be rejected with status 400.
"""
env = negative_env.neon_env
tenant_id = negative_env.tenant_id
ps_http = env.pageserver.http_client()
res = ps_http.post(
f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach",
data=b'{"config": null}',
headers={"Content-Type": "application/json"},
)
assert res.status_code == 400
def test_config_with_unknown_keys_is_bad_request(negative_env: NegativeTests):
"""
If we send a config with unknown keys, the request should be rejected with status 400.
"""
env = negative_env.neon_env
tenant_id = negative_env.tenant_id
ps_http = env.pageserver.http_client()
config_with_unknown_keys = {
"compaction_period": "1h",
"this_key_does_not_exist": "some value",
}
with pytest.raises(PageserverApiException) as e:
ps_http.tenant_attach(tenant_id, config=config_with_unknown_keys)
assert e.type == PageserverApiException
assert e.value.status_code == 400
@pytest.mark.parametrize("content_type", [None, "application/json"])
def test_empty_body(positive_env: NeonEnv, content_type: Optional[str]):
"""
For backwards-compatiblity: if we send an empty body,
the request should be accepted and the config should be the default config.
"""
env = positive_env
ps_http = env.pageserver.http_client()
(tenant_id, _) = env.neon_cli.create_tenant()
assert ps_http.tenant_config(tenant_id).tenant_specific_overrides == {}
config_pre_detach = ps_http.tenant_config(tenant_id)
assert tenant_id in [TenantId(t["id"]) for t in ps_http.tenant_list()]
ps_http.tenant_detach(tenant_id)
assert tenant_id not in [TenantId(t["id"]) for t in ps_http.tenant_list()]
ps_http.post(
f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach",
data=b"",
headers=None if content_type else {"Content-Type": "application/json"},
).raise_for_status()
assert ps_http.tenant_config(tenant_id).tenant_specific_overrides == {}
assert ps_http.tenant_config(tenant_id).effective_config == config_pre_detach.effective_config
def test_fully_custom_config(positive_env: NeonEnv):
"""
If we send a valid config in the body, the request should be accepted and the config should be applied.
"""
env = positive_env
fully_custom_config = {
"compaction_period": "1h",
"compaction_threshold": 13,
"compaction_target_size": 1048576,
"checkpoint_distance": 10000,
"checkpoint_timeout": "13m",
"eviction_policy": {
"kind": "LayerAccessThreshold",
"period": "20s",
"threshold": "23h",
},
"evictions_low_residence_duration_metric_threshold": "2days",
"gc_feedback": True,
"gc_horizon": 23 * (1024 * 1024),
"gc_period": "2h 13m",
"image_creation_threshold": 7,
"pitr_interval": "1m",
"lagging_wal_timeout": "23m",
"max_lsn_wal_lag": 230000,
"min_resident_size_override": 23,
"trace_read_requests": True,
"walreceiver_connect_timeout": "13m",
}
ps_http = env.pageserver.http_client()
initial_tenant_config = ps_http.tenant_config(env.initial_tenant)
assert initial_tenant_config.tenant_specific_overrides == {}
assert set(initial_tenant_config.effective_config.keys()) == set(
fully_custom_config.keys()
), "ensure we cover all config options"
(tenant_id, _) = env.neon_cli.create_tenant()
ps_http.set_tenant_config(tenant_id, fully_custom_config)
our_tenant_config = ps_http.tenant_config(tenant_id)
assert our_tenant_config.tenant_specific_overrides == fully_custom_config
assert set(our_tenant_config.effective_config.keys()) == set(
fully_custom_config.keys()
), "ensure we cover all config options"
assert {
k: initial_tenant_config.effective_config[k] != our_tenant_config.effective_config[k]
for k in fully_custom_config.keys()
} == {
k: True for k in fully_custom_config.keys()
}, "ensure our custom config has different values than the default config for all config options, so we know we overrode everything"
ps_http.tenant_detach(tenant_id)
ps_http.tenant_attach(tenant_id, config=fully_custom_config)
assert ps_http.tenant_config(tenant_id).tenant_specific_overrides == fully_custom_config
assert set(ps_http.tenant_config(tenant_id).effective_config.keys()) == set(
fully_custom_config.keys()
), "ensure we cover all config options"

View File

@@ -3,7 +3,7 @@ from contextlib import closing
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, PgProtocol
from fixtures.pageserver.http import PageserverApiException
from fixtures.types import TenantId
from fixtures.types import TenantId, TimelineId
def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
@@ -25,21 +25,19 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
ps.safe_psql("set FOO", password=tenant_token)
ps.safe_psql("set FOO", password=pageserver_token)
new_timeline_id = env.neon_cli.create_branch(
"test_pageserver_auth", tenant_id=env.initial_tenant
)
# tenant can create branches
tenant_http_client.timeline_create(
pg_version=env.pg_version,
tenant_id=env.initial_tenant,
ancestor_timeline_id=new_timeline_id,
new_timeline_id=TimelineId.generate(),
ancestor_timeline_id=env.initial_timeline,
)
# console can create branches for tenant
pageserver_http_client.timeline_create(
pg_version=env.pg_version,
tenant_id=env.initial_tenant,
ancestor_timeline_id=new_timeline_id,
new_timeline_id=TimelineId.generate(),
ancestor_timeline_id=env.initial_timeline,
)
# fail to create branch using token with different tenant_id
@@ -49,7 +47,8 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
invalid_tenant_http_client.timeline_create(
pg_version=env.pg_version,
tenant_id=env.initial_tenant,
ancestor_timeline_id=new_timeline_id,
new_timeline_id=TimelineId.generate(),
ancestor_timeline_id=env.initial_timeline,
)
# create tenant using management token

View File

@@ -20,7 +20,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
".*is not active. Current state: Broken.*",
".*will not become active. Current state: Broken.*",
".*failed to load metadata.*",
".*could not load tenant.*load local timeline.*",
".*load failed.*load local timeline.*",
]
)

View File

@@ -383,6 +383,9 @@ def check_neon_works(
cli_target = NeonCli(config_target)
# And the current binaries to launch computes
snapshot_config["neon_distrib_dir"] = str(neon_current_binpath)
with (snapshot_config_toml).open("w") as f:
toml.dump(snapshot_config, f)
config_current = copy.copy(config)
config_current.neon_binpath = neon_current_binpath
cli_current = NeonCli(config_current)
@@ -391,7 +394,8 @@ def check_neon_works(
request.addfinalizer(lambda: cli_target.raw_cli(["stop"]))
pg_port = port_distributor.get_port()
cli_current.endpoint_start("main", port=pg_port)
http_port = port_distributor.get_port()
cli_current.endpoint_start("main", pg_port=pg_port, http_port=http_port)
request.addfinalizer(lambda: cli_current.endpoint_stop("main"))
connstr = f"host=127.0.0.1 port={pg_port} user=cloud_admin dbname=postgres"

View File

@@ -1,253 +0,0 @@
import os
from pathlib import Path
from subprocess import TimeoutExpired
from fixtures.log_helper import log
from fixtures.neon_fixtures import ComputeCtl, NeonEnvBuilder, PgBin
# Test that compute_ctl works and prints "--sync-safekeepers" logs.
def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
ctl = ComputeCtl(env)
env.neon_cli.create_branch("test_compute_ctl", "main")
endpoint = env.endpoints.create_start("test_compute_ctl")
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
with open(endpoint.config_file_path(), "r") as f:
cfg_lines = f.readlines()
cfg_map = {}
for line in cfg_lines:
if "=" in line:
k, v = line.split("=")
cfg_map[k] = v.strip("\n '\"")
log.info(f"postgres config: {cfg_map}")
pgdata = endpoint.pg_data_dir_path()
pg_bin_path = os.path.join(pg_bin.pg_bin_path, "postgres")
endpoint.stop_and_destroy()
# stop_and_destroy removes the whole endpoint directory. Recreate it.
Path(pgdata).mkdir(parents=True)
spec = (
"""
{
"format_version": 1.0,
"timestamp": "2021-05-23T18:25:43.511Z",
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8b",
"cluster": {
"cluster_id": "test-cluster-42",
"name": "Neon Test",
"state": "restarted",
"roles": [
],
"databases": [
],
"settings": [
{
"name": "fsync",
"value": "off",
"vartype": "bool"
},
{
"name": "wal_level",
"value": "replica",
"vartype": "enum"
},
{
"name": "neon.safekeepers",
"value": """
+ f'"{cfg_map["neon.safekeepers"]}"'
+ """,
"vartype": "string"
},
{
"name": "wal_log_hints",
"value": "on",
"vartype": "bool"
},
{
"name": "log_connections",
"value": "on",
"vartype": "bool"
},
{
"name": "shared_buffers",
"value": "32768",
"vartype": "integer"
},
{
"name": "port",
"value": """
+ f'"{cfg_map["port"]}"'
+ """,
"vartype": "integer"
},
{
"name": "max_connections",
"value": "100",
"vartype": "integer"
},
{
"name": "max_wal_senders",
"value": "10",
"vartype": "integer"
},
{
"name": "listen_addresses",
"value": "0.0.0.0",
"vartype": "string"
},
{
"name": "wal_sender_timeout",
"value": "0",
"vartype": "integer"
},
{
"name": "password_encryption",
"value": "md5",
"vartype": "enum"
},
{
"name": "maintenance_work_mem",
"value": "65536",
"vartype": "integer"
},
{
"name": "max_parallel_workers",
"value": "8",
"vartype": "integer"
},
{
"name": "max_worker_processes",
"value": "8",
"vartype": "integer"
},
{
"name": "neon.tenant_id",
"value": """
+ f'"{cfg_map["neon.tenant_id"]}"'
+ """,
"vartype": "string"
},
{
"name": "max_replication_slots",
"value": "10",
"vartype": "integer"
},
{
"name": "neon.timeline_id",
"value": """
+ f'"{cfg_map["neon.timeline_id"]}"'
+ """,
"vartype": "string"
},
{
"name": "shared_preload_libraries",
"value": "neon",
"vartype": "string"
},
{
"name": "synchronous_standby_names",
"value": "walproposer",
"vartype": "string"
},
{
"name": "neon.pageserver_connstring",
"value": """
+ f'"{cfg_map["neon.pageserver_connstring"]}"'
+ """,
"vartype": "string"
}
]
},
"delta_operations": [
]
}
"""
)
ps_connstr = cfg_map["neon.pageserver_connstring"]
log.info(f"ps_connstr: {ps_connstr}, pgdata: {pgdata}")
# run compute_ctl and wait for 10s
try:
ctl.raw_cli(
[
"--connstr",
"postgres://invalid/",
"--pgdata",
pgdata,
"--spec",
spec,
"--pgbin",
pg_bin_path,
],
timeout=10,
)
except TimeoutExpired as exc:
ctl_logs = (exc.stderr or b"").decode("utf-8")
log.info(f"compute_ctl stderr:\n{ctl_logs}")
with ExternalProcessManager(Path(pgdata) / "postmaster.pid"):
start = "starting safekeepers syncing"
end = "safekeepers synced at LSN"
start_pos = ctl_logs.index(start)
assert start_pos != -1
end_pos = ctl_logs.index(end, start_pos)
assert end_pos != -1
sync_safekeepers_logs = ctl_logs[start_pos : end_pos + len(end)]
log.info("sync_safekeepers_logs:\n" + sync_safekeepers_logs)
# assert that --sync-safekeepers logs are present in the output
assert "connecting with node" in sync_safekeepers_logs
assert "connected with node" in sync_safekeepers_logs
assert "proposer connected to quorum (2)" in sync_safekeepers_logs
assert "got votes from majority (2)" in sync_safekeepers_logs
assert "sending elected msg to node" in sync_safekeepers_logs
class ExternalProcessManager:
"""
Context manager that kills a process with a pid file on exit.
"""
def __init__(self, pid_file: Path):
self.path = pid_file
self.pid_file = open(pid_file, "r")
self.pid = int(self.pid_file.readline().strip())
def __enter__(self):
return self
def leave_alive(self):
self.pid_file.close()
def __exit__(self, _type, _value, _traceback):
import signal
import time
if self.pid_file.closed:
return
with self.pid_file:
try:
os.kill(self.pid, signal.SIGTERM)
except OSError as e:
if not self.path.is_file():
return
log.info(f"Failed to kill {self.pid}, but the pidfile remains: {e}")
return
for _ in range(20):
if not self.path.is_file():
return
time.sleep(0.2)
log.info("Process failed to stop after SIGTERM: {self.pid}")
os.kill(self.pid, signal.SIGKILL)

View File

@@ -0,0 +1,210 @@
from types import TracebackType
from typing import Any, Dict, List, Optional, Tuple, Type
import psycopg2
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import VanillaPostgres
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
def handle_db(dbs, roles, operation):
if operation["op"] == "set":
if "old_name" in operation and operation["old_name"] in dbs:
dbs[operation["name"]] = dbs[operation["old_name"]]
dbs.pop(operation["old_name"])
if "owner" in operation:
dbs[operation["name"]] = operation["owner"]
elif operation["op"] == "del":
dbs.pop(operation["name"])
else:
raise ValueError("Invalid op")
def handle_role(dbs, roles, operation):
if operation["op"] == "set":
if "old_name" in operation and operation["old_name"] in roles:
roles[operation["name"]] = roles[operation["old_name"]]
roles.pop(operation["old_name"])
for db, owner in dbs.items():
if owner == operation["old_name"]:
dbs[db] = operation["name"]
if "password" in operation:
roles[operation["name"]] = operation["password"]
elif operation["op"] == "del":
if "old_name" in operation:
roles.pop(operation["old_name"])
roles.pop(operation["name"])
else:
raise ValueError("Invalid op")
fail = False
def ddl_forward_handler(request: Request, dbs: Dict[str, str], roles: Dict[str, str]) -> Response:
log.info(f"Received request with data {request.get_data(as_text=True)}")
if fail:
log.info("FAILING")
return Response(status=500, response="Failed just cuz")
if request.json is None:
log.info("Received invalid JSON")
return Response(status=400)
json = request.json
# Handle roles first
if "roles" in json:
for operation in json["roles"]:
handle_role(dbs, roles, operation)
if "dbs" in json:
for operation in json["dbs"]:
handle_db(dbs, roles, operation)
return Response(status=200)
class DdlForwardingContext:
def __init__(self, httpserver: HTTPServer, vanilla_pg: VanillaPostgres, host: str, port: int):
self.server = httpserver
self.pg = vanilla_pg
self.host = host
self.port = port
self.dbs: Dict[str, str] = {}
self.roles: Dict[str, str] = {}
endpoint = "/management/api/v2/roles_and_databases"
ddl_url = f"http://{host}:{port}{endpoint}"
self.pg.configure(
[
f"neon.console_url={ddl_url}",
"shared_preload_libraries = 'neon'",
]
)
log.info(f"Listening on {ddl_url}")
self.server.expect_request(endpoint, method="PATCH").respond_with_handler(
lambda request: ddl_forward_handler(request, self.dbs, self.roles)
)
def __enter__(self):
self.pg.start()
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
):
self.pg.stop()
def send(self, query: str) -> List[Tuple[Any, ...]]:
return self.pg.safe_psql(query)
def wait(self, timeout=3):
self.server.wait(timeout=timeout)
def send_and_wait(self, query: str, timeout=3) -> List[Tuple[Any, ...]]:
res = self.send(query)
self.wait(timeout=timeout)
return res
@pytest.fixture(scope="function")
def ddl(
httpserver: HTTPServer, vanilla_pg: VanillaPostgres, httpserver_listen_address: tuple[str, int]
):
(host, port) = httpserver_listen_address
with DdlForwardingContext(httpserver, vanilla_pg, host, port) as ddl:
yield ddl
def test_ddl_forwarding(ddl: DdlForwardingContext):
curr_user = ddl.send("SELECT current_user")[0][0]
log.info(f"Current user is {curr_user}")
ddl.send_and_wait("CREATE DATABASE bork")
assert ddl.dbs == {"bork": curr_user}
ddl.send_and_wait("CREATE ROLE volk WITH PASSWORD 'nu_zayats'")
ddl.send_and_wait("ALTER DATABASE bork RENAME TO nu_pogodi")
assert ddl.dbs == {"nu_pogodi": curr_user}
ddl.send_and_wait("ALTER DATABASE nu_pogodi OWNER TO volk")
assert ddl.dbs == {"nu_pogodi": "volk"}
ddl.send_and_wait("DROP DATABASE nu_pogodi")
assert ddl.dbs == {}
ddl.send_and_wait("DROP ROLE volk")
assert ddl.roles == {}
ddl.send_and_wait("CREATE ROLE tarzan WITH PASSWORD 'of_the_apes'")
assert ddl.roles == {"tarzan": "of_the_apes"}
ddl.send_and_wait("DROP ROLE tarzan")
assert ddl.roles == {}
ddl.send_and_wait("CREATE ROLE tarzan WITH PASSWORD 'of_the_apes'")
assert ddl.roles == {"tarzan": "of_the_apes"}
ddl.send_and_wait("ALTER ROLE tarzan WITH PASSWORD 'jungle_man'")
assert ddl.roles == {"tarzan": "jungle_man"}
ddl.send_and_wait("ALTER ROLE tarzan RENAME TO mowgli")
assert ddl.roles == {"mowgli": "jungle_man"}
ddl.send_and_wait("DROP ROLE mowgli")
assert ddl.roles == {}
conn = ddl.pg.connect()
cur = conn.cursor()
cur.execute("BEGIN")
cur.execute("CREATE ROLE bork WITH PASSWORD 'cork'")
cur.execute("COMMIT")
ddl.wait()
assert ddl.roles == {"bork": "cork"}
cur.execute("BEGIN")
cur.execute("CREATE ROLE stork WITH PASSWORD 'pork'")
cur.execute("ABORT")
ddl.wait()
assert ("stork", "pork") not in ddl.roles.items()
cur.execute("BEGIN")
cur.execute("ALTER ROLE bork WITH PASSWORD 'pork'")
cur.execute("ALTER ROLE bork RENAME TO stork")
cur.execute("COMMIT")
ddl.wait()
assert ddl.roles == {"stork": "pork"}
cur.execute("BEGIN")
cur.execute("CREATE ROLE dork WITH PASSWORD 'york'")
cur.execute("SAVEPOINT point")
cur.execute("ALTER ROLE dork WITH PASSWORD 'zork'")
cur.execute("ALTER ROLE dork RENAME TO fork")
cur.execute("ROLLBACK TO SAVEPOINT point")
cur.execute("ALTER ROLE dork WITH PASSWORD 'fork'")
cur.execute("ALTER ROLE dork RENAME TO zork")
cur.execute("RELEASE SAVEPOINT point")
cur.execute("COMMIT")
ddl.wait()
assert ddl.roles == {"stork": "pork", "zork": "fork"}
cur.execute("DROP ROLE stork")
cur.execute("DROP ROLE zork")
ddl.wait()
assert ddl.roles == {}
cur.execute("CREATE ROLE bork WITH PASSWORD 'dork'")
cur.execute("CREATE ROLE stork WITH PASSWORD 'cork'")
cur.execute("BEGIN")
cur.execute("DROP ROLE bork")
cur.execute("ALTER ROLE stork RENAME TO bork")
cur.execute("COMMIT")
ddl.wait()
assert ddl.roles == {"bork": "cork"}
cur.execute("DROP ROLE bork")
ddl.wait()
assert ddl.roles == {}
cur.execute("CREATE ROLE bork WITH PASSWORD 'dork'")
cur.execute("CREATE DATABASE stork WITH OWNER=bork")
cur.execute("ALTER ROLE bork RENAME TO cork")
ddl.wait()
assert ddl.dbs == {"stork": "cork"}
with pytest.raises(psycopg2.InternalError):
global fail
fail = True
cur.execute("CREATE DATABASE failure WITH OWNER=cork")
ddl.wait()
conn.close()

View File

@@ -110,6 +110,12 @@ class EvictionEnv:
overrides=(
"--pageserver-config-override=disk_usage_based_eviction="
+ enc.dump_inline_table(disk_usage_config).replace("\n", " "),
# Disk usage based eviction runs as a background task.
# But pageserver startup delays launch of background tasks for some time, to prioritize initial logical size calculations during startup.
# But, initial logical size calculation may not be triggered if safekeepers don't publish new broker messages.
# But, we only have a 10-second-timeout in this test.
# So, disable the delay for this test.
"--pageserver-config-override=background_task_maximum_delay='0s'",
),
)

View File

@@ -79,6 +79,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
# Set up pageserver for import
neon_env_builder.enable_local_fs_remote_storage()
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
client.tenant_create(tenant)
@@ -145,6 +146,11 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
)
# NOTE: delete can easily come before upload operations are completed
# https://github.com/neondatabase/neon/issues/4326
env.pageserver.allowed_errors.append(
".*files not bound to index_file.json, proceeding with their deletion.*"
)
client.timeline_delete(tenant, timeline)
# Importing correct backup works

View File

@@ -228,7 +228,6 @@ def proxy_with_metric_collector(
@pytest.mark.asyncio
async def test_proxy_metric_collection(
httpserver: HTTPServer,
httpserver_listen_address,
proxy_with_metric_collector: NeonProxy,
vanilla_pg: VanillaPostgres,
):

View File

@@ -9,11 +9,18 @@ def test_neon_cli_basics(neon_env_builder: NeonEnvBuilder, port_distributor: Por
try:
env.neon_cli.start()
env.neon_cli.create_tenant(tenant_id=env.initial_tenant, set_default=True)
env.neon_cli.endpoint_start(endpoint_id="ep-main", port=port_distributor.get_port())
pg_port = port_distributor.get_port()
http_port = port_distributor.get_port()
env.neon_cli.endpoint_start(
endpoint_id="ep-basic-main", pg_port=pg_port, http_port=http_port
)
env.neon_cli.create_branch(new_branch_name="migration_check")
pg_port = port_distributor.get_port()
http_port = port_distributor.get_port()
env.neon_cli.endpoint_start(
endpoint_id="ep-migration_check", port=port_distributor.get_port()
endpoint_id="ep-migration_check", pg_port=pg_port, http_port=http_port
)
finally:
env.neon_cli.stop()

View File

@@ -58,11 +58,8 @@ def test_ondemand_download_large_rel(
)
##### First start, insert secret data and upload it to the remote storage
env = neon_env_builder.init_start()
# Override defaults, to create more layers
tenant, _ = env.neon_cli.create_tenant(
conf={
env = neon_env_builder.init_start(
initial_tenant_conf={
# disable background GC
"gc_period": "0s",
"gc_horizon": f"{10 * 1024 ** 3}", # 10 GB
@@ -75,7 +72,6 @@ def test_ondemand_download_large_rel(
"compaction_period": "0s",
}
)
env.initial_tenant = tenant
endpoint = env.endpoints.create_start("main")

View File

@@ -17,12 +17,6 @@ def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgB
n_restarts = 10
scale = 10
# the background task may complete the init task delay after finding an
# active tenant, but shutdown starts right before Tenant::gc_iteration
env.pageserver.allowed_errors.append(
r".*Gc failed, retrying in \S+: Cannot run GC iteration on inactive tenant"
)
def run_pgbench(connstr: str):
log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])

View File

@@ -225,3 +225,37 @@ def test_sql_over_http(static_proxy: NeonProxy):
res = q("drop table t")
assert res["command"] == "DROP"
assert res["rowCount"] is None
def test_sql_over_http_output_options(static_proxy: NeonProxy):
static_proxy.safe_psql("create role http2 with login password 'http2' superuser")
def q(sql: str, raw_text: bool, array_mode: bool, params: List[Any] = []) -> Any:
connstr = (
f"postgresql://http2:http2@{static_proxy.domain}:{static_proxy.proxy_port}/postgres"
)
response = requests.post(
f"https://{static_proxy.domain}:{static_proxy.external_http_port}/sql",
data=json.dumps({"query": sql, "params": params}),
headers={
"Content-Type": "application/sql",
"Neon-Connection-String": connstr,
"Neon-Raw-Text-Output": "true" if raw_text else "false",
"Neon-Array-Mode": "true" if array_mode else "false",
},
verify=str(static_proxy.test_output_dir / "proxy.crt"),
)
assert response.status_code == 200
return response.json()
rows = q("select 1 as n, 'a' as s, '{1,2,3}'::int4[] as arr", False, False)["rows"]
assert rows == [{"arr": [1, 2, 3], "n": 1, "s": "a"}]
rows = q("select 1 as n, 'a' as s, '{1,2,3}'::int4[] as arr", False, True)["rows"]
assert rows == [[1, "a", [1, 2, 3]]]
rows = q("select 1 as n, 'a' as s, '{1,2,3}'::int4[] as arr", True, False)["rows"]
assert rows == [{"arr": "{1,2,3}", "n": "1", "s": "a"}]
rows = q("select 1 as n, 'a' as s, '{1,2,3}'::int4[] as arr", True, True)["rows"]
assert rows == [["1", "a", "{1,2,3}"]]

View File

@@ -20,6 +20,7 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.pageserver.utils import (
assert_timeline_detail_404,
wait_for_last_record_lsn,
wait_for_upload,
wait_until_tenant_active,
@@ -140,14 +141,19 @@ def test_remote_storage_backup_and_restore(
# This is before the failures injected by test_remote_failures, so it's a permanent error.
pageserver_http.configure_failpoints(("storage-sync-list-remote-timelines", "return"))
env.pageserver.allowed_errors.append(
".*error attaching tenant: storage-sync-list-remote-timelines",
".*attach failed.*: storage-sync-list-remote-timelines",
)
# Attach it. This HTTP request will succeed and launch a
# background task to load the tenant. In that background task,
# listing the remote timelines will fail because of the failpoint,
# and the tenant will be marked as Broken.
client.tenant_attach(tenant_id)
wait_until_tenant_state(pageserver_http, tenant_id, "Broken", 15)
tenant_info = wait_until_tenant_state(pageserver_http, tenant_id, "Broken", 15)
assert tenant_info["attachment_status"] == {
"slug": "failed",
"data": {"reason": "storage-sync-list-remote-timelines"},
}
# Ensure that even though the tenant is broken, we can't attach it again.
with pytest.raises(Exception, match=f"tenant {tenant_id} already exists, state: Broken"):
@@ -177,7 +183,7 @@ def test_remote_storage_backup_and_restore(
wait_until_tenant_active(
pageserver_http=client,
tenant_id=tenant_id,
iterations=5,
iterations=10, # make it longer for real_s3 tests when unreliable wrapper is involved
)
detail = client.timeline_detail(tenant_id, timeline_id)
@@ -593,8 +599,23 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
)
client.timeline_delete(tenant_id, timeline_id)
env.pageserver.allowed_errors.append(f".*Timeline {tenant_id}/{timeline_id} was not found.*")
env.pageserver.allowed_errors.append(
".*files not bound to index_file.json, proceeding with their deletion.*"
)
wait_until(2, 0.5, lambda: assert_timeline_detail_404(client, tenant_id, timeline_id))
assert not timeline_path.exists()
# to please mypy
assert isinstance(env.remote_storage, LocalFsStorage)
remote_timeline_path = (
env.remote_storage.root / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
)
assert not list(remote_timeline_path.iterdir())
# timeline deletion should kill ongoing uploads, so, the metric will be gone
assert get_queued_count(file_kind="index", op_kind="upload") is None
@@ -693,15 +714,15 @@ def test_empty_branch_remote_storage_upload_on_restart(
f".*POST.* path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing"
)
# index upload is now hitting the failpoint, should not block the shutdown
env.pageserver.stop()
# index upload is now hitting the failpoint, it should block the shutdown
env.pageserver.stop(immediate=True)
timeline_path = (
Path("tenants") / str(env.initial_tenant) / "timelines" / str(new_branch_timeline_id)
)
local_metadata = env.repo_dir / timeline_path / "metadata"
assert local_metadata.is_file(), "timeout cancelled timeline branching, not the upload"
assert local_metadata.is_file()
assert isinstance(env.remote_storage, LocalFsStorage)
new_branch_on_remote_storage = env.remote_storage.root / timeline_path

View File

@@ -4,7 +4,7 @@ from pathlib import Path
from types import TracebackType
from typing import Optional, Type
import backoff # type: ignore
import backoff
from fixtures.log_helper import log
from fixtures.neon_fixtures import PgProtocol, PortDistributor, VanillaPostgres
@@ -37,6 +37,7 @@ class PgSniRouter(PgProtocol):
destination: str,
tls_cert: Path,
tls_key: Path,
test_output_dir: Path,
):
# Must use a hostname rather than IP here, for SNI to work
host = "localhost"
@@ -49,6 +50,7 @@ class PgSniRouter(PgProtocol):
self.tls_cert = tls_cert
self.tls_key = tls_key
self._popen: Optional[subprocess.Popen[bytes]] = None
self.test_output_dir = test_output_dir
def start(self) -> "PgSniRouter":
assert self._popen is None
@@ -60,8 +62,12 @@ class PgSniRouter(PgProtocol):
*["--destination", self.destination],
]
self._popen = subprocess.Popen(args)
router_log_path = self.test_output_dir / "pg_sni_router.log"
router_log = open(router_log_path, "w")
self._popen = subprocess.Popen(args, stderr=router_log)
self._wait_until_ready()
log.info(f"pg_sni_router started, log file: {router_log_path}")
return self
@backoff.on_exception(backoff.expo, OSError, max_time=10)
@@ -121,6 +127,7 @@ def test_pg_sni_router(
destination="localtest.me",
tls_cert=test_output_dir / "router.crt",
tls_key=test_output_dir / "router.key",
test_output_dir=test_output_dir,
) as router:
router.start()

View File

@@ -62,6 +62,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
log.info(f"show {env.initial_tenant}")
pscur.execute(f"show {env.initial_tenant}")
res = pscur.fetchone()
assert res is not None
assert all(
i in res.items()
for i in {
@@ -101,6 +102,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
pscur.execute(f"show {tenant}")
res = pscur.fetchone()
log.info(f"res: {res}")
assert res is not None
assert all(
i in res.items()
for i in {
@@ -163,6 +165,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
pscur.execute(f"show {tenant}")
res = pscur.fetchone()
log.info(f"after config res: {res}")
assert res is not None
assert all(
i in res.items()
for i in {
@@ -218,6 +221,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
pscur.execute(f"show {tenant}")
res = pscur.fetchone()
log.info(f"after restart res: {res}")
assert res is not None
assert all(
i in res.items()
for i in {
@@ -278,6 +282,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
pscur.execute(f"show {tenant}")
res = pscur.fetchone()
log.info(f"after restart res: {res}")
assert res is not None
assert all(
i in res.items()
for i in {

View File

@@ -59,6 +59,13 @@ def test_tenant_reattach(
# create new nenant
tenant_id, timeline_id = env.neon_cli.create_tenant()
# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
with endpoint.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key, value text)")
@@ -223,13 +230,6 @@ def test_tenant_reattach_while_busy(
)
env = neon_env_builder.init_start()
# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(".*Tenant .* not found.*")
env.pageserver.allowed_errors.append(
".*Tenant .* will not become active\\. Current state: Stopping.*"
)
pageserver_http = env.pageserver.http_client()
# create new nenant
@@ -238,6 +238,13 @@ def test_tenant_reattach_while_busy(
conf={"checkpoint_distance": "100000"}
)
# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
cur = endpoint.connect().cursor()
@@ -275,6 +282,13 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
# create new nenant
tenant_id, timeline_id = env.neon_cli.create_tenant()
# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)
# assert tenant exists on disk
assert (env.repo_dir / "tenants" / str(tenant_id)).exists()
@@ -336,6 +350,13 @@ def test_tenant_detach_ignored_tenant(neon_simple_env: NeonEnv):
# create a new tenant
tenant_id, _ = env.neon_cli.create_tenant()
# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)
# assert tenant exists on disk
assert (env.repo_dir / "tenants" / str(tenant_id)).exists()
@@ -385,6 +406,13 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv):
# create a new tenant
tenant_id, _ = env.neon_cli.create_tenant()
# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)
# assert tenant exists on disk
assert (env.repo_dir / "tenants" / str(tenant_id)).exists()
@@ -399,6 +427,7 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv):
log.info("detaching regular tenant with detach ignored flag")
client.tenant_detach(tenant_id, True)
log.info("regular tenant detached without error")
# check that nothing is left on disk for deleted tenant
@@ -432,6 +461,13 @@ def test_detach_while_attaching(
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)
# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers, otherwise the SELECT after restart will just return answer
# from shared_buffers without hitting the page server, which defeats the point
@@ -496,7 +532,7 @@ def test_ignored_tenant_reattach(
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_remote_storage_backup_and_restore",
test_name="test_ignored_tenant_reattach",
)
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
@@ -577,6 +613,13 @@ def test_ignored_tenant_download_missing_layers(
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)
data_id = 1
data_secret = "very secret secret"
insert_test_data(pageserver_http, tenant_id, timeline_id, data_id, data_secret, endpoint)
@@ -636,6 +679,13 @@ def test_ignored_tenant_stays_broken_without_metadata(
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Broken.*"
)
# ignore the tenant and remove its metadata
pageserver_http.tenant_ignore(tenant_id)
tenant_timeline_dir = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
@@ -647,7 +697,9 @@ def test_ignored_tenant_stays_broken_without_metadata(
metadata_removed = True
assert metadata_removed, f"Failed to find metadata file in {tenant_timeline_dir}"
env.pageserver.allowed_errors.append(".*could not load tenant .*?: failed to load metadata.*")
env.pageserver.allowed_errors.append(
f".*{tenant_id}.*: load failed.*: failed to load metadata.*"
)
# now, load it from the local files and expect it to be broken due to inability to load tenant files into memory
pageserver_http.tenant_load(tenant_id=tenant_id)
@@ -670,6 +722,13 @@ def test_load_attach_negatives(
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)
env.pageserver.allowed_errors.append(".*tenant .*? already exists, state:.*")
with pytest.raises(
expected_exception=PageserverApiException,
@@ -712,6 +771,13 @@ def test_ignore_while_attaching(
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)
data_id = 1
data_secret = "very secret secret"
insert_test_data(pageserver_http, tenant_id, timeline_id, data_id, data_secret, endpoint)

View File

@@ -318,7 +318,7 @@ def test_only_heads_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Pa
def test_single_branch_get_tenant_size_grows(
neon_env_builder: NeonEnvBuilder, test_output_dir: Path
neon_env_builder: NeonEnvBuilder, test_output_dir: Path, pg_version: PgVersion
):
"""
Operate on single branch reading the tenants size after each transaction.
@@ -333,6 +333,13 @@ def test_single_branch_get_tenant_size_grows(
# that there next_gc_cutoff could be smaller than initdb_lsn, which will
# obviously lead to issues when calculating the size.
gc_horizon = 0x38000
# it's a bit of a hack, but different versions of postgres have different
# amount of WAL generated for the same amount of data. so we need to
# adjust the gc_horizon accordingly.
if pg_version == PgVersion.V14:
gc_horizon = 0x40000
neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='0s', gc_period='0s', pitr_interval='0sec', gc_horizon={gc_horizon}}}"
env = neon_env_builder.init_start()

View File

@@ -22,6 +22,7 @@ from fixtures.neon_fixtures import (
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
from prometheus_client.samples import Sample
@@ -266,6 +267,7 @@ def test_pageserver_metrics_removed_after_detach(
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
cur.execute("SELECT sum(key) FROM t")
assert cur.fetchone() == (5000050000,)
endpoint.stop()
def get_ps_metric_samples_for_tenant(tenant_id: TenantId) -> List[Sample]:
ps_metrics = env.pageserver.http_client().get_metrics()
@@ -308,9 +310,7 @@ def test_pageserver_with_empty_tenants(
env.pageserver.allowed_errors.append(
".*marking .* as locally complete, while it doesnt exist in remote index.*"
)
env.pageserver.allowed_errors.append(
".*could not load tenant.*Failed to list timelines directory.*"
)
env.pageserver.allowed_errors.append(".*load failed.*list timelines directory.*")
client = env.pageserver.http_client()
@@ -341,9 +341,15 @@ def test_pageserver_with_empty_tenants(
env.pageserver.start()
client = env.pageserver.http_client()
tenants = client.tenant_list()
assert len(tenants) == 2
def not_loading():
tenants = client.tenant_list()
assert len(tenants) == 2
assert all(t["state"]["slug"] != "Loading" for t in tenants)
wait_until(10, 0.2, not_loading)
tenants = client.tenant_list()
[broken_tenant] = [t for t in tenants if t["id"] == str(tenant_without_timelines_dir)]
assert (
@@ -355,7 +361,7 @@ def test_pageserver_with_empty_tenants(
broken_tenant_status["state"]["slug"] == "Broken"
), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken"
assert env.pageserver.log_contains(".*Setting tenant as Broken state, reason:.*")
assert env.pageserver.log_contains(".*load failed, setting tenant state to Broken:.*")
[loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines)]
assert (

View File

@@ -3,6 +3,7 @@ import queue
import shutil
import threading
from pathlib import Path
from typing import Optional
import pytest
import requests
@@ -11,13 +12,16 @@ from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
RemoteStorageKind,
S3Storage,
available_remote_storages,
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
assert_timeline_detail_404,
wait_for_last_record_lsn,
wait_for_upload,
wait_until_tenant_active,
wait_until_timeline_state,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar, wait_until
@@ -68,7 +72,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
ps_http.timeline_delete(env.initial_tenant, parent_timeline_id)
assert exc.value.status_code == 400
assert exc.value.status_code == 412
timeline_path = (
env.repo_dir / "tenants" / str(env.initial_tenant) / "timelines" / str(leaf_timeline_id)
@@ -130,13 +134,25 @@ def test_delete_timeline_post_rm_failure(
env = neon_env_builder.init_start()
assert env.initial_timeline
env.pageserver.allowed_errors.append(".*Error: failpoint: timeline-delete-after-rm")
env.pageserver.allowed_errors.append(".*Ignoring state update Stopping for broken timeline")
ps_http = env.pageserver.http_client()
failpoint_name = "timeline-delete-after-rm"
ps_http.configure_failpoints((failpoint_name, "return"))
with pytest.raises(PageserverApiException, match=f"failpoint: {failpoint_name}"):
ps_http.timeline_delete(env.initial_tenant, env.initial_timeline)
ps_http.timeline_delete(env.initial_tenant, env.initial_timeline)
timeline_info = wait_until_timeline_state(
pageserver_http=ps_http,
tenant_id=env.initial_tenant,
timeline_id=env.initial_timeline,
expected_state="Broken",
iterations=2, # effectively try immediately and retry once in one second
)
timeline_info["state"]["Broken"]["reason"] == "failpoint: timeline-delete-after-rm"
at_failpoint_log_message = f".*{env.initial_timeline}.*at failpoint {failpoint_name}.*"
env.pageserver.allowed_errors.append(at_failpoint_log_message)
@@ -148,11 +164,14 @@ def test_delete_timeline_post_rm_failure(
ps_http.configure_failpoints((failpoint_name, "off"))
# this should succeed
# this also checks that delete can be retried even when timeline is in Broken state
ps_http.timeline_delete(env.initial_tenant, env.initial_timeline, timeout=2)
# the second call will try to transition the timeline into Stopping state, but it's already in that state
env.pageserver.allowed_errors.append(
f".*{env.initial_timeline}.*Ignoring new state, equal to the existing one: Stopping"
)
with pytest.raises(PageserverApiException) as e:
ps_http.timeline_detail(env.initial_tenant, env.initial_timeline)
assert e.value.status_code == 404
env.pageserver.allowed_errors.append(f".*NotFound: Timeline.*{env.initial_timeline}.*")
env.pageserver.allowed_errors.append(
f".*{env.initial_timeline}.*timeline directory not found, proceeding anyway.*"
)
@@ -230,6 +249,12 @@ def test_timeline_resurrection_on_attach(
# delete new timeline
ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=branch_timeline_id)
env.pageserver.allowed_errors.append(
f".*Timeline {tenant_id}/{branch_timeline_id} was not found.*"
)
wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, branch_timeline_id))
##### Stop the pageserver instance, erase all its data
env.endpoints.stop_all()
env.pageserver.stop()
@@ -252,12 +277,31 @@ def test_timeline_resurrection_on_attach(
assert all([tl["state"] == "Active" for tl in timelines])
def assert_prefix_empty(neon_env_builder: NeonEnvBuilder, prefix: Optional[str] = None):
# For local_fs we need to properly handle empty directories, which we currently dont, so for simplicity stick to s3 api.
assert neon_env_builder.remote_storage_kind in (
RemoteStorageKind.MOCK_S3,
RemoteStorageKind.REAL_S3,
)
# For mypy
assert isinstance(neon_env_builder.remote_storage, S3Storage)
# Note that this doesnt use pagination, so list is not guaranteed to be exhaustive.
response = neon_env_builder.remote_storage_client.list_objects_v2(
Bucket=neon_env_builder.remote_storage.bucket_name,
Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "",
)
objects = response.get("Contents")
assert (
response["KeyCount"] == 0
), f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuilder):
"""
When deleting a timeline, if we succeed in setting the deleted flag remotely
but fail to delete the local state, restarting the pageserver should resume
the deletion of the local state.
(Deletion of the state in S3 is not implemented yet.)
"""
neon_env_builder.enable_remote_storage(
@@ -271,8 +315,9 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
env.pageserver.allowed_errors.append(
".*Ignoring new state, equal to the existing one: Stopping"
)
# this happens, because the stuck timeline is visible to shutdown
env.pageserver.allowed_errors.append(
".*during shutdown: cannot flush frozen layers when flush_loop is not running, state is Exited"
".*freeze_and_flush_on_shutdown.+: failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited"
)
ps_http = env.pageserver.http_client()
@@ -292,11 +337,17 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
env.repo_dir / "tenants" / str(env.initial_tenant) / "timelines" / str(leaf_timeline_id)
)
with pytest.raises(
PageserverApiException,
match="failpoint: timeline-delete-before-rm",
):
ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id)
ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id)
timeline_info = wait_until_timeline_state(
pageserver_http=ps_http,
tenant_id=env.initial_tenant,
timeline_id=leaf_timeline_id,
expected_state="Broken",
iterations=2, # effectively try immediately and retry once in one second
)
timeline_info["state"]["Broken"]["reason"] == "failpoint: timeline-delete-after-rm"
assert leaf_timeline_path.exists(), "the failpoint didn't work"
@@ -304,7 +355,14 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
env.pageserver.start()
# Wait for tenant to finish loading.
wait_until_tenant_active(ps_http, tenant_id=env.initial_tenant, iterations=10, period=0.5)
wait_until_tenant_active(ps_http, tenant_id=env.initial_tenant, iterations=10, period=1)
env.pageserver.allowed_errors.append(
f".*Timeline {env.initial_tenant}/{leaf_timeline_id} was not found.*"
)
wait_until(
2, 0.5, lambda: assert_timeline_detail_404(ps_http, env.initial_tenant, leaf_timeline_id)
)
assert (
not leaf_timeline_path.exists()
@@ -316,6 +374,50 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
}, "other timelines should not have been affected"
assert all([tl["state"] == "Active" for tl in timelines])
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(env.initial_tenant),
"timelines",
str(leaf_timeline_id),
)
),
)
assert env.initial_timeline is not None
for timeline_id in (intermediate_timeline_id, env.initial_timeline):
ps_http.timeline_delete(env.initial_tenant, timeline_id)
env.pageserver.allowed_errors.append(
f".*Timeline {env.initial_tenant}/{timeline_id} was not found.*"
)
wait_until(
2, 0.5, lambda: assert_timeline_detail_404(ps_http, env.initial_tenant, timeline_id)
)
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(env.initial_tenant),
"timelines",
str(timeline_id),
)
),
)
# for some reason the check above doesnt immediately take effect for the below.
# Assume it is mock server incosistency and check twice.
wait_until(
2,
0.5,
lambda: assert_prefix_empty(neon_env_builder),
)
def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
neon_env_builder: NeonEnvBuilder,
@@ -371,7 +473,7 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
# make the second call and assert behavior
log.info("second call start")
error_msg_re = "another task is already setting the deleted_flag, started at"
error_msg_re = "timeline deletion is already in progress"
with pytest.raises(PageserverApiException, match=error_msg_re) as second_call_err:
ps_http.timeline_delete(env.initial_tenant, child_timeline_id)
assert second_call_err.value.status_code == 500
@@ -437,12 +539,106 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
wait_until(50, 0.1, got_hangup_log_message)
# ok, retry without failpoint, it should succeed
# check that the timeline is still present
ps_http.timeline_detail(env.initial_tenant, child_timeline_id)
# ok, disable the failpoint to let the deletion finish
ps_http.configure_failpoints((failpoint_name, "off"))
# this should succeed
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)
# the second call will try to transition the timeline into Stopping state, but it's already in that state
env.pageserver.allowed_errors.append(
f".*{child_timeline_id}.*Ignoring new state, equal to the existing one: Stopping"
def first_request_finished():
message = f".*DELETE.*{child_timeline_id}.*Cancelled request finished"
assert env.pageserver.log_contains(message)
wait_until(50, 0.1, first_request_finished)
# check that the timeline is gone
notfound_message = f"Timeline {env.initial_tenant}/{child_timeline_id} was not found"
env.pageserver.allowed_errors.append(".*" + notfound_message)
with pytest.raises(PageserverApiException, match=notfound_message) as exc:
ps_http.timeline_detail(env.initial_tenant, child_timeline_id)
assert exc.value.status_code == 404
@pytest.mark.parametrize(
"remote_storage_kind",
list(
filter(
lambda s: s in (RemoteStorageKind.MOCK_S3, RemoteStorageKind.REAL_S3),
available_remote_storages(),
)
),
)
def test_timeline_delete_works_for_remote_smoke(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_timeline_delete_works_for_remote_smoke",
)
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()
pg = env.endpoints.create_start("main")
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
main_timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
assert tenant_id == env.initial_tenant
assert main_timeline_id == env.initial_timeline
timeline_ids = [env.initial_timeline]
for i in range(2):
branch_timeline_id = env.neon_cli.create_branch(f"new{i}", "main")
pg = env.endpoints.create_start(f"new{i}")
with pg.cursor() as cur:
cur.execute("CREATE TABLE f (i integer);")
cur.execute("INSERT INTO f VALUES (generate_series(1,1000));")
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# wait until pageserver receives that data
wait_for_last_record_lsn(ps_http, tenant_id, branch_timeline_id, current_lsn)
# run checkpoint manually to be sure that data landed in remote storage
ps_http.timeline_checkpoint(tenant_id, branch_timeline_id)
# wait until pageserver successfully uploaded a checkpoint to remote storage
log.info("waiting for checkpoint upload")
wait_for_upload(ps_http, tenant_id, branch_timeline_id, current_lsn)
log.info("upload of checkpoint is done")
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
timeline_ids.append(timeline_id)
for timeline_id in reversed(timeline_ids):
# note that we need to finish previous deletion before scheduling next one
# otherwise we can get an "HasChildren" error if deletion is not fast enough (real_s3)
ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id)
env.pageserver.allowed_errors.append(
f".*Timeline {env.initial_tenant}/{timeline_id} was not found.*"
)
wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, timeline_id))
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(env.initial_tenant),
"timelines",
str(timeline_id),
)
),
)
# for some reason the check above doesnt immediately take effect for the below.
# Assume it is mock server incosistency and check twice.
wait_until(
2,
0.5,
lambda: assert_prefix_empty(neon_env_builder),
)

View File

@@ -1001,9 +1001,6 @@ def test_safekeeper_without_pageserver(
def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder):
def safekeepers_guc(env: NeonEnv, sk_names: List[int]) -> str:
return ",".join([f"localhost:{sk.port.pg}" for sk in env.safekeepers if sk.id in sk_names])
def execute_payload(endpoint: Endpoint):
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
@@ -1032,9 +1029,8 @@ def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder):
log.info("Use only first 3 safekeepers")
env.safekeepers[3].stop()
active_safekeepers = [1, 2, 3]
endpoint = env.endpoints.create("test_replace_safekeeper")
endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
endpoint.active_safekeepers = [1, 2, 3]
endpoint.start()
# learn neon timeline from compute
@@ -1072,9 +1068,8 @@ def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder):
log.info("Recreate postgres to replace failed sk1 with new sk4")
endpoint.stop_and_destroy().create("test_replace_safekeeper")
active_safekeepers = [2, 3, 4]
env.safekeepers[3].start()
endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
endpoint.active_safekeepers = [2, 3, 4]
endpoint.start()
execute_payload(endpoint)
@@ -1293,9 +1288,8 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
log.info("Use only first 3 safekeepers")
env.safekeepers[3].stop()
active_safekeepers = [1, 2, 3]
endpoint = env.endpoints.create("test_pull_timeline")
endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
endpoint.active_safekeepers = [1, 2, 3]
endpoint.start()
# learn neon timeline from compute
@@ -1332,10 +1326,8 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
show_statuses(env.safekeepers, tenant_id, timeline_id)
log.info("Restarting compute with new config to verify that it works")
active_safekeepers = [1, 3, 4]
endpoint.stop_and_destroy().create("test_pull_timeline")
endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
endpoint.active_safekeepers = [1, 3, 4]
endpoint.start()
execute_payload(endpoint)

View File

@@ -2,9 +2,11 @@ import asyncio
import random
import time
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional
import asyncpg
import toml
from fixtures.log_helper import getLogger
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, Safekeeper
from fixtures.types import Lsn, TenantId, TimelineId
@@ -251,7 +253,8 @@ def endpoint_create_start(env: NeonEnv, branch: str, pgdir_name: Optional[str]):
endpoint = Endpoint(
env,
tenant_id=env.initial_tenant,
port=env.port_distributor.get_port(),
pg_port=env.port_distributor.get_port(),
http_port=env.port_distributor.get_port(),
# In these tests compute has high probability of terminating on its own
# before our stop() due to lost consensus leadership.
check_stop_result=False,
@@ -536,15 +539,20 @@ def test_race_conditions(neon_env_builder: NeonEnvBuilder):
# Check that pageserver can select safekeeper with largest commit_lsn
# and switch if LSN is not updated for some time (NoWalTimeout).
async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint):
def safekeepers_guc(env: NeonEnv, active_sk: List[bool]) -> str:
# use ports 10, 11 and 12 to simulate unavailable safekeepers
return ",".join(
[
f"localhost:{sk.port.pg if active else 10 + i}"
for i, (sk, active) in enumerate(zip(env.safekeepers, active_sk))
]
)
async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint, test_output_dir: Path):
def adjust_safekeepers(env: NeonEnv, active_sk: List[bool]):
# Change the pg ports of the inactive safekeepers in the config file to be
# invalid, to make them unavailable to the endpoint. We use
# ports 10, 11 and 12 to simulate unavailable safekeepers.
config = toml.load(test_output_dir / "repo" / "config")
for i, (sk, active) in enumerate(zip(env.safekeepers, active_sk)):
if active:
config["safekeepers"][i]["pg_port"] = env.safekeepers[i].port.pg
else:
config["safekeepers"][i]["pg_port"] = 10 + i
with open(test_output_dir / "repo" / "config", "w") as f:
toml.dump(config, f)
conn = await endpoint.connect_async()
await conn.execute("CREATE TABLE t(key int primary key, value text)")
@@ -565,7 +573,7 @@ async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint):
it -= 1
continue
endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_sk))
adjust_safekeepers(env, active_sk)
log.info(f"Iteration {it}: {active_sk}")
endpoint.start()
@@ -579,7 +587,7 @@ async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint):
await conn.close()
endpoint.stop()
endpoint.adjust_for_safekeepers(safekeepers_guc(env, [True] * len(env.safekeepers)))
adjust_safekeepers(env, [True] * len(env.safekeepers))
endpoint.start()
conn = await endpoint.connect_async()
@@ -590,11 +598,11 @@ async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint):
# do inserts while restarting postgres and messing with safekeeper addresses
def test_wal_lagging(neon_env_builder: NeonEnvBuilder):
def test_wal_lagging(neon_env_builder: NeonEnvBuilder, test_output_dir: Path):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_wal_lagging")
endpoint = env.endpoints.create_start("test_wal_lagging")
asyncio.run(run_wal_lagging(env, endpoint))
asyncio.run(run_wal_lagging(env, endpoint, test_output_dir))

View File

@@ -77,7 +77,8 @@ def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuil
try:
trigger_wait_lsn_timeout(env, tenant_id)
except Exception as e:
exception_string = str(e)
# Strip out the part before stdout, as it contains full command with the list of all safekeepers
exception_string = str(e).split("stdout", 1)[-1]
assert expected_timeout_error in exception_string, "Should time out during waiting for WAL"
for safekeeper in env.safekeepers:

View File

@@ -83,6 +83,9 @@ def test_walredo_not_left_behind_on_detach(neon_env_builder: NeonEnvBuilder):
# XXX this is quite brittle as the lifecycle of the WAL redo process is an implementation detail
assert_child_processes(pagserver_pid, wal_redo_present=True, defunct_present=False)
# Stop the compute before detaching, to avoid errors in the log.
endpoint.stop()
last_error = None
for i in range(3):
try: