Merge branch 'main' of https://github.com/neondatabase/neon into skyzh/rm-file-if-fail

This commit is contained in:
Alex Chi
2023-05-26 14:01:37 -04:00
71 changed files with 3242 additions and 1285 deletions

View File

@@ -312,6 +312,6 @@ def neon_with_baseline(request: FixtureRequest) -> PgCompare:
implementation-specific logic is widely useful across multiple tests, it might
make sense to add methods to the PgCompare class.
"""
fixture = request.getfixturevalue(request.param) # type: ignore
fixture = request.getfixturevalue(request.param)
assert isinstance(fixture, PgCompare), f"test error: fixture {fixture} is not PgCompare"
return fixture

View File

@@ -26,7 +26,7 @@ from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, Union, cast
from urllib.parse import urlparse
import asyncpg
import backoff # type: ignore
import backoff
import boto3
import jwt
import psycopg2
@@ -354,7 +354,7 @@ class PgProtocol:
Returns psycopg2's connection object.
This method passes all extra params to connstr.
"""
conn = psycopg2.connect(**self.conn_options(**kwargs))
conn: PgConnection = psycopg2.connect(**self.conn_options(**kwargs))
# WARNING: this setting affects *all* tests!
conn.autocommit = autocommit
@@ -1603,8 +1603,6 @@ class NeonPageserver(PgProtocol):
# https://github.com/neondatabase/neon/issues/2442
".*could not remove ephemeral file.*No such file or directory.*",
# FIXME: These need investigation
".*gc_loop.*Failed to get a tenant .* Tenant .* not found.*",
".*compaction_loop.*Failed to get a tenant .* Tenant .* not found.*",
".*manual_gc.*is_shutdown_requested\\(\\) called in an unexpected task or thread.*",
".*tenant_list: timeline is not found in remote index while it is present in the tenants registry.*",
".*Removing intermediate uninit mark file.*",
@@ -1621,6 +1619,8 @@ class NeonPageserver(PgProtocol):
".*task iteration took longer than the configured period.*",
# this is until #3501
".*Compaction failed, retrying in [^:]+: Cannot run compaction iteration on inactive tenant",
# these can happen anytime we do compactions from background task and shutdown pageserver
r".*ERROR.*ancestor timeline \S+ is being stopped",
]
def start(

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import json
import time
from collections import defaultdict
from dataclasses import dataclass
@@ -109,6 +110,10 @@ class PageserverHttpClient(requests.Session):
if auth_token is not None:
self.headers["Authorization"] = f"Bearer {auth_token}"
@property
def base_url(self) -> str:
return f"http://localhost:{self.port}"
def verbose_error(self, res: requests.Response):
try:
res.raise_for_status()
@@ -150,14 +155,14 @@ class PageserverHttpClient(requests.Session):
return res_json
def tenant_create(
self, new_tenant_id: Optional[TenantId] = None, conf: Optional[Dict[str, Any]] = None
self, new_tenant_id: TenantId, conf: Optional[Dict[str, Any]] = None
) -> TenantId:
if conf is not None:
assert "new_tenant_id" not in conf.keys()
res = self.post(
f"http://localhost:{self.port}/v1/tenant",
json={
"new_tenant_id": str(new_tenant_id) if new_tenant_id else None,
"new_tenant_id": str(new_tenant_id),
**(conf or {}),
},
)
@@ -168,8 +173,22 @@ class PageserverHttpClient(requests.Session):
assert isinstance(new_tenant_id, str)
return TenantId(new_tenant_id)
def tenant_attach(self, tenant_id: TenantId):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/attach")
def tenant_attach(
self, tenant_id: TenantId, config: None | Dict[str, Any] = None, config_null: bool = False
):
if config_null:
assert config is None
body = "null"
else:
# null-config is prohibited by the API
if config is None:
config = {}
body = json.dumps({"config": config})
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/attach",
data=body,
headers={"Content-Type": "application/json"},
)
self.verbose_error(res)
def tenant_detach(self, tenant_id: TenantId, detach_ignored=False):
@@ -274,13 +293,13 @@ class PageserverHttpClient(requests.Session):
self,
pg_version: PgVersion,
tenant_id: TenantId,
new_timeline_id: Optional[TimelineId] = None,
new_timeline_id: TimelineId,
ancestor_timeline_id: Optional[TimelineId] = None,
ancestor_start_lsn: Optional[Lsn] = None,
**kwargs,
) -> Dict[Any, Any]:
body: Dict[str, Any] = {
"new_timeline_id": str(new_timeline_id) if new_timeline_id else None,
"new_timeline_id": str(new_timeline_id),
"ancestor_start_lsn": str(ancestor_start_lsn) if ancestor_start_lsn else None,
"ancestor_timeline_id": str(ancestor_timeline_id) if ancestor_timeline_id else None,
}

