mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 14:02:55 +00:00
## Problem
Test `test_metric_collection` become flaky:
```
AssertionError: assert not ['2023-05-25T14:03:41.644042Z ERROR metrics_collection: failed to send metrics: reqwest::Error { kind: Request, url: Url { scheme: "http", cannot_be_a_base: false, username: "", password: None, host: Some(Domain("localhost")), port: Some(18022), path: "/billing/api/v1/usage_events", query: None, fragment: None }, source: hyper::Error(Connect, ConnectError("tcp connect error", Os { code: 99, kind: AddrNotAvailable, message: "Cannot assign requested address" })) }',
...]
```
I suspect it is caused by having 2 places when we define
`httpserver_listen_address` fixture (which is internally used by
`pytest-httpserver` plugin)
## Summary of changes
- Remove the definition of `httpserver_listen_address` from
`test_runner/regress/test_ddl_forwarding.py` and keep one in
`test_runner/fixtures/neon_fixtures.py`
- Also remote unused `httpserver_listen_address` parameter from
`test_proxy_metric_collection`
211 lines
7.0 KiB
Python
211 lines
7.0 KiB
Python
from types import TracebackType
|
|
from typing import Any, Dict, List, Optional, Tuple, Type
|
|
|
|
import psycopg2
|
|
import pytest
|
|
from fixtures.log_helper import log
|
|
from fixtures.neon_fixtures import VanillaPostgres
|
|
from pytest_httpserver import HTTPServer
|
|
from werkzeug.wrappers.request import Request
|
|
from werkzeug.wrappers.response import Response
|
|
|
|
|
|
def handle_db(dbs, roles, operation):
|
|
if operation["op"] == "set":
|
|
if "old_name" in operation and operation["old_name"] in dbs:
|
|
dbs[operation["name"]] = dbs[operation["old_name"]]
|
|
dbs.pop(operation["old_name"])
|
|
if "owner" in operation:
|
|
dbs[operation["name"]] = operation["owner"]
|
|
elif operation["op"] == "del":
|
|
dbs.pop(operation["name"])
|
|
else:
|
|
raise ValueError("Invalid op")
|
|
|
|
|
|
def handle_role(dbs, roles, operation):
|
|
if operation["op"] == "set":
|
|
if "old_name" in operation and operation["old_name"] in roles:
|
|
roles[operation["name"]] = roles[operation["old_name"]]
|
|
roles.pop(operation["old_name"])
|
|
for db, owner in dbs.items():
|
|
if owner == operation["old_name"]:
|
|
dbs[db] = operation["name"]
|
|
if "password" in operation:
|
|
roles[operation["name"]] = operation["password"]
|
|
elif operation["op"] == "del":
|
|
if "old_name" in operation:
|
|
roles.pop(operation["old_name"])
|
|
roles.pop(operation["name"])
|
|
else:
|
|
raise ValueError("Invalid op")
|
|
|
|
|
|
fail = False
|
|
|
|
|
|
def ddl_forward_handler(request: Request, dbs: Dict[str, str], roles: Dict[str, str]) -> Response:
|
|
log.info(f"Received request with data {request.get_data(as_text=True)}")
|
|
if fail:
|
|
log.info("FAILING")
|
|
return Response(status=500, response="Failed just cuz")
|
|
if request.json is None:
|
|
log.info("Received invalid JSON")
|
|
return Response(status=400)
|
|
json = request.json
|
|
# Handle roles first
|
|
if "roles" in json:
|
|
for operation in json["roles"]:
|
|
handle_role(dbs, roles, operation)
|
|
if "dbs" in json:
|
|
for operation in json["dbs"]:
|
|
handle_db(dbs, roles, operation)
|
|
return Response(status=200)
|
|
|
|
|
|
class DdlForwardingContext:
|
|
def __init__(self, httpserver: HTTPServer, vanilla_pg: VanillaPostgres, host: str, port: int):
|
|
self.server = httpserver
|
|
self.pg = vanilla_pg
|
|
self.host = host
|
|
self.port = port
|
|
self.dbs: Dict[str, str] = {}
|
|
self.roles: Dict[str, str] = {}
|
|
endpoint = "/management/api/v2/roles_and_databases"
|
|
ddl_url = f"http://{host}:{port}{endpoint}"
|
|
self.pg.configure(
|
|
[
|
|
f"neon.console_url={ddl_url}",
|
|
"shared_preload_libraries = 'neon'",
|
|
]
|
|
)
|
|
log.info(f"Listening on {ddl_url}")
|
|
self.server.expect_request(endpoint, method="PATCH").respond_with_handler(
|
|
lambda request: ddl_forward_handler(request, self.dbs, self.roles)
|
|
)
|
|
|
|
def __enter__(self):
|
|
self.pg.start()
|
|
return self
|
|
|
|
def __exit__(
|
|
self,
|
|
exc_type: Optional[Type[BaseException]],
|
|
exc: Optional[BaseException],
|
|
tb: Optional[TracebackType],
|
|
):
|
|
self.pg.stop()
|
|
|
|
def send(self, query: str) -> List[Tuple[Any, ...]]:
|
|
return self.pg.safe_psql(query)
|
|
|
|
def wait(self, timeout=3):
|
|
self.server.wait(timeout=timeout)
|
|
|
|
def send_and_wait(self, query: str, timeout=3) -> List[Tuple[Any, ...]]:
|
|
res = self.send(query)
|
|
self.wait(timeout=timeout)
|
|
return res
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def ddl(
|
|
httpserver: HTTPServer, vanilla_pg: VanillaPostgres, httpserver_listen_address: tuple[str, int]
|
|
):
|
|
(host, port) = httpserver_listen_address
|
|
with DdlForwardingContext(httpserver, vanilla_pg, host, port) as ddl:
|
|
yield ddl
|
|
|
|
|
|
def test_ddl_forwarding(ddl: DdlForwardingContext):
|
|
curr_user = ddl.send("SELECT current_user")[0][0]
|
|
log.info(f"Current user is {curr_user}")
|
|
ddl.send_and_wait("CREATE DATABASE bork")
|
|
assert ddl.dbs == {"bork": curr_user}
|
|
ddl.send_and_wait("CREATE ROLE volk WITH PASSWORD 'nu_zayats'")
|
|
ddl.send_and_wait("ALTER DATABASE bork RENAME TO nu_pogodi")
|
|
assert ddl.dbs == {"nu_pogodi": curr_user}
|
|
ddl.send_and_wait("ALTER DATABASE nu_pogodi OWNER TO volk")
|
|
assert ddl.dbs == {"nu_pogodi": "volk"}
|
|
ddl.send_and_wait("DROP DATABASE nu_pogodi")
|
|
assert ddl.dbs == {}
|
|
ddl.send_and_wait("DROP ROLE volk")
|
|
assert ddl.roles == {}
|
|
|
|
ddl.send_and_wait("CREATE ROLE tarzan WITH PASSWORD 'of_the_apes'")
|
|
assert ddl.roles == {"tarzan": "of_the_apes"}
|
|
ddl.send_and_wait("DROP ROLE tarzan")
|
|
assert ddl.roles == {}
|
|
ddl.send_and_wait("CREATE ROLE tarzan WITH PASSWORD 'of_the_apes'")
|
|
assert ddl.roles == {"tarzan": "of_the_apes"}
|
|
ddl.send_and_wait("ALTER ROLE tarzan WITH PASSWORD 'jungle_man'")
|
|
assert ddl.roles == {"tarzan": "jungle_man"}
|
|
ddl.send_and_wait("ALTER ROLE tarzan RENAME TO mowgli")
|
|
assert ddl.roles == {"mowgli": "jungle_man"}
|
|
ddl.send_and_wait("DROP ROLE mowgli")
|
|
assert ddl.roles == {}
|
|
|
|
conn = ddl.pg.connect()
|
|
cur = conn.cursor()
|
|
|
|
cur.execute("BEGIN")
|
|
cur.execute("CREATE ROLE bork WITH PASSWORD 'cork'")
|
|
cur.execute("COMMIT")
|
|
ddl.wait()
|
|
assert ddl.roles == {"bork": "cork"}
|
|
cur.execute("BEGIN")
|
|
cur.execute("CREATE ROLE stork WITH PASSWORD 'pork'")
|
|
cur.execute("ABORT")
|
|
ddl.wait()
|
|
assert ("stork", "pork") not in ddl.roles.items()
|
|
cur.execute("BEGIN")
|
|
cur.execute("ALTER ROLE bork WITH PASSWORD 'pork'")
|
|
cur.execute("ALTER ROLE bork RENAME TO stork")
|
|
cur.execute("COMMIT")
|
|
ddl.wait()
|
|
assert ddl.roles == {"stork": "pork"}
|
|
cur.execute("BEGIN")
|
|
cur.execute("CREATE ROLE dork WITH PASSWORD 'york'")
|
|
cur.execute("SAVEPOINT point")
|
|
cur.execute("ALTER ROLE dork WITH PASSWORD 'zork'")
|
|
cur.execute("ALTER ROLE dork RENAME TO fork")
|
|
cur.execute("ROLLBACK TO SAVEPOINT point")
|
|
cur.execute("ALTER ROLE dork WITH PASSWORD 'fork'")
|
|
cur.execute("ALTER ROLE dork RENAME TO zork")
|
|
cur.execute("RELEASE SAVEPOINT point")
|
|
cur.execute("COMMIT")
|
|
ddl.wait()
|
|
assert ddl.roles == {"stork": "pork", "zork": "fork"}
|
|
|
|
cur.execute("DROP ROLE stork")
|
|
cur.execute("DROP ROLE zork")
|
|
ddl.wait()
|
|
assert ddl.roles == {}
|
|
|
|
cur.execute("CREATE ROLE bork WITH PASSWORD 'dork'")
|
|
cur.execute("CREATE ROLE stork WITH PASSWORD 'cork'")
|
|
cur.execute("BEGIN")
|
|
cur.execute("DROP ROLE bork")
|
|
cur.execute("ALTER ROLE stork RENAME TO bork")
|
|
cur.execute("COMMIT")
|
|
ddl.wait()
|
|
assert ddl.roles == {"bork": "cork"}
|
|
|
|
cur.execute("DROP ROLE bork")
|
|
ddl.wait()
|
|
assert ddl.roles == {}
|
|
|
|
cur.execute("CREATE ROLE bork WITH PASSWORD 'dork'")
|
|
cur.execute("CREATE DATABASE stork WITH OWNER=bork")
|
|
cur.execute("ALTER ROLE bork RENAME TO cork")
|
|
ddl.wait()
|
|
assert ddl.dbs == {"stork": "cork"}
|
|
|
|
with pytest.raises(psycopg2.InternalError):
|
|
global fail
|
|
fail = True
|
|
cur.execute("CREATE DATABASE failure WITH OWNER=cork")
|
|
ddl.wait()
|
|
|
|
conn.close()
|