mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 09:30:37 +00:00
Proxy control plane rate limiter (#5785)
## Problem Proxy might overload the control plane. ## Summary of changes Implement rate limiter for proxy<->control plane connection. Resolves https://github.com/neondatabase/neon/issues/5707 Used implementation ideas from https://github.com/conradludgate/squeeze/
This commit is contained in:
@@ -2179,6 +2179,29 @@ class NeonProxy(PgProtocol):
|
||||
*["--allow-self-signed-compute", "true"],
|
||||
]
|
||||
|
||||
class Console(AuthBackend):
|
||||
def __init__(self, endpoint: str, fixed_rate_limit: Optional[int] = None):
|
||||
self.endpoint = endpoint
|
||||
self.fixed_rate_limit = fixed_rate_limit
|
||||
|
||||
def extra_args(self) -> list[str]:
|
||||
args = [
|
||||
# Console auth backend params
|
||||
*["--auth-backend", "console"],
|
||||
*["--auth-endpoint", self.endpoint],
|
||||
]
|
||||
if self.fixed_rate_limit is not None:
|
||||
args += [
|
||||
*["--disable-dynamic-rate-limiter", "false"],
|
||||
*["--rate-limit-algorithm", "aimd"],
|
||||
*["--initial-limit", str(1)],
|
||||
*["--rate-limiter-timeout", "1s"],
|
||||
*["--aimd-min-limit", "0"],
|
||||
*["--aimd-increase-by", "1"],
|
||||
*["--wake-compute-cache", "size=0"], # Disable cache to test rate limiter.
|
||||
]
|
||||
return args
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Postgres(AuthBackend):
|
||||
pg_conn_url: str
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import asyncio
|
||||
import json
|
||||
import subprocess
|
||||
import time
|
||||
@@ -11,6 +12,29 @@ from fixtures.neon_fixtures import PSQL, NeonProxy, VanillaPostgres
|
||||
GET_CONNECTION_PID_QUERY = "SELECT pid FROM pg_stat_activity WHERE state = 'active'"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_http_pool_begin_1(static_proxy: NeonProxy):
|
||||
static_proxy.safe_psql("create user http_auth with password 'http' superuser")
|
||||
|
||||
def query(*args) -> Any:
|
||||
static_proxy.http_query(
|
||||
"SELECT pg_sleep(10);",
|
||||
args,
|
||||
user="http_auth",
|
||||
password="http",
|
||||
expected_code=200,
|
||||
)
|
||||
|
||||
query()
|
||||
loop = asyncio.get_running_loop()
|
||||
tasks = [loop.run_in_executor(None, query) for _ in range(10)]
|
||||
# Wait for all the tasks to complete
|
||||
completed, pending = await asyncio.wait(tasks)
|
||||
# Get the results
|
||||
results = [task.result() for task in completed]
|
||||
print(results)
|
||||
|
||||
|
||||
def test_proxy_select_1(static_proxy: NeonProxy):
|
||||
"""
|
||||
A simplest smoke test: check proxy against a local postgres instance.
|
||||
|
||||
84
test_runner/regress/test_proxy_rate_limiter.py
Normal file
84
test_runner/regress/test_proxy_rate_limiter.py
Normal file
@@ -0,0 +1,84 @@
|
||||
import asyncio
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Iterator
|
||||
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import (
|
||||
PSQL,
|
||||
NeonProxy,
|
||||
)
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
from pytest_httpserver import HTTPServer
|
||||
from werkzeug.wrappers.response import Response
|
||||
|
||||
|
||||
def waiting_handler(status_code: int) -> Response:
|
||||
# wait more than timeout to make sure that both (two) connections are open.
|
||||
# It would be better to use a barrier here, but I don't know how to do that together with pytest-httpserver.
|
||||
time.sleep(2)
|
||||
return Response(status=status_code)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def proxy_with_rate_limit(
|
||||
port_distributor: PortDistributor,
|
||||
neon_binpath: Path,
|
||||
httpserver_listen_address,
|
||||
test_output_dir: Path,
|
||||
) -> Iterator[NeonProxy]:
|
||||
"""Neon proxy that routes directly to vanilla postgres."""
|
||||
|
||||
proxy_port = port_distributor.get_port()
|
||||
mgmt_port = port_distributor.get_port()
|
||||
http_port = port_distributor.get_port()
|
||||
external_http_port = port_distributor.get_port()
|
||||
(host, port) = httpserver_listen_address
|
||||
endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
|
||||
|
||||
with NeonProxy(
|
||||
neon_binpath=neon_binpath,
|
||||
test_output_dir=test_output_dir,
|
||||
proxy_port=proxy_port,
|
||||
http_port=http_port,
|
||||
mgmt_port=mgmt_port,
|
||||
external_http_port=external_http_port,
|
||||
auth_backend=NeonProxy.Console(endpoint, fixed_rate_limit=5),
|
||||
) as proxy:
|
||||
proxy.start()
|
||||
yield proxy
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_proxy_rate_limit(
|
||||
httpserver: HTTPServer,
|
||||
proxy_with_rate_limit: NeonProxy,
|
||||
):
|
||||
uri = "/billing/api/v1/usage_events/proxy_get_role_secret"
|
||||
# mock control plane service
|
||||
httpserver.expect_ordered_request(uri, method="GET").respond_with_handler(
|
||||
lambda _: Response(status=200)
|
||||
)
|
||||
httpserver.expect_ordered_request(uri, method="GET").respond_with_handler(
|
||||
lambda _: waiting_handler(429)
|
||||
)
|
||||
httpserver.expect_ordered_request(uri, method="GET").respond_with_handler(
|
||||
lambda _: waiting_handler(500)
|
||||
)
|
||||
|
||||
psql = PSQL(host=proxy_with_rate_limit.host, port=proxy_with_rate_limit.proxy_port)
|
||||
f = await psql.run("select 42;")
|
||||
await proxy_with_rate_limit.find_auth_link(uri, f)
|
||||
# Limit should be 2.
|
||||
|
||||
# Run two queries in parallel.
|
||||
f1, f2 = await asyncio.gather(psql.run("select 42;"), psql.run("select 42;"))
|
||||
await proxy_with_rate_limit.find_auth_link(uri, f1)
|
||||
await proxy_with_rate_limit.find_auth_link(uri, f2)
|
||||
|
||||
# Now limit should be 0.
|
||||
f = await psql.run("select 42;")
|
||||
await proxy_with_rate_limit.find_auth_link(uri, f)
|
||||
|
||||
# There last query shouldn't reach the http-server.
|
||||
assert httpserver.assertions == []
|
||||
Reference in New Issue
Block a user