View File

@@ -27,6 +27,10 @@ class PgVersion(str, enum.Enum):
def __repr__(self) -> str:
return f"'{self.value}'"
# Make this explicit for Python 3.11 compatibility, which changes the behavior of enums
def __str__(self) -> str:
return self.value
# In GitHub workflows we use Postgres version with v-prefix (e.g. v14 instead of just 14),
# sometime we need to do so in tests.
@property
@@ -78,11 +82,11 @@ def pytest_addoption(parser: Parser):
@pytest.fixture(scope="session")
def pg_version(request: FixtureRequest) -> Iterator[PgVersion]:
if v := request.config.getoption("--pg-version"):
version, source = v, "from --pg-version commad-line argument"
version, source = v, "from --pg-version command-line argument"
elif v := os.environ.get("DEFAULT_PG_VERSION"):
version, source = PgVersion(v), "from DEFAULT_PG_VERSION environment variable"
else:
version, source = DEFAULT_VERSION, "default verson"
version, source = DEFAULT_VERSION, "default version"
log.info(f"pg_version is {version} ({source})")
yield version

View File

@@ -2,7 +2,7 @@ from contextlib import closing
import pytest
from fixtures.compare_fixtures import PgCompare
from pytest_lazyfixture import lazy_fixture # type: ignore
from pytest_lazyfixture import lazy_fixture
@pytest.mark.parametrize(

View File

@@ -0,0 +1,76 @@
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
@pytest.mark.timeout(10000)
def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
"""
Test that GC is able to collect all old layers even if them are forming
"stairs" and there are not three delta layers since last image layer.
Information about image layers needed to collect old layers should
be propagated by GC to compaction task which should take in in account
when make a decision which new image layers needs to be created.
"""
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
tenant_id, _ = env.neon_cli.create_tenant(
conf={
# disable default GC and compaction
"gc_period": "1000 m",
"compaction_period": "0 s",
"gc_horizon": f"{1024 ** 2}",
"checkpoint_distance": f"{1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
# set PITR interval to be small, so we can do GC
"pitr_interval": "10 s",
# "compaction_threshold": "3",
# "image_creation_threshold": "2",
}
)
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
n_steps = 10
n_update_iters = 100
step_size = 10000
with endpoint.cursor() as cur:
cur.execute("SET statement_timeout='1000s'")
cur.execute(
"CREATE TABLE t(step bigint, count bigint default 0, payload text default repeat(' ', 100)) with (fillfactor=50)"
)
cur.execute("CREATE INDEX ON t(step)")
# In each step, we insert 'step_size' new rows, and update the newly inserted rows
# 'n_update_iters' times. This creates a lot of churn and generates lots of WAL at the end of the table,
# without modifying the earlier parts of the table.
for step in range(n_steps):
cur.execute(f"INSERT INTO t (step) SELECT {step} FROM generate_series(1, {step_size})")
for i in range(n_update_iters):
cur.execute(f"UPDATE t set count=count+1 where step = {step}")
cur.execute("vacuum t")
# cur.execute("select pg_table_size('t')")
# logical_size = cur.fetchone()[0]
logical_size = client.timeline_detail(tenant_id, timeline_id)["current_logical_size"]
log.info(f"Logical storage size {logical_size}")
client.timeline_checkpoint(tenant_id, timeline_id)
# Do compaction and GC
client.timeline_gc(tenant_id, timeline_id, 0)
client.timeline_compact(tenant_id, timeline_id)
# One more iteration to check that no excessive image layers are generated
client.timeline_gc(tenant_id, timeline_id, 0)
client.timeline_compact(tenant_id, timeline_id)
physical_size = client.timeline_detail(tenant_id, timeline_id)["current_physical_size"]
log.info(f"Physical storage size {physical_size}")
MB = 1024 * 1024
zenbenchmark.record("logical_size", logical_size // MB, "Mb", MetricReport.LOWER_IS_BETTER)
zenbenchmark.record("physical_size", physical_size // MB, "Mb", MetricReport.LOWER_IS_BETTER)
zenbenchmark.record(
"physical/logical ratio", physical_size / logical_size, "", MetricReport.LOWER_IS_BETTER
)

View File

@@ -2,7 +2,7 @@ from contextlib import closing
import pytest
from fixtures.compare_fixtures import PgCompare
from pytest_lazyfixture import lazy_fixture # type: ignore
from pytest_lazyfixture import lazy_fixture
@pytest.mark.parametrize(

View File

@@ -2,7 +2,7 @@ from contextlib import closing
import pytest
from fixtures.compare_fixtures import PgCompare
from pytest_lazyfixture import lazy_fixture # type: ignore
from pytest_lazyfixture import lazy_fixture
@pytest.mark.parametrize(

View File

@@ -6,7 +6,7 @@ import pytest
from fixtures.benchmark_fixture import MetricReport
from fixtures.compare_fixtures import PgCompare
from fixtures.log_helper import log
from pytest_lazyfixture import lazy_fixture # type: ignore
from pytest_lazyfixture import lazy_fixture
@pytest.mark.parametrize(

View File

@@ -0,0 +1,200 @@
from dataclasses import dataclass
from typing import Generator, Optional
import pytest
from fixtures.neon_fixtures import (
LocalFsStorage,
NeonEnv,
NeonEnvBuilder,
RemoteStorageKind,
)
from fixtures.pageserver.http import PageserverApiException, TenantConfig
from fixtures.types import TenantId
from fixtures.utils import wait_until
@pytest.fixture
def positive_env(neon_env_builder: NeonEnvBuilder) -> NeonEnv:
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.LOCAL_FS,
test_name="test_attach_tenant_config",
)
env = neon_env_builder.init_start()
assert isinstance(env.remote_storage, LocalFsStorage)
return env
@dataclass
class NegativeTests:
neon_env: NeonEnv
tenant_id: TenantId
config_pre_detach: TenantConfig
@pytest.fixture
def negative_env(neon_env_builder: NeonEnvBuilder) -> Generator[NegativeTests, None, None]:
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.LOCAL_FS,
test_name="test_attach_tenant_config",
)
env = neon_env_builder.init_start()
assert isinstance(env.remote_storage, LocalFsStorage)
ps_http = env.pageserver.http_client()
(tenant_id, _) = env.neon_cli.create_tenant()
assert ps_http.tenant_config(tenant_id).tenant_specific_overrides == {}
config_pre_detach = ps_http.tenant_config(tenant_id)
assert tenant_id in [TenantId(t["id"]) for t in ps_http.tenant_list()]
ps_http.tenant_detach(tenant_id)
assert tenant_id not in [TenantId(t["id"]) for t in ps_http.tenant_list()]
yield NegativeTests(env, tenant_id, config_pre_detach)
assert tenant_id not in [
TenantId(t["id"]) for t in ps_http.tenant_list()
], "tenant should not be attached after negative test"
env.pageserver.allowed_errors.append(".*Error processing HTTP request: Bad request")
def log_contains_bad_request():
env.pageserver.log_contains(".*Error processing HTTP request: Bad request")
wait_until(50, 0.1, log_contains_bad_request)
def test_null_body(negative_env: NegativeTests):
"""
If we send `null` in the body, the request should be rejected with status 400.
"""
env = negative_env.neon_env
tenant_id = negative_env.tenant_id
ps_http = env.pageserver.http_client()
res = ps_http.post(
f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach",
data=b"null",
headers={"Content-Type": "application/json"},
)
assert res.status_code == 400
def test_null_config(negative_env: NegativeTests):
"""
If the `config` field is `null`, the request should be rejected with status 400.
"""
env = negative_env.neon_env
tenant_id = negative_env.tenant_id
ps_http = env.pageserver.http_client()
res = ps_http.post(
f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach",
data=b'{"config": null}',
headers={"Content-Type": "application/json"},
)
assert res.status_code == 400
def test_config_with_unknown_keys_is_bad_request(negative_env: NegativeTests):
"""
If we send a config with unknown keys, the request should be rejected with status 400.
"""
env = negative_env.neon_env
tenant_id = negative_env.tenant_id
ps_http = env.pageserver.http_client()
config_with_unknown_keys = {
"compaction_period": "1h",
"this_key_does_not_exist": "some value",
}
with pytest.raises(PageserverApiException) as e:
ps_http.tenant_attach(tenant_id, config=config_with_unknown_keys)
assert e.type == PageserverApiException
assert e.value.status_code == 400
@pytest.mark.parametrize("content_type", [None, "application/json"])
def test_empty_body(positive_env: NeonEnv, content_type: Optional[str]):
"""
For backwards-compatiblity: if we send an empty body,
the request should be accepted and the config should be the default config.
"""
env = positive_env
ps_http = env.pageserver.http_client()
(tenant_id, _) = env.neon_cli.create_tenant()
assert ps_http.tenant_config(tenant_id).tenant_specific_overrides == {}
config_pre_detach = ps_http.tenant_config(tenant_id)
assert tenant_id in [TenantId(t["id"]) for t in ps_http.tenant_list()]
ps_http.tenant_detach(tenant_id)
assert tenant_id not in [TenantId(t["id"]) for t in ps_http.tenant_list()]
ps_http.post(
f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach",
data=b"",
headers=None if content_type else {"Content-Type": "application/json"},
).raise_for_status()
assert ps_http.tenant_config(tenant_id).tenant_specific_overrides == {}
assert ps_http.tenant_config(tenant_id).effective_config == config_pre_detach.effective_config
def test_fully_custom_config(positive_env: NeonEnv):
"""
If we send a valid config in the body, the request should be accepted and the config should be applied.
"""
env = positive_env
fully_custom_config = {
"compaction_period": "1h",
"compaction_threshold": 13,
"compaction_target_size": 1048576,
"checkpoint_distance": 10000,
"checkpoint_timeout": "13m",
"eviction_policy": {
"kind": "LayerAccessThreshold",
"period": "20s",
"threshold": "23h",
},
"evictions_low_residence_duration_metric_threshold": "2days",
"gc_horizon": 23 * (1024 * 1024),
"gc_period": "2h 13m",
"image_creation_threshold": 7,
"pitr_interval": "1m",
"lagging_wal_timeout": "23m",
"max_lsn_wal_lag": 230000,
"min_resident_size_override": 23,
"trace_read_requests": True,
"walreceiver_connect_timeout": "13m",
}
ps_http = env.pageserver.http_client()
initial_tenant_config = ps_http.tenant_config(env.initial_tenant)
assert initial_tenant_config.tenant_specific_overrides == {}
assert set(initial_tenant_config.effective_config.keys()) == set(
fully_custom_config.keys()
), "ensure we cover all config options"
(tenant_id, _) = env.neon_cli.create_tenant()
ps_http.set_tenant_config(tenant_id, fully_custom_config)
our_tenant_config = ps_http.tenant_config(tenant_id)
assert our_tenant_config.tenant_specific_overrides == fully_custom_config
assert set(our_tenant_config.effective_config.keys()) == set(
fully_custom_config.keys()
), "ensure we cover all config options"
assert {
k: initial_tenant_config.effective_config[k] != our_tenant_config.effective_config[k]
for k in fully_custom_config.keys()
} == {
k: True for k in fully_custom_config.keys()
}, "ensure our custom config has different values than the default config for all config options, so we know we overrode everything"
ps_http.tenant_detach(tenant_id)
ps_http.tenant_attach(tenant_id, config=fully_custom_config)
assert ps_http.tenant_config(tenant_id).tenant_specific_overrides == fully_custom_config
assert set(ps_http.tenant_config(tenant_id).effective_config.keys()) == set(
fully_custom_config.keys()
), "ensure we cover all config options"

View File

@@ -3,7 +3,7 @@ from contextlib import closing
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, PgProtocol
from fixtures.pageserver.http import PageserverApiException
from fixtures.types import TenantId
from fixtures.types import TenantId, TimelineId
def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
@@ -25,21 +25,19 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
ps.safe_psql("set FOO", password=tenant_token)
ps.safe_psql("set FOO", password=pageserver_token)
new_timeline_id = env.neon_cli.create_branch(
"test_pageserver_auth", tenant_id=env.initial_tenant
)
# tenant can create branches
tenant_http_client.timeline_create(
pg_version=env.pg_version,
tenant_id=env.initial_tenant,
ancestor_timeline_id=new_timeline_id,
new_timeline_id=TimelineId.generate(),
ancestor_timeline_id=env.initial_timeline,
)
# console can create branches for tenant
pageserver_http_client.timeline_create(
pg_version=env.pg_version,
tenant_id=env.initial_tenant,
ancestor_timeline_id=new_timeline_id,
new_timeline_id=TimelineId.generate(),
ancestor_timeline_id=env.initial_timeline,
)
# fail to create branch using token with different tenant_id
@@ -49,18 +47,19 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
invalid_tenant_http_client.timeline_create(
pg_version=env.pg_version,
tenant_id=env.initial_tenant,
ancestor_timeline_id=new_timeline_id,
new_timeline_id=TimelineId.generate(),
ancestor_timeline_id=env.initial_timeline,
)
# create tenant using management token
pageserver_http_client.tenant_create()
pageserver_http_client.tenant_create(TenantId.generate())
# fail to create tenant using tenant token
with pytest.raises(
PageserverApiException,
match="Forbidden: Attempt to access management api with tenant scope. Permission denied",
):
tenant_http_client.tenant_create()
tenant_http_client.tenant_create(TenantId.generate())
def test_compute_auth_to_pageserver(neon_env_builder: NeonEnvBuilder):

View File

@@ -0,0 +1,210 @@
from types import TracebackType
from typing import Any, Dict, List, Optional, Tuple, Type
import psycopg2
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import VanillaPostgres
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
def handle_db(dbs, roles, operation):
if operation["op"] == "set":
if "old_name" in operation and operation["old_name"] in dbs:
dbs[operation["name"]] = dbs[operation["old_name"]]
dbs.pop(operation["old_name"])
if "owner" in operation:
dbs[operation["name"]] = operation["owner"]
elif operation["op"] == "del":
dbs.pop(operation["name"])
else:
raise ValueError("Invalid op")
def handle_role(dbs, roles, operation):
if operation["op"] == "set":
if "old_name" in operation and operation["old_name"] in roles:
roles[operation["name"]] = roles[operation["old_name"]]
roles.pop(operation["old_name"])
for db, owner in dbs.items():
if owner == operation["old_name"]:
dbs[db] = operation["name"]
if "password" in operation:
roles[operation["name"]] = operation["password"]
elif operation["op"] == "del":
if "old_name" in operation:
roles.pop(operation["old_name"])
roles.pop(operation["name"])
else:
raise ValueError("Invalid op")
fail = False
def ddl_forward_handler(request: Request, dbs: Dict[str, str], roles: Dict[str, str]) -> Response:
log.info(f"Received request with data {request.get_data(as_text=True)}")
if fail:
log.info("FAILING")
return Response(status=500, response="Failed just cuz")
if request.json is None:
log.info("Received invalid JSON")
return Response(status=400)
json = request.json
# Handle roles first
if "roles" in json:
for operation in json["roles"]:
handle_role(dbs, roles, operation)
if "dbs" in json:
for operation in json["dbs"]:
handle_db(dbs, roles, operation)
return Response(status=200)
class DdlForwardingContext:
def __init__(self, httpserver: HTTPServer, vanilla_pg: VanillaPostgres, host: str, port: int):
self.server = httpserver
self.pg = vanilla_pg
self.host = host
self.port = port
self.dbs: Dict[str, str] = {}
self.roles: Dict[str, str] = {}
endpoint = "/management/api/v2/roles_and_databases"
ddl_url = f"http://{host}:{port}{endpoint}"
self.pg.configure(
[
f"neon.console_url={ddl_url}",
"shared_preload_libraries = 'neon'",
]
)
log.info(f"Listening on {ddl_url}")
self.server.expect_request(endpoint, method="PATCH").respond_with_handler(
lambda request: ddl_forward_handler(request, self.dbs, self.roles)
)
def __enter__(self):
self.pg.start()
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
):
self.pg.stop()
def send(self, query: str) -> List[Tuple[Any, ...]]:
return self.pg.safe_psql(query)
def wait(self, timeout=3):
self.server.wait(timeout=timeout)
def send_and_wait(self, query: str, timeout=3) -> List[Tuple[Any, ...]]:
res = self.send(query)
self.wait(timeout=timeout)
return res
@pytest.fixture(scope="function")
def ddl(
httpserver: HTTPServer, vanilla_pg: VanillaPostgres, httpserver_listen_address: tuple[str, int]
):
(host, port) = httpserver_listen_address
with DdlForwardingContext(httpserver, vanilla_pg, host, port) as ddl:
yield ddl
def test_ddl_forwarding(ddl: DdlForwardingContext):
curr_user = ddl.send("SELECT current_user")[0][0]
log.info(f"Current user is {curr_user}")
ddl.send_and_wait("CREATE DATABASE bork")
assert ddl.dbs == {"bork": curr_user}
ddl.send_and_wait("CREATE ROLE volk WITH PASSWORD 'nu_zayats'")
ddl.send_and_wait("ALTER DATABASE bork RENAME TO nu_pogodi")
assert ddl.dbs == {"nu_pogodi": curr_user}
ddl.send_and_wait("ALTER DATABASE nu_pogodi OWNER TO volk")
assert ddl.dbs == {"nu_pogodi": "volk"}
ddl.send_and_wait("DROP DATABASE nu_pogodi")
assert ddl.dbs == {}
ddl.send_and_wait("DROP ROLE volk")
assert ddl.roles == {}
ddl.send_and_wait("CREATE ROLE tarzan WITH PASSWORD 'of_the_apes'")
assert ddl.roles == {"tarzan": "of_the_apes"}
ddl.send_and_wait("DROP ROLE tarzan")
assert ddl.roles == {}
ddl.send_and_wait("CREATE ROLE tarzan WITH PASSWORD 'of_the_apes'")
assert ddl.roles == {"tarzan": "of_the_apes"}
ddl.send_and_wait("ALTER ROLE tarzan WITH PASSWORD 'jungle_man'")
assert ddl.roles == {"tarzan": "jungle_man"}
ddl.send_and_wait("ALTER ROLE tarzan RENAME TO mowgli")
assert ddl.roles == {"mowgli": "jungle_man"}
ddl.send_and_wait("DROP ROLE mowgli")
assert ddl.roles == {}
conn = ddl.pg.connect()
cur = conn.cursor()
cur.execute("BEGIN")
cur.execute("CREATE ROLE bork WITH PASSWORD 'cork'")
cur.execute("COMMIT")
ddl.wait()
assert ddl.roles == {"bork": "cork"}
cur.execute("BEGIN")
cur.execute("CREATE ROLE stork WITH PASSWORD 'pork'")
cur.execute("ABORT")
ddl.wait()
assert ("stork", "pork") not in ddl.roles.items()
cur.execute("BEGIN")
cur.execute("ALTER ROLE bork WITH PASSWORD 'pork'")
cur.execute("ALTER ROLE bork RENAME TO stork")
cur.execute("COMMIT")
ddl.wait()
assert ddl.roles == {"stork": "pork"}
cur.execute("BEGIN")
cur.execute("CREATE ROLE dork WITH PASSWORD 'york'")
cur.execute("SAVEPOINT point")
cur.execute("ALTER ROLE dork WITH PASSWORD 'zork'")
cur.execute("ALTER ROLE dork RENAME TO fork")
cur.execute("ROLLBACK TO SAVEPOINT point")
cur.execute("ALTER ROLE dork WITH PASSWORD 'fork'")
cur.execute("ALTER ROLE dork RENAME TO zork")
cur.execute("RELEASE SAVEPOINT point")
cur.execute("COMMIT")
ddl.wait()
assert ddl.roles == {"stork": "pork", "zork": "fork"}
cur.execute("DROP ROLE stork")
cur.execute("DROP ROLE zork")
ddl.wait()
assert ddl.roles == {}
cur.execute("CREATE ROLE bork WITH PASSWORD 'dork'")
cur.execute("CREATE ROLE stork WITH PASSWORD 'cork'")
cur.execute("BEGIN")
cur.execute("DROP ROLE bork")
cur.execute("ALTER ROLE stork RENAME TO bork")
cur.execute("COMMIT")
ddl.wait()
assert ddl.roles == {"bork": "cork"}
cur.execute("DROP ROLE bork")
ddl.wait()
assert ddl.roles == {}
cur.execute("CREATE ROLE bork WITH PASSWORD 'dork'")
cur.execute("CREATE DATABASE stork WITH OWNER=bork")
cur.execute("ALTER ROLE bork RENAME TO cork")
ddl.wait()
assert ddl.dbs == {"stork": "cork"}
with pytest.raises(psycopg2.InternalError):
global fail
fail = True
cur.execute("CREATE DATABASE failure WITH OWNER=cork")
ddl.wait()
conn.close()

View File

@@ -228,7 +228,6 @@ def proxy_with_metric_collector(
@pytest.mark.asyncio
async def test_proxy_metric_collection(
httpserver: HTTPServer,
httpserver_listen_address,
proxy_with_metric_collector: NeonProxy,
vanilla_pg: VanillaPostgres,
):

View File

@@ -4,7 +4,7 @@ from pathlib import Path
from types import TracebackType
from typing import Optional, Type
import backoff # type: ignore
import backoff
from fixtures.log_helper import log
from fixtures.neon_fixtures import PgProtocol, PortDistributor, VanillaPostgres

View File

@@ -62,6 +62,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
log.info(f"show {env.initial_tenant}")
pscur.execute(f"show {env.initial_tenant}")
res = pscur.fetchone()
assert res is not None
assert all(
i in res.items()
for i in {
@@ -101,6 +102,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
pscur.execute(f"show {tenant}")
res = pscur.fetchone()
log.info(f"res: {res}")
assert res is not None
assert all(
i in res.items()
for i in {
@@ -163,6 +165,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
pscur.execute(f"show {tenant}")
res = pscur.fetchone()
log.info(f"after config res: {res}")
assert res is not None
assert all(
i in res.items()
for i in {
@@ -218,6 +221,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
pscur.execute(f"show {tenant}")
res = pscur.fetchone()
log.info(f"after restart res: {res}")
assert res is not None
assert all(
i in res.items()
for i in {
@@ -278,6 +282,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
pscur.execute(f"show {tenant}")
res = pscur.fetchone()
log.info(f"after restart res: {res}")
assert res is not None
assert all(
i in res.items()
for i in {

View File

@@ -315,21 +315,22 @@ def test_pageserver_with_empty_tenants(
client = env.pageserver.http_client()
tenant_with_empty_timelines_dir = client.tenant_create()
temp_timelines = client.timeline_list(tenant_with_empty_timelines_dir)
tenant_with_empty_timelines = TenantId.generate()
client.tenant_create(tenant_with_empty_timelines)
temp_timelines = client.timeline_list(tenant_with_empty_timelines)
for temp_timeline in temp_timelines:
client.timeline_delete(
tenant_with_empty_timelines_dir, TimelineId(temp_timeline["timeline_id"])
tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"])
)
files_in_timelines_dir = sum(
1
for _p in Path.iterdir(
Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines_dir) / "timelines"
Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines) / "timelines"
)
)
assert (
files_in_timelines_dir == 0
), f"Tenant {tenant_with_empty_timelines_dir} should have an empty timelines/ directory"
), f"Tenant {tenant_with_empty_timelines} should have an empty timelines/ directory"
# Trigger timeline re-initialization after pageserver restart
env.endpoints.stop_all()
@@ -357,15 +358,15 @@ def test_pageserver_with_empty_tenants(
assert env.pageserver.log_contains(".*Setting tenant as Broken state, reason:.*")
[loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines_dir)]
[loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines)]
assert (
loaded_tenant["state"]["slug"] == "Active"
), "Tenant {tenant_with_empty_timelines_dir} with empty timelines dir should be active and ready for timeline creation"
), "Tenant {tenant_with_empty_timelines} with empty timelines dir should be active and ready for timeline creation"
loaded_tenant_status = client.tenant_status(tenant_with_empty_timelines_dir)
loaded_tenant_status = client.tenant_status(tenant_with_empty_timelines)
assert (
loaded_tenant_status["state"]["slug"] == "Active"
), f"Tenant {tenant_with_empty_timelines_dir} without timelines dir should be active"
), f"Tenant {tenant_with_empty_timelines} without timelines dir should be active"
time.sleep(1) # to allow metrics propagation
@@ -375,7 +376,7 @@ def test_pageserver_with_empty_tenants(
"state": "Broken",
}
active_tenants_metric_filter = {
"tenant_id": str(tenant_with_empty_timelines_dir),
"tenant_id": str(tenant_with_empty_timelines),
"state": "Active",
}
@@ -387,7 +388,7 @@ def test_pageserver_with_empty_tenants(
assert (
tenant_active_count == 1
), f"Tenant {tenant_with_empty_timelines_dir} should have metric as active"
), f"Tenant {tenant_with_empty_timelines} should have metric as active"
tenant_broken_count = int(
ps_metrics.query_one(