Files
neon/test_runner/regress/test_proxy.py
Vadim Kharitonov 8f49af45a7 Try to fix test
2023-05-24 15:39:39 +02:00

305 lines
10 KiB
Python

import json
import logging
import subprocess
from typing import Any, List, cast
import psycopg2
import pytest
import requests
from aiohttp import web
from fixtures.neon_fixtures import PSQL, NeonProxy, PortDistributor, VanillaPostgres
def test_proxy_select_1(static_proxy: NeonProxy):
"""
A simplest smoke test: check proxy against a local postgres instance.
"""
# no SNI, deprecated `options=project` syntax (before we had several endpoint in project)
out = static_proxy.safe_psql("select 1", sslsni=0, options="project=generic-project-name")
assert out[0][0] == 1
# no SNI, new `options=endpoint` syntax
out = static_proxy.safe_psql("select 1", sslsni=0, options="endpoint=generic-project-name")
assert out[0][0] == 1
# with SNI
out = static_proxy.safe_psql("select 42", host="generic-project-name.localtest.me")
assert out[0][0] == 42
def test_password_hack(static_proxy: NeonProxy):
"""
Check the PasswordHack auth flow: an alternative to SCRAM auth for
clients which can't provide the project/endpoint name via SNI or `options`.
"""
user = "borat"
password = "password"
static_proxy.safe_psql(f"create role {user} with login password '{password}'")
# Note the format of `magic`!
magic = f"project=irrelevant;{password}"
out = static_proxy.safe_psql("select 1", sslsni=0, user=user, password=magic)
assert out[0][0] == 1
magic = f"endpoint=irrelevant;{password}"
out = static_proxy.safe_psql("select 1", sslsni=0, user=user, password=magic)
assert out[0][0] == 1
# Must also check that invalid magic won't be accepted.
with pytest.raises(psycopg2.OperationalError):
magic = "broken"
static_proxy.safe_psql("select 1", sslsni=0, user=user, password=magic)
@pytest.mark.asyncio
async def test_link_auth(vanilla_pg: VanillaPostgres, link_proxy: NeonProxy):
"""
Check the Link auth flow: a lightweight auth method which delegates
all necessary checks to the console by sending client an auth URL.
"""
psql = await PSQL(host=link_proxy.host, port=link_proxy.proxy_port).run("select 42")
base_uri = link_proxy.link_auth_uri
link = await NeonProxy.find_auth_link(base_uri, psql)
psql_session_id = NeonProxy.get_session_id(base_uri, link)
await NeonProxy.activate_link_auth(vanilla_pg, link_proxy, psql_session_id)
assert psql.stdout is not None
out = (await psql.stdout.read()).decode("utf-8").strip()
assert out == "42"
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
def test_proxy_options(static_proxy: NeonProxy, option_name: str):
"""
Check that we pass extra `options` to the PostgreSQL server:
* `project=...` and `endpoint=...` shouldn't be passed at all
* (otherwise postgres will raise an error).
* everything else should be passed as-is.
"""
options = f"{option_name}=irrelevant -cproxytest.option=value"
out = static_proxy.safe_psql("show proxytest.option", options=options, sslsni=0)
assert out[0][0] == "value"
options = f"-c proxytest.foo=\\ str {option_name}=irrelevant"
out = static_proxy.safe_psql("show proxytest.foo", options=options, sslsni=0)
assert out[0][0] == " str"
options = "-cproxytest.option=value"
out = static_proxy.safe_psql("show proxytest.option", options=options)
assert out[0][0] == "value"
options = "-c proxytest.foo=\\ str"
out = static_proxy.safe_psql("show proxytest.foo", options=options)
assert out[0][0] == " str"
def test_auth_errors(static_proxy: NeonProxy):
"""
Check that we throw very specific errors in some unsuccessful auth scenarios.
"""
# User does not exist
with pytest.raises(psycopg2.Error) as exprinfo:
static_proxy.connect(user="pinocchio")
text = str(exprinfo.value).strip()
assert text.find("password authentication failed for user 'pinocchio'") != -1
static_proxy.safe_psql(
"create role pinocchio with login password 'magic'",
)
# User exists, but password is missing
with pytest.raises(psycopg2.Error) as exprinfo:
static_proxy.connect(user="pinocchio", password=None)
text = str(exprinfo.value).strip()
assert text.find("password authentication failed for user 'pinocchio'") != -1
# User exists, but password is wrong
with pytest.raises(psycopg2.Error) as exprinfo:
static_proxy.connect(user="pinocchio", password="bad")
text = str(exprinfo.value).strip()
assert text.find("password authentication failed for user 'pinocchio'") != -1
# Finally, check that the user can connect
with static_proxy.connect(user="pinocchio", password="magic"):
pass
def test_forward_params_to_client(static_proxy: NeonProxy):
"""
Check that we forward all necessary PostgreSQL server params to client.
"""
# A subset of parameters (GUCs) which postgres
# sends to the client during connection setup.
# Unfortunately, `GUC_REPORT` can't be queried.
# Proxy *should* forward them, otherwise client library
# might misbehave (e.g. parse timestamps incorrectly).
reported_params_subset = [
"client_encoding",
"integer_datetimes",
"is_superuser",
"server_encoding",
"server_version",
"session_authorization",
"standard_conforming_strings",
]
query = """
select name, setting
from pg_catalog.pg_settings
where name = any(%s)
"""
with static_proxy.connect() as conn:
with conn.cursor() as cur:
cur.execute(query, (reported_params_subset,))
for name, value in cur.fetchall():
# Check that proxy has forwarded this parameter.
assert conn.get_parameter_status(name) == value
@pytest.mark.timeout(5)
def test_close_on_connections_exit(static_proxy: NeonProxy):
# Open two connections, send SIGTERM, then ensure that proxy doesn't exit
# until after connections close.
with static_proxy.connect(), static_proxy.connect():
static_proxy.terminate()
with pytest.raises(subprocess.TimeoutExpired):
static_proxy.wait_for_exit(timeout=2)
# Ensure we don't accept any more connections
with pytest.raises(psycopg2.OperationalError):
static_proxy.connect()
static_proxy.wait_for_exit()
def test_sql_over_http(static_proxy: NeonProxy):
static_proxy.safe_psql("create role http with login password 'http' superuser")
def q(sql: str, params: List[Any] = []) -> Any:
connstr = f"postgresql://http:http@{static_proxy.domain}:{static_proxy.proxy_port}/postgres"
response = requests.post(
f"https://{static_proxy.domain}:{static_proxy.external_http_port}/sql",
data=json.dumps({"query": sql, "params": params}),
headers={"Content-Type": "application/sql", "Neon-Connection-String": connstr},
verify=str(static_proxy.test_output_dir / "proxy.crt"),
)
assert response.status_code == 200
return response.json()
rows = q("select 42 as answer")["rows"]
assert rows == [{"answer": 42}]
rows = q("select $1 as answer", [42])["rows"]
assert rows == [{"answer": "42"}]
rows = q("select $1 * 1 as answer", [42])["rows"]
assert rows == [{"answer": 42}]
rows = q("select $1::int[] as answer", [[1, 2, 3]])["rows"]
assert rows == [{"answer": [1, 2, 3]}]
rows = q("select $1::json->'a' as answer", [{"a": {"b": 42}}])["rows"]
assert rows == [{"answer": {"b": 42}}]
rows = q("select * from pg_class limit 1")["rows"]
assert len(rows) == 1
res = q("create table t(id serial primary key, val int)")
assert res["command"] == "CREATE"
assert res["rowCount"] is None
res = q("insert into t(val) values (10), (20), (30) returning id")
assert res["command"] == "INSERT"
assert res["rowCount"] == 3
assert res["rows"] == [{"id": 1}, {"id": 2}, {"id": 3}]
res = q("select * from t")
assert res["command"] == "SELECT"
assert res["rowCount"] == 3
res = q("drop table t")
assert res["command"] == "DROP"
assert res["rowCount"] is None
@pytest.mark.asyncio
async def test_compute_cache_invalidation(
port_distributor: PortDistributor, vanilla_pg: VanillaPostgres, console_proxy: NeonProxy
):
console_url = cast(NeonProxy.Console, console_proxy.auth_backend).console_url
logging.info(f"mocked console's url is {console_url}")
console_port = int(console_url.split(":")[-1])
logging.info(f"mocked console's port is {console_port}")
routes = web.RouteTableDef()
@routes.get("/proxy_get_role_secret")
async def get_role_secret(request):
# corresponds to password "password"
secret = ":".join(
[
"SCRAM-SHA-256$4096",
"t33UQcz/cs1D+n9INqThsw==$1NYlCbuxtK7YF2sgECBDTv1Myf8PpHJCT3RgKSXlZL0=",
"9iLeGY91MqBQ4ez1389Smo7h+STsJJ5jvu7kNofxj08=",
]
)
return web.json_response({"role_secret": secret})
wake_compute_called = 0
postgres_port = vanilla_pg.default_options["port"]
@routes.get("/proxy_wake_compute")
async def wake_compute(request):
nonlocal wake_compute_called
wake_compute_called += 1
nonlocal postgres_port
logging.info(f"compute's port is {postgres_port}")
return web.json_response(
{
"address": f"127.0.0.1:{postgres_port}",
"aux": {
"endpoint_id": "",
"project_id": "",
"branch_id": "",
},
}
)
console = web.Application()
console.add_routes(routes)
runner = web.AppRunner(console)
await runner.setup()
await web.TCPSite(runner, "127.0.0.1", console_port).start()
# Create a user we're going to use in the test sequence
user, password = "borat", "password"
vanilla_pg.start().safe_psql(f"create role {user} with login password '{password}'")
async def try_connect():
await console_proxy.connect_async(user=user, password=password, dbname="postgres")
assert wake_compute_called == 0
# Try connecting to compute
await try_connect()
assert wake_compute_called == 1
# Change compute's port
postgres_port = port_distributor.get_port()
vanilla_pg.stop().configure([f"port = {postgres_port}"]).start()
# Try connecting to compute
await try_connect()
assert wake_compute_called == 2