tests: use auth tokens for attachment service

This commit is contained in:
John Spray
2024-01-04 14:06:40 +00:00
parent 5ae1efaea8
commit 0c85cd3766

View File

@@ -19,7 +19,7 @@ from functools import cached_property
from itertools import chain, product
from pathlib import Path
from types import TracebackType
from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, cast
from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, Union, cast
from urllib.parse import urlparse
import asyncpg
@@ -787,7 +787,9 @@ class NeonEnv:
attachment_service_port = self.port_distributor.get_port()
self.control_plane_api: str = f"http://127.0.0.1:{attachment_service_port}"
self.attachment_service: NeonAttachmentService = NeonAttachmentService(self)
self.attachment_service: NeonAttachmentService = NeonAttachmentService(
self, config.auth_enabled
)
# Create a config file corresponding to the options
cfg: Dict[str, Any] = {
@@ -1625,9 +1627,10 @@ class Pagectl(AbstractNeonCli):
class NeonAttachmentService:
def __init__(self, env: NeonEnv):
def __init__(self, env: NeonEnv, auth_enabled):
self.env = env
self.running = False
self.auth_enabled = auth_enabled
def start(self):
assert not self.running
@@ -1641,27 +1644,47 @@ class NeonAttachmentService:
self.running = False
return self
def attach_hook_issue(self, tenant_id: TenantId, pageserver_id: int) -> int:
response = requests.post(
def request(self, method, *args, **kwargs) -> requests.Response:
kwargs["headers"] = self.headers()
return requests.request(method, *args, **kwargs)
def headers(self) -> Dict[str, str]:
headers = {}
if self.auth_enabled:
jwt_token = self.env.auth_keys.generate_pageserver_token()
headers["Authorization"] = f"Bearer {jwt_token}"
return headers
def attach_hook_issue(
self, tenant_shard_id: Union[TenantId, TenantShardId], pageserver_id: int
) -> int:
response = self.request(
"POST",
f"{self.env.control_plane_api}/attach-hook",
json={"tenant_id": str(tenant_id), "node_id": pageserver_id},
json={"tenant_shard_id": str(tenant_shard_id), "node_id": pageserver_id},
headers=self.headers(),
)
response.raise_for_status()
gen = response.json()["gen"]
assert isinstance(gen, int)
return gen
def attach_hook_drop(self, tenant_id: TenantId):
response = requests.post(
def attach_hook_drop(self, tenant_shard_id: Union[TenantId, TenantShardId]):
response = self.request(
"POST",
f"{self.env.control_plane_api}/attach-hook",
json={"tenant_id": str(tenant_id), "node_id": None},
json={"tenant_shard_id": str(tenant_shard_id), "node_id": None},
headers=self.headers(),
)
response.raise_for_status()
def inspect(self, tenant_id: TenantId) -> Optional[tuple[int, int]]:
response = requests.post(
def inspect(self, tenant_shard_id: Union[TenantId, TenantShardId]) -> Optional[tuple[int, int]]:
response = self.request(
"POST",
f"{self.env.control_plane_api}/inspect",
json={"tenant_id": str(tenant_id)},
json={"tenant_shard_id": str(tenant_shard_id)},
headers=self.headers(),
)
response.raise_for_status()
json = response.json()
@@ -1679,7 +1702,9 @@ class NeonAttachmentService:
"listen_http_port": node.service_port.http,
}
log.info(f"node_register({body})")
requests.post(f"{self.env.control_plane_api}/node", json=body).raise_for_status()
self.request(
"POST", f"{self.env.control_plane_api}/node", json=body, headers=self.headers()
).raise_for_status()
def tenant_create(
self,
@@ -1701,28 +1726,29 @@ class NeonAttachmentService:
for k, v in tenant_config.items():
body[k] = v
response = requests.post(f"{self.env.control_plane_api}/tenant", json=body)
response = self.request("POST", f"{self.env.control_plane_api}/tenant", json=body)
response.raise_for_status()
log.info(f"tenant_create success: {response.json()}")
def tenant_timeline_create(self, tenant_id: TenantId, timeline_id: TimelineId):
body: Dict[str, Any] = {"new_timeline_id": str(timeline_id)}
response = requests.post(
f"{self.env.control_plane_api}/tenant/{tenant_id}/timeline", json=body
response = self.request(
"POST", f"{self.env.control_plane_api}/tenant/{tenant_id}/timeline", json=body
)
response.raise_for_status()
log.info(f"tenant_timeline_create success: {response.json()}")
def locate(self, tenant_id: TenantId) -> list[dict[str, Any]]:
response = requests.get(f"{self.env.control_plane_api}/tenant/{tenant_id}/locate")
response = self.request("GET", f"{self.env.control_plane_api}/tenant/{tenant_id}/locate")
response.raise_for_status()
body = response.json()
shards: list[dict[str, Any]] = body["shards"]
return shards
def tenant_shard_split(self, tenant_id: TenantId, shard_count: int) -> list[TenantShardId]:
response = requests.put(
response = self.request(
"PUT",
f"{self.env.control_plane_api}/tenant/{tenant_id}/shard_split",
json={"new_shard_count": shard_count},
)