From 2b60ad02858b10e3103c78ee2418b3f97540a4bf Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Fri, 19 May 2023 16:56:22 +0300 Subject: [PATCH] Test compute node cache invalidation --- test_runner/fixtures/neon_fixtures.py | 50 +++++++++++++++-- test_runner/regress/test_proxy.py | 78 ++++++++++++++++++++++++++- 2 files changed, 124 insertions(+), 4 deletions(-) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index bde91e6783..7ae15dd777 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1845,13 +1845,15 @@ class VanillaPostgres(PgProtocol): ] ) - def configure(self, options: List[str]): + def configure(self, options: List[str]) -> "VanillaPostgres": """Append lines into postgresql.conf file.""" assert not self.running with open(os.path.join(self.pgdatadir, "postgresql.conf"), "a") as conf_file: conf_file.write("\n".join(options)) - def start(self, log_path: Optional[str] = None): + return self + + def start(self, log_path: Optional[str] = None) -> "VanillaPostgres": assert not self.running self.running = True @@ -1862,11 +1864,15 @@ class VanillaPostgres(PgProtocol): ["pg_ctl", "-w", "-D", str(self.pgdatadir), "-l", log_path, "start"] ) - def stop(self): + return self + + def stop(self) -> "VanillaPostgres": assert self.running self.running = False self.pg_bin.run_capture(["pg_ctl", "-w", "-D", str(self.pgdatadir), "stop"]) + return self + def get_subdir_size(self, subdir) -> int: """Return size of pgdatadir subdirectory in bytes.""" return get_dir_size(os.path.join(self.pgdatadir, subdir)) @@ -2035,6 +2041,17 @@ class NeonProxy(PgProtocol): *["--auth-endpoint", self.pg_conn_url], ] + @dataclass(frozen=True) + class Console(AuthBackend): + console_url: str + + def extra_args(self) -> list[str]: + return [ + # Postgres auth backend params + *["--auth-backend", "console"], + *["--auth-endpoint", self.console_url], + ] + def __init__( self, neon_binpath: Path, @@ -2240,6 +2257,33 @@ def link_proxy( yield proxy +@pytest.fixture(scope="function") +def console_proxy( + port_distributor: PortDistributor, neon_binpath: Path, test_output_dir: Path +) -> Iterator[NeonProxy]: + """Neon proxy that routes through link auth.""" + + http_port = port_distributor.get_port() + proxy_port = port_distributor.get_port() + mgmt_port = port_distributor.get_port() + console_port = port_distributor.get_port() + external_http_port = port_distributor.get_port() + + console_url = f"http://127.0.0.1:{console_port}" + + with NeonProxy( + neon_binpath=neon_binpath, + test_output_dir=test_output_dir, + proxy_port=proxy_port, + http_port=http_port, + mgmt_port=mgmt_port, + external_http_port=external_http_port, + auth_backend=NeonProxy.Console(console_url), + ) as proxy: + proxy.start() + yield proxy + + @pytest.fixture(scope="function") def static_proxy( vanilla_pg: VanillaPostgres, diff --git a/test_runner/regress/test_proxy.py b/test_runner/regress/test_proxy.py index 6be3995714..dd1bfa0aa5 100644 --- a/test_runner/regress/test_proxy.py +++ b/test_runner/regress/test_proxy.py @@ -2,10 +2,12 @@ import json import subprocess from typing import Any, List +import logging import psycopg2 import pytest import requests -from fixtures.neon_fixtures import PSQL, NeonProxy, VanillaPostgres +from aiohttp import web +from fixtures.neon_fixtures import PSQL, NeonProxy, VanillaPostgres, PortDistributor def test_proxy_select_1(static_proxy: NeonProxy): @@ -225,3 +227,77 @@ def test_sql_over_http(static_proxy: NeonProxy): res = q("drop table t") assert res["command"] == "DROP" assert res["rowCount"] is None + + +@pytest.mark.asyncio +async def test_compute_cache_invalidation( + port_distributor: PortDistributor, vanilla_pg: VanillaPostgres, console_proxy: NeonProxy +): + console_url = console_proxy.auth_backend.console_url + logging.info(f"mocked console's url is {console_url}") + console_port = int(console_url.split(":")[-1]) + logging.info(f"mocked console's port is {console_port}") + + routes = web.RouteTableDef() + + @routes.get("/proxy_get_role_secret") + async def get_role_secret(request): + # corresponds to password "password" + secret = ":".join( + [ + "SCRAM-SHA-256$4096", + "t33UQcz/cs1D+n9INqThsw==$1NYlCbuxtK7YF2sgECBDTv1Myf8PpHJCT3RgKSXlZL0=", + "9iLeGY91MqBQ4ez1389Smo7h+STsJJ5jvu7kNofxj08=", + ] + ) + + return web.json_response({"role_secret": secret}) + + @routes.get("/proxy_wake_compute") + async def wake_compute(request): + nonlocal wake_compute_called + wake_compute_called += 1 + + nonlocal postgres_port + logging.info(f"compute's port is {postgres_port}") + + return web.json_response( + { + "address": f"127.0.0.1:{postgres_port}", + "aux": { + "endpoint_id": "", + "project_id": "", + "branch_id": "", + }, + } + ) + + console = web.Application() + console.add_routes(routes) + + runner = web.AppRunner(console) + await runner.setup() + await web.TCPSite(runner, "127.0.0.1", console_port).start() + + # Create a user we're going to use in the test sequence + (user, password) = ("borat", "password") + vanilla_pg.start().safe_psql(f"create role {user} with login password '{password}'") + + async def try_connect(): + magic = f"endpoint=irrelevant;{password}" + await console_proxy.connect_async(user=user, password=magic, dbname="postgres") + + wake_compute_called = 0 + postgres_port = vanilla_pg.default_options["port"] + + # Try connecting to compute + await try_connect() + assert wake_compute_called == 1 + + # Change compute's port + postgres_port = port_distributor.get_port() + vanilla_pg.stop().configure([f"port = {postgres_port}"]).start() + + # Try connecting to compute + await try_connect() + assert wake_compute_called == 2