Test compute node cache invalidation

This commit is contained in:
Dmitry Ivanov
2023-05-19 16:56:22 +03:00
committed by Vadim Kharitonov
parent 9b99d4caa9
commit 2b60ad0285
2 changed files with 124 additions and 4 deletions

View File

@@ -1845,13 +1845,15 @@ class VanillaPostgres(PgProtocol):
]
)
def configure(self, options: List[str]):
def configure(self, options: List[str]) -> "VanillaPostgres":
"""Append lines into postgresql.conf file."""
assert not self.running
with open(os.path.join(self.pgdatadir, "postgresql.conf"), "a") as conf_file:
conf_file.write("\n".join(options))
def start(self, log_path: Optional[str] = None):
return self
def start(self, log_path: Optional[str] = None) -> "VanillaPostgres":
assert not self.running
self.running = True
@@ -1862,11 +1864,15 @@ class VanillaPostgres(PgProtocol):
["pg_ctl", "-w", "-D", str(self.pgdatadir), "-l", log_path, "start"]
)
def stop(self):
return self
def stop(self) -> "VanillaPostgres":
assert self.running
self.running = False
self.pg_bin.run_capture(["pg_ctl", "-w", "-D", str(self.pgdatadir), "stop"])
return self
def get_subdir_size(self, subdir) -> int:
"""Return size of pgdatadir subdirectory in bytes."""
return get_dir_size(os.path.join(self.pgdatadir, subdir))
@@ -2035,6 +2041,17 @@ class NeonProxy(PgProtocol):
*["--auth-endpoint", self.pg_conn_url],
]
@dataclass(frozen=True)
class Console(AuthBackend):
console_url: str
def extra_args(self) -> list[str]:
return [
# Postgres auth backend params
*["--auth-backend", "console"],
*["--auth-endpoint", self.console_url],
]
def __init__(
self,
neon_binpath: Path,
@@ -2240,6 +2257,33 @@ def link_proxy(
yield proxy
@pytest.fixture(scope="function")
def console_proxy(
port_distributor: PortDistributor, neon_binpath: Path, test_output_dir: Path
) -> Iterator[NeonProxy]:
"""Neon proxy that routes through link auth."""
http_port = port_distributor.get_port()
proxy_port = port_distributor.get_port()
mgmt_port = port_distributor.get_port()
console_port = port_distributor.get_port()
external_http_port = port_distributor.get_port()
console_url = f"http://127.0.0.1:{console_port}"
with NeonProxy(
neon_binpath=neon_binpath,
test_output_dir=test_output_dir,
proxy_port=proxy_port,
http_port=http_port,
mgmt_port=mgmt_port,
external_http_port=external_http_port,
auth_backend=NeonProxy.Console(console_url),
) as proxy:
proxy.start()
yield proxy
@pytest.fixture(scope="function")
def static_proxy(
vanilla_pg: VanillaPostgres,

View File

@@ -2,10 +2,12 @@ import json
import subprocess
from typing import Any, List
import logging
import psycopg2
import pytest
import requests
from fixtures.neon_fixtures import PSQL, NeonProxy, VanillaPostgres
from aiohttp import web
from fixtures.neon_fixtures import PSQL, NeonProxy, VanillaPostgres, PortDistributor
def test_proxy_select_1(static_proxy: NeonProxy):
@@ -225,3 +227,77 @@ def test_sql_over_http(static_proxy: NeonProxy):
res = q("drop table t")
assert res["command"] == "DROP"
assert res["rowCount"] is None
@pytest.mark.asyncio
async def test_compute_cache_invalidation(
port_distributor: PortDistributor, vanilla_pg: VanillaPostgres, console_proxy: NeonProxy
):
console_url = console_proxy.auth_backend.console_url
logging.info(f"mocked console's url is {console_url}")
console_port = int(console_url.split(":")[-1])
logging.info(f"mocked console's port is {console_port}")
routes = web.RouteTableDef()
@routes.get("/proxy_get_role_secret")
async def get_role_secret(request):
# corresponds to password "password"
secret = ":".join(
[
"SCRAM-SHA-256$4096",
"t33UQcz/cs1D+n9INqThsw==$1NYlCbuxtK7YF2sgECBDTv1Myf8PpHJCT3RgKSXlZL0=",
"9iLeGY91MqBQ4ez1389Smo7h+STsJJ5jvu7kNofxj08=",
]
)
return web.json_response({"role_secret": secret})
@routes.get("/proxy_wake_compute")
async def wake_compute(request):
nonlocal wake_compute_called
wake_compute_called += 1
nonlocal postgres_port
logging.info(f"compute's port is {postgres_port}")
return web.json_response(
{
"address": f"127.0.0.1:{postgres_port}",
"aux": {
"endpoint_id": "",
"project_id": "",
"branch_id": "",
},
}
)
console = web.Application()
console.add_routes(routes)
runner = web.AppRunner(console)
await runner.setup()
await web.TCPSite(runner, "127.0.0.1", console_port).start()
# Create a user we're going to use in the test sequence
(user, password) = ("borat", "password")
vanilla_pg.start().safe_psql(f"create role {user} with login password '{password}'")
async def try_connect():
magic = f"endpoint=irrelevant;{password}"
await console_proxy.connect_async(user=user, password=magic, dbname="postgres")
wake_compute_called = 0
postgres_port = vanilla_pg.default_options["port"]
# Try connecting to compute
await try_connect()
assert wake_compute_called == 1
# Change compute's port
postgres_port = port_distributor.get_port()
vanilla_pg.stop().configure([f"port = {postgres_port}"]).start()
# Try connecting to compute
await try_connect()
assert wake_compute_called == 2