mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-29 19:10:38 +00:00
Merge remote-tracking branch 'origin/main' into problame/async-timeline-get/dont-hold-timelines-lock-inside-tenant-state-send-modify
This commit is contained in:
@@ -1619,6 +1619,8 @@ class NeonPageserver(PgProtocol):
|
||||
".*task iteration took longer than the configured period.*",
|
||||
# this is until #3501
|
||||
".*Compaction failed, retrying in [^:]+: Cannot run compaction iteration on inactive tenant",
|
||||
# these can happen anytime we do compactions from background task and shutdown pageserver
|
||||
r".*ERROR.*ancestor timeline \S+ is being stopped",
|
||||
]
|
||||
|
||||
def start(
|
||||
|
||||
@@ -27,6 +27,10 @@ class PgVersion(str, enum.Enum):
|
||||
def __repr__(self) -> str:
|
||||
return f"'{self.value}'"
|
||||
|
||||
# Make this explicit for Python 3.11 compatibility, which changes the behavior of enums
|
||||
def __str__(self) -> str:
|
||||
return self.value
|
||||
|
||||
# In GitHub workflows we use Postgres version with v-prefix (e.g. v14 instead of just 14),
|
||||
# sometime we need to do so in tests.
|
||||
@property
|
||||
@@ -78,11 +82,11 @@ def pytest_addoption(parser: Parser):
|
||||
@pytest.fixture(scope="session")
|
||||
def pg_version(request: FixtureRequest) -> Iterator[PgVersion]:
|
||||
if v := request.config.getoption("--pg-version"):
|
||||
version, source = v, "from --pg-version commad-line argument"
|
||||
version, source = v, "from --pg-version command-line argument"
|
||||
elif v := os.environ.get("DEFAULT_PG_VERSION"):
|
||||
version, source = PgVersion(v), "from DEFAULT_PG_VERSION environment variable"
|
||||
else:
|
||||
version, source = DEFAULT_VERSION, "default verson"
|
||||
version, source = DEFAULT_VERSION, "default version"
|
||||
|
||||
log.info(f"pg_version is {version} ({source})")
|
||||
yield version
|
||||
|
||||
219
test_runner/regress/test_ddl_forwarding.py
Normal file
219
test_runner/regress/test_ddl_forwarding.py
Normal file
@@ -0,0 +1,219 @@
|
||||
from types import TracebackType
|
||||
from typing import Any, Dict, List, Optional, Tuple, Type
|
||||
|
||||
import psycopg2
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
PortDistributor,
|
||||
VanillaPostgres,
|
||||
)
|
||||
from pytest_httpserver import HTTPServer
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def httpserver_listen_address(port_distributor: PortDistributor):
|
||||
port = port_distributor.get_port()
|
||||
return ("localhost", port)
|
||||
|
||||
|
||||
def handle_db(dbs, roles, operation):
|
||||
if operation["op"] == "set":
|
||||
if "old_name" in operation and operation["old_name"] in dbs:
|
||||
dbs[operation["name"]] = dbs[operation["old_name"]]
|
||||
dbs.pop(operation["old_name"])
|
||||
if "owner" in operation:
|
||||
dbs[operation["name"]] = operation["owner"]
|
||||
elif operation["op"] == "del":
|
||||
dbs.pop(operation["name"])
|
||||
else:
|
||||
raise ValueError("Invalid op")
|
||||
|
||||
|
||||
def handle_role(dbs, roles, operation):
|
||||
if operation["op"] == "set":
|
||||
if "old_name" in operation and operation["old_name"] in roles:
|
||||
roles[operation["name"]] = roles[operation["old_name"]]
|
||||
roles.pop(operation["old_name"])
|
||||
for db, owner in dbs.items():
|
||||
if owner == operation["old_name"]:
|
||||
dbs[db] = operation["name"]
|
||||
if "password" in operation:
|
||||
roles[operation["name"]] = operation["password"]
|
||||
elif operation["op"] == "del":
|
||||
if "old_name" in operation:
|
||||
roles.pop(operation["old_name"])
|
||||
roles.pop(operation["name"])
|
||||
else:
|
||||
raise ValueError("Invalid op")
|
||||
|
||||
|
||||
fail = False
|
||||
|
||||
|
||||
def ddl_forward_handler(request: Request, dbs: Dict[str, str], roles: Dict[str, str]) -> Response:
|
||||
log.info(f"Received request with data {request.get_data(as_text=True)}")
|
||||
if fail:
|
||||
log.info("FAILING")
|
||||
return Response(status=500, response="Failed just cuz")
|
||||
if request.json is None:
|
||||
log.info("Received invalid JSON")
|
||||
return Response(status=400)
|
||||
json = request.json
|
||||
# Handle roles first
|
||||
if "roles" in json:
|
||||
for operation in json["roles"]:
|
||||
handle_role(dbs, roles, operation)
|
||||
if "dbs" in json:
|
||||
for operation in json["dbs"]:
|
||||
handle_db(dbs, roles, operation)
|
||||
return Response(status=200)
|
||||
|
||||
|
||||
class DdlForwardingContext:
|
||||
def __init__(self, httpserver: HTTPServer, vanilla_pg: VanillaPostgres, host: str, port: int):
|
||||
self.server = httpserver
|
||||
self.pg = vanilla_pg
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.dbs: Dict[str, str] = {}
|
||||
self.roles: Dict[str, str] = {}
|
||||
endpoint = "/management/api/v2/roles_and_databases"
|
||||
ddl_url = f"http://{host}:{port}{endpoint}"
|
||||
self.pg.configure(
|
||||
[
|
||||
f"neon.console_url={ddl_url}",
|
||||
"shared_preload_libraries = 'neon'",
|
||||
]
|
||||
)
|
||||
log.info(f"Listening on {ddl_url}")
|
||||
self.server.expect_request(endpoint, method="PATCH").respond_with_handler(
|
||||
lambda request: ddl_forward_handler(request, self.dbs, self.roles)
|
||||
)
|
||||
|
||||
def __enter__(self):
|
||||
self.pg.start()
|
||||
return self
|
||||
|
||||
def __exit__(
|
||||
self,
|
||||
exc_type: Optional[Type[BaseException]],
|
||||
exc: Optional[BaseException],
|
||||
tb: Optional[TracebackType],
|
||||
):
|
||||
self.pg.stop()
|
||||
|
||||
def send(self, query: str) -> List[Tuple[Any, ...]]:
|
||||
return self.pg.safe_psql(query)
|
||||
|
||||
def wait(self, timeout=3):
|
||||
self.server.wait(timeout=timeout)
|
||||
|
||||
def send_and_wait(self, query: str, timeout=3) -> List[Tuple[Any, ...]]:
|
||||
res = self.send(query)
|
||||
self.wait(timeout=timeout)
|
||||
return res
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def ddl(
|
||||
httpserver: HTTPServer, vanilla_pg: VanillaPostgres, httpserver_listen_address: tuple[str, int]
|
||||
):
|
||||
(host, port) = httpserver_listen_address
|
||||
with DdlForwardingContext(httpserver, vanilla_pg, host, port) as ddl:
|
||||
yield ddl
|
||||
|
||||
|
||||
def test_ddl_forwarding(ddl: DdlForwardingContext):
|
||||
curr_user = ddl.send("SELECT current_user")[0][0]
|
||||
log.info(f"Current user is {curr_user}")
|
||||
ddl.send_and_wait("CREATE DATABASE bork")
|
||||
assert ddl.dbs == {"bork": curr_user}
|
||||
ddl.send_and_wait("CREATE ROLE volk WITH PASSWORD 'nu_zayats'")
|
||||
ddl.send_and_wait("ALTER DATABASE bork RENAME TO nu_pogodi")
|
||||
assert ddl.dbs == {"nu_pogodi": curr_user}
|
||||
ddl.send_and_wait("ALTER DATABASE nu_pogodi OWNER TO volk")
|
||||
assert ddl.dbs == {"nu_pogodi": "volk"}
|
||||
ddl.send_and_wait("DROP DATABASE nu_pogodi")
|
||||
assert ddl.dbs == {}
|
||||
ddl.send_and_wait("DROP ROLE volk")
|
||||
assert ddl.roles == {}
|
||||
|
||||
ddl.send_and_wait("CREATE ROLE tarzan WITH PASSWORD 'of_the_apes'")
|
||||
assert ddl.roles == {"tarzan": "of_the_apes"}
|
||||
ddl.send_and_wait("DROP ROLE tarzan")
|
||||
assert ddl.roles == {}
|
||||
ddl.send_and_wait("CREATE ROLE tarzan WITH PASSWORD 'of_the_apes'")
|
||||
assert ddl.roles == {"tarzan": "of_the_apes"}
|
||||
ddl.send_and_wait("ALTER ROLE tarzan WITH PASSWORD 'jungle_man'")
|
||||
assert ddl.roles == {"tarzan": "jungle_man"}
|
||||
ddl.send_and_wait("ALTER ROLE tarzan RENAME TO mowgli")
|
||||
assert ddl.roles == {"mowgli": "jungle_man"}
|
||||
ddl.send_and_wait("DROP ROLE mowgli")
|
||||
assert ddl.roles == {}
|
||||
|
||||
conn = ddl.pg.connect()
|
||||
cur = conn.cursor()
|
||||
|
||||
cur.execute("BEGIN")
|
||||
cur.execute("CREATE ROLE bork WITH PASSWORD 'cork'")
|
||||
cur.execute("COMMIT")
|
||||
ddl.wait()
|
||||
assert ddl.roles == {"bork": "cork"}
|
||||
cur.execute("BEGIN")
|
||||
cur.execute("CREATE ROLE stork WITH PASSWORD 'pork'")
|
||||
cur.execute("ABORT")
|
||||
ddl.wait()
|
||||
assert ("stork", "pork") not in ddl.roles.items()
|
||||
cur.execute("BEGIN")
|
||||
cur.execute("ALTER ROLE bork WITH PASSWORD 'pork'")
|
||||
cur.execute("ALTER ROLE bork RENAME TO stork")
|
||||
cur.execute("COMMIT")
|
||||
ddl.wait()
|
||||
assert ddl.roles == {"stork": "pork"}
|
||||
cur.execute("BEGIN")
|
||||
cur.execute("CREATE ROLE dork WITH PASSWORD 'york'")
|
||||
cur.execute("SAVEPOINT point")
|
||||
cur.execute("ALTER ROLE dork WITH PASSWORD 'zork'")
|
||||
cur.execute("ALTER ROLE dork RENAME TO fork")
|
||||
cur.execute("ROLLBACK TO SAVEPOINT point")
|
||||
cur.execute("ALTER ROLE dork WITH PASSWORD 'fork'")
|
||||
cur.execute("ALTER ROLE dork RENAME TO zork")
|
||||
cur.execute("RELEASE SAVEPOINT point")
|
||||
cur.execute("COMMIT")
|
||||
ddl.wait()
|
||||
assert ddl.roles == {"stork": "pork", "zork": "fork"}
|
||||
|
||||
cur.execute("DROP ROLE stork")
|
||||
cur.execute("DROP ROLE zork")
|
||||
ddl.wait()
|
||||
assert ddl.roles == {}
|
||||
|
||||
cur.execute("CREATE ROLE bork WITH PASSWORD 'dork'")
|
||||
cur.execute("CREATE ROLE stork WITH PASSWORD 'cork'")
|
||||
cur.execute("BEGIN")
|
||||
cur.execute("DROP ROLE bork")
|
||||
cur.execute("ALTER ROLE stork RENAME TO bork")
|
||||
cur.execute("COMMIT")
|
||||
ddl.wait()
|
||||
assert ddl.roles == {"bork": "cork"}
|
||||
|
||||
cur.execute("DROP ROLE bork")
|
||||
ddl.wait()
|
||||
assert ddl.roles == {}
|
||||
|
||||
cur.execute("CREATE ROLE bork WITH PASSWORD 'dork'")
|
||||
cur.execute("CREATE DATABASE stork WITH OWNER=bork")
|
||||
cur.execute("ALTER ROLE bork RENAME TO cork")
|
||||
ddl.wait()
|
||||
assert ddl.dbs == {"stork": "cork"}
|
||||
|
||||
with pytest.raises(psycopg2.InternalError):
|
||||
global fail
|
||||
fail = True
|
||||
cur.execute("CREATE DATABASE failure WITH OWNER=cork")
|
||||
ddl.wait()
|
||||
|
||||
conn.close()
|
||||
Reference in New Issue
Block a user