This commit is contained in:
Conrad Ludgate
2024-09-29 20:29:26 +01:00
parent 4d47049b00
commit 0724df1d3f
2 changed files with 140 additions and 10 deletions

View File

@@ -56,6 +56,7 @@ from _pytest.fixtures import FixtureRequest
from psycopg2.extensions import connection as PgConnection
from psycopg2.extensions import cursor as PgCursor
from psycopg2.extensions import make_dsn, parse_dsn
from pytest_httpserver import HTTPServer
from urllib3.util.retry import Retry
from fixtures import overlayfs
@@ -440,9 +441,9 @@ class NeonEnvBuilder:
self.pageserver_virtual_file_io_engine: Optional[str] = pageserver_virtual_file_io_engine
self.pageserver_default_tenant_config_compaction_algorithm: Optional[
Dict[str, Any]
] = pageserver_default_tenant_config_compaction_algorithm
self.pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]] = (
pageserver_default_tenant_config_compaction_algorithm
)
if self.pageserver_default_tenant_config_compaction_algorithm is not None:
log.debug(
f"Overriding pageserver default compaction algorithm to {self.pageserver_default_tenant_config_compaction_algorithm}"
@@ -1072,9 +1073,9 @@ class NeonEnv:
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
if config.pageserver_default_tenant_config_compaction_algorithm is not None:
tenant_config = ps_cfg.setdefault("tenant_config", {})
tenant_config[
"compaction_algorithm"
] = config.pageserver_default_tenant_config_compaction_algorithm
tenant_config["compaction_algorithm"] = (
config.pageserver_default_tenant_config_compaction_algorithm
)
if self.pageserver_remote_storage is not None:
ps_cfg["remote_storage"] = remote_storage_to_toml_dict(
@@ -1117,9 +1118,9 @@ class NeonEnv:
if config.auth_enabled:
sk_cfg["auth_enabled"] = True
if self.safekeepers_remote_storage is not None:
sk_cfg[
"remote_storage"
] = self.safekeepers_remote_storage.to_toml_inline_table().strip()
sk_cfg["remote_storage"] = (
self.safekeepers_remote_storage.to_toml_inline_table().strip()
)
self.safekeepers.append(
Safekeeper(env=self, id=id, port=port, extra_opts=config.safekeeper_extra_opts)
)
@@ -3572,6 +3573,20 @@ class NeonProxy(PgProtocol):
]
return args
class AuthBroker(AuthBackend):
def __init__(self, endpoint: str):
self.endpoint = endpoint
def extra_args(self) -> list[str]:
args = [
# Console auth backend params
*["--auth-backend", "console"],
*["--auth-endpoint", self.endpoint],
*["--sql-over-http-pool-opt-in", "false"],
*["--is-auth-broker"],
]
return args
@dataclass(frozen=True)
class Postgres(AuthBackend):
pg_conn_url: str
@@ -3600,7 +3615,7 @@ class NeonProxy(PgProtocol):
metric_collection_interval: Optional[str] = None,
):
host = "127.0.0.1"
domain = "proxy.localtest.me" # resolves to 127.0.0.1
domain = "ep-foo-bar-1234.localtest.me" # resolves to 127.0.0.1
super().__init__(dsn=auth_backend.default_conn_url, host=domain, port=proxy_port)
self.domain = domain
@@ -3886,6 +3901,50 @@ def static_proxy(
yield proxy
@pytest.fixture(scope="function")
def static_auth_broker(
vanilla_pg: VanillaPostgres,
port_distributor: PortDistributor,
neon_binpath: Path,
test_output_dir: Path,
httpserver: HTTPServer,
) -> Iterator[NeonProxy]:
"""Neon proxy that routes directly to vanilla postgres."""
auth_endpoint = httpserver.url_for("/cplane")
port = vanilla_pg.default_options["port"]
host = vanilla_pg.default_options["host"]
httpserver.expect_request("/cplane/proxy_wake_compute").respond_with_json(
{
"address": f"{host}:{port}",
"aux": {
"endpoint_id": "ep-foo-bar-1234",
"branch_id": "br-foo-bar",
"project_id": "foo-bar",
},
}
)
proxy_port = port_distributor.get_port()
mgmt_port = port_distributor.get_port()
http_port = port_distributor.get_port()
external_http_port = port_distributor.get_port()
with NeonProxy(
neon_binpath=neon_binpath,
test_output_dir=test_output_dir,
proxy_port=proxy_port,
http_port=http_port,
mgmt_port=mgmt_port,
external_http_port=external_http_port,
auth_backend=NeonProxy.AuthBroker(auth_endpoint),
) as proxy:
proxy.start()
yield proxy
class Endpoint(PgProtocol, LogUtils):
"""An object representing a Postgres compute endpoint managed by the control plane."""

View File

@@ -0,0 +1,71 @@
import asyncio
import json
import subprocess
import time
import urllib.parse
from typing import Any, List, Optional, Tuple
import psycopg2
import pytest
import requests
from fixtures.neon_fixtures import PSQL, NeonProxy, VanillaPostgres
GET_CONNECTION_PID_QUERY = "SELECT pid FROM pg_stat_activity WHERE state = 'active'"
def test_sql_over_http(static_auth_broker: NeonProxy):
static_auth_broker.safe_psql("create role http with login password 'http' superuser")
def q(sql: str, params: Optional[List[Any]] = None) -> Any:
params = params or []
connstr = f"postgresql://http:http@{static_proxy.domain}:{static_proxy.proxy_port}/postgres"
response = requests.post(
f"https://{static_proxy.domain}:{static_proxy.external_http_port}/sql",
data=json.dumps({"query": sql, "params": params}),
headers={"Content-Type": "application/sql", "Neon-Connection-String": connstr},
verify=str(static_proxy.test_output_dir / "proxy.crt"),
)
assert response.status_code == 200, response.text
return response.json()
rows = q("select 42 as answer")["rows"]
assert rows == [{"answer": 42}]
rows = q("select $1 as answer", [42])["rows"]
assert rows == [{"answer": "42"}]
rows = q("select $1 * 1 as answer", [42])["rows"]
assert rows == [{"answer": 42}]
rows = q("select $1::int[] as answer", [[1, 2, 3]])["rows"]
assert rows == [{"answer": [1, 2, 3]}]
rows = q("select $1::json->'a' as answer", [{"a": {"b": 42}}])["rows"]
assert rows == [{"answer": {"b": 42}}]
rows = q("select $1::jsonb[] as answer", [[{}]])["rows"]
assert rows == [{"answer": [{}]}]
rows = q("select $1::jsonb[] as answer", [[{"foo": 1}, {"bar": 2}]])["rows"]
assert rows == [{"answer": [{"foo": 1}, {"bar": 2}]}]
rows = q("select * from pg_class limit 1")["rows"]
assert len(rows) == 1
res = q("create table t(id serial primary key, val int)")
assert res["command"] == "CREATE"
assert res["rowCount"] is None
res = q("insert into t(val) values (10), (20), (30) returning id")
assert res["command"] == "INSERT"
assert res["rowCount"] == 3
assert res["rows"] == [{"id": 1}, {"id": 2}, {"id": 3}]
res = q("select * from t")
assert res["command"] == "SELECT"
assert res["rowCount"] == 3
res = q("drop table t")
assert res["command"] == "DROP"
assert res["rowCount"] is None