mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 06:30:43 +00:00
115 lines
3.5 KiB
Python
115 lines
3.5 KiB
Python
import json
|
|
|
|
import pytest
|
|
from fixtures.neon_fixtures import NeonAuthBroker
|
|
from jwcrypto import jwk, jwt
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize("role_names", [["anonymous", "authenticated"]])
|
|
@pytest.mark.parametrize("audience", [None])
|
|
async def test_auth_broker_happy(
|
|
static_auth_broker: NeonAuthBroker,
|
|
cplane_wake_compute_local_proxy: None,
|
|
cplane_endpoint_jwks: jwk.JWK,
|
|
):
|
|
"""
|
|
Signs a JWT and uses it to authorize a query to local_proxy.
|
|
"""
|
|
|
|
token = jwt.JWT(
|
|
header={"kid": cplane_endpoint_jwks.key_id, "alg": "RS256"}, claims={"sub": "user1"}
|
|
)
|
|
token.make_signed_token(cplane_endpoint_jwks)
|
|
res = await static_auth_broker.query(
|
|
"foo", ["arg1"], user="anonymous", token=token.serialize(), expected_code=200
|
|
)
|
|
|
|
# local proxy mock just echos back the request
|
|
# check that we forward the correct data
|
|
|
|
assert (
|
|
res["headers"]["authorization"] == f"Bearer {token.serialize()}"
|
|
), "JWT should be forwarded"
|
|
|
|
assert (
|
|
"anonymous" in res["headers"]["neon-connection-string"]
|
|
), "conn string should be forwarded"
|
|
|
|
assert json.loads(res["body"]) == {
|
|
"query": "foo",
|
|
"params": ["arg1"],
|
|
}, "Query body should be forwarded"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize("role_names", [["anonymous", "authenticated"]])
|
|
@pytest.mark.parametrize("audience", [None])
|
|
async def test_auth_broker_incorrect_role(
|
|
static_auth_broker: NeonAuthBroker,
|
|
cplane_wake_compute_local_proxy: None,
|
|
cplane_endpoint_jwks: jwk.JWK,
|
|
):
|
|
"""
|
|
Connects to auth broker with the wrong role associated with the JWKs
|
|
"""
|
|
|
|
token = jwt.JWT(
|
|
header={"kid": cplane_endpoint_jwks.key_id, "alg": "RS256"}, claims={"sub": "user1"}
|
|
)
|
|
token.make_signed_token(cplane_endpoint_jwks)
|
|
res = await static_auth_broker.query(
|
|
"foo", ["arg1"], user="wrong_role", token=token.serialize(), expected_code=400
|
|
)
|
|
|
|
# if the user is wrong, we announce that the jwk was not found.
|
|
assert "jwk not found" in res["message"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize("role_names", [["anonymous", "authenticated"]])
|
|
@pytest.mark.parametrize("audience", ["neon"])
|
|
async def test_auth_broker_incorrect_aud(
|
|
static_auth_broker: NeonAuthBroker,
|
|
cplane_wake_compute_local_proxy: None,
|
|
cplane_endpoint_jwks: jwk.JWK,
|
|
):
|
|
"""
|
|
Connects to auth broker with the wrong audience associated with the JWKs
|
|
"""
|
|
|
|
token = jwt.JWT(
|
|
header={"kid": cplane_endpoint_jwks.key_id, "alg": "RS256"},
|
|
claims={"sub": "user1", "aud": "wrong_aud"},
|
|
)
|
|
token.make_signed_token(cplane_endpoint_jwks)
|
|
res = await static_auth_broker.query(
|
|
"foo", ["arg1"], user="anonymous", token=token.serialize(), expected_code=400
|
|
)
|
|
|
|
assert "invalid JWT token audience" in res["message"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize("role_names", [["anonymous", "authenticated"]])
|
|
@pytest.mark.parametrize("audience", ["neon"])
|
|
async def test_auth_broker_missing_aud(
|
|
static_auth_broker: NeonAuthBroker,
|
|
cplane_wake_compute_local_proxy: None,
|
|
cplane_endpoint_jwks: jwk.JWK,
|
|
):
|
|
"""
|
|
Connects to auth broker with no audience
|
|
"""
|
|
|
|
token = jwt.JWT(
|
|
header={"kid": cplane_endpoint_jwks.key_id, "alg": "RS256"},
|
|
claims={"sub": "user1"},
|
|
)
|
|
token.make_signed_token(cplane_endpoint_jwks)
|
|
res = await static_auth_broker.query(
|
|
"foo", ["arg1"], user="anonymous", token=token.serialize(), expected_code=400
|
|
)
|
|
|
|
assert "invalid JWT token audience" in res["message"]
|