diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 5bf5ecf99b..a008ebea1f 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -19,7 +19,7 @@ from functools import cached_property from itertools import chain, product from pathlib import Path from types import TracebackType -from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, cast +from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, Union, cast from urllib.parse import urlparse import asyncpg @@ -787,7 +787,9 @@ class NeonEnv: attachment_service_port = self.port_distributor.get_port() self.control_plane_api: str = f"http://127.0.0.1:{attachment_service_port}" - self.attachment_service: NeonAttachmentService = NeonAttachmentService(self) + self.attachment_service: NeonAttachmentService = NeonAttachmentService( + self, config.auth_enabled + ) # Create a config file corresponding to the options cfg: Dict[str, Any] = { @@ -1625,9 +1627,10 @@ class Pagectl(AbstractNeonCli): class NeonAttachmentService: - def __init__(self, env: NeonEnv): + def __init__(self, env: NeonEnv, auth_enabled): self.env = env self.running = False + self.auth_enabled = auth_enabled def start(self): assert not self.running @@ -1641,27 +1644,47 @@ class NeonAttachmentService: self.running = False return self - def attach_hook_issue(self, tenant_id: TenantId, pageserver_id: int) -> int: - response = requests.post( + def request(self, method, *args, **kwargs) -> requests.Response: + kwargs["headers"] = self.headers() + return requests.request(method, *args, **kwargs) + + def headers(self) -> Dict[str, str]: + headers = {} + if self.auth_enabled: + jwt_token = self.env.auth_keys.generate_pageserver_token() + headers["Authorization"] = f"Bearer {jwt_token}" + + return headers + + def attach_hook_issue( + self, tenant_shard_id: Union[TenantId, TenantShardId], pageserver_id: int + ) -> int: + response = self.request( + "POST", f"{self.env.control_plane_api}/attach-hook", - json={"tenant_id": str(tenant_id), "node_id": pageserver_id}, + json={"tenant_shard_id": str(tenant_shard_id), "node_id": pageserver_id}, + headers=self.headers(), ) response.raise_for_status() gen = response.json()["gen"] assert isinstance(gen, int) return gen - def attach_hook_drop(self, tenant_id: TenantId): - response = requests.post( + def attach_hook_drop(self, tenant_shard_id: Union[TenantId, TenantShardId]): + response = self.request( + "POST", f"{self.env.control_plane_api}/attach-hook", - json={"tenant_id": str(tenant_id), "node_id": None}, + json={"tenant_shard_id": str(tenant_shard_id), "node_id": None}, + headers=self.headers(), ) response.raise_for_status() - def inspect(self, tenant_id: TenantId) -> Optional[tuple[int, int]]: - response = requests.post( + def inspect(self, tenant_shard_id: Union[TenantId, TenantShardId]) -> Optional[tuple[int, int]]: + response = self.request( + "POST", f"{self.env.control_plane_api}/inspect", - json={"tenant_id": str(tenant_id)}, + json={"tenant_shard_id": str(tenant_shard_id)}, + headers=self.headers(), ) response.raise_for_status() json = response.json() @@ -1679,7 +1702,9 @@ class NeonAttachmentService: "listen_http_port": node.service_port.http, } log.info(f"node_register({body})") - requests.post(f"{self.env.control_plane_api}/node", json=body).raise_for_status() + self.request( + "POST", f"{self.env.control_plane_api}/node", json=body, headers=self.headers() + ).raise_for_status() def tenant_create( self, @@ -1701,28 +1726,29 @@ class NeonAttachmentService: for k, v in tenant_config.items(): body[k] = v - response = requests.post(f"{self.env.control_plane_api}/tenant", json=body) + response = self.request("POST", f"{self.env.control_plane_api}/tenant", json=body) response.raise_for_status() log.info(f"tenant_create success: {response.json()}") def tenant_timeline_create(self, tenant_id: TenantId, timeline_id: TimelineId): body: Dict[str, Any] = {"new_timeline_id": str(timeline_id)} - response = requests.post( - f"{self.env.control_plane_api}/tenant/{tenant_id}/timeline", json=body + response = self.request( + "POST", f"{self.env.control_plane_api}/tenant/{tenant_id}/timeline", json=body ) response.raise_for_status() log.info(f"tenant_timeline_create success: {response.json()}") def locate(self, tenant_id: TenantId) -> list[dict[str, Any]]: - response = requests.get(f"{self.env.control_plane_api}/tenant/{tenant_id}/locate") + response = self.request("GET", f"{self.env.control_plane_api}/tenant/{tenant_id}/locate") response.raise_for_status() body = response.json() shards: list[dict[str, Any]] = body["shards"] return shards def tenant_shard_split(self, tenant_id: TenantId, shard_count: int) -> list[TenantShardId]: - response = requests.put( + response = self.request( + "PUT", f"{self.env.control_plane_api}/tenant/{tenant_id}/shard_split", json={"new_shard_count": shard_count}, )