diff --git a/control_plane/attachment_service/src/auth.rs b/control_plane/attachment_service/src/auth.rs new file mode 100644 index 0000000000..ef47abf8c7 --- /dev/null +++ b/control_plane/attachment_service/src/auth.rs @@ -0,0 +1,9 @@ +use utils::auth::{AuthError, Claims, Scope}; + +pub fn check_permission(claims: &Claims, required_scope: Scope) -> Result<(), AuthError> { + if claims.scope != required_scope { + return Err(AuthError("Scope mismatch. Permission denied".into())); + } + + Ok(()) +} diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs index f9c4535bd5..d341187ef7 100644 --- a/control_plane/attachment_service/src/http.rs +++ b/control_plane/attachment_service/src/http.rs @@ -10,8 +10,8 @@ use pageserver_api::shard::TenantShardId; use pageserver_client::mgmt_api; use std::sync::Arc; use std::time::{Duration, Instant}; -use utils::auth::SwappableJwtAuth; -use utils::http::endpoint::{auth_middleware, request_span}; +use utils::auth::{Scope, SwappableJwtAuth}; +use utils::http::endpoint::{auth_middleware, check_permission_with, request_span}; use utils::http::request::{must_get_query_param, parse_request_param}; use utils::id::{TenantId, TimelineId}; @@ -64,6 +64,8 @@ fn get_state(request: &Request) -> &HttpState { /// Pageserver calls into this on startup, to learn which tenants it should attach async fn handle_re_attach(mut req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::GenerationsApi)?; + let reattach_req = json_request::(&mut req).await?; let state = get_state(&req); json_response(StatusCode::OK, state.service.re_attach(reattach_req).await?) 
@@ -72,6 +74,8 @@ async fn handle_re_attach(mut req: Request) -> Result, ApiE /// Pageserver calls into this before doing deletions, to confirm that it still /// holds the latest generation for the tenants with deletions enqueued async fn handle_validate(mut req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::GenerationsApi)?; + let validate_req = json_request::(&mut req).await?; let state = get_state(&req); json_response(StatusCode::OK, state.service.validate(validate_req)) @@ -81,6 +85,8 @@ async fn handle_validate(mut req: Request) -> Result, ApiEr /// (in the real control plane this is unnecessary, because the same program is managing /// generation numbers and doing attachments). async fn handle_attach_hook(mut req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + let attach_req = json_request::(&mut req).await?; let state = get_state(&req); @@ -95,6 +101,8 @@ async fn handle_attach_hook(mut req: Request) -> Result, Ap } async fn handle_inspect(mut req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + let inspect_req = json_request::(&mut req).await?; let state = get_state(&req); @@ -106,6 +114,8 @@ async fn handle_tenant_create( service: Arc, mut req: Request, ) -> Result, ApiError> { + check_permissions(&req, Scope::PageServerApi)?; + let create_req = json_request::(&mut req).await?; json_response( StatusCode::CREATED, @@ -164,6 +174,8 @@ async fn handle_tenant_location_config( mut req: Request, ) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + check_permissions(&req, Scope::PageServerApi)?; + let config_req = json_request::(&mut req).await?; json_response( StatusCode::OK, @@ -178,6 +190,8 @@ async fn handle_tenant_time_travel_remote_storage( mut req: Request, ) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + check_permissions(&req, Scope::PageServerApi)?; + let time_travel_req = json_request::(&mut 
req).await?; let timestamp_raw = must_get_query_param(&req, "travel_to")?; @@ -211,6 +225,7 @@ async fn handle_tenant_delete( req: Request, ) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + check_permissions(&req, Scope::PageServerApi)?; deletion_wrapper(service, move |service| async move { service.tenant_delete(tenant_id).await @@ -223,6 +238,8 @@ async fn handle_tenant_timeline_create( mut req: Request, ) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + check_permissions(&req, Scope::PageServerApi)?; + let create_req = json_request::(&mut req).await?; json_response( StatusCode::CREATED, @@ -237,6 +254,8 @@ async fn handle_tenant_timeline_delete( req: Request, ) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + check_permissions(&req, Scope::PageServerApi)?; + let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?; deletion_wrapper(service, move |service| async move { @@ -250,6 +269,7 @@ async fn handle_tenant_timeline_passthrough( req: Request, ) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + check_permissions(&req, Scope::PageServerApi)?; let Some(path) = req.uri().path_and_query() else { // This should never happen, our request router only calls us if there is a path @@ -293,11 +313,15 @@ async fn handle_tenant_locate( service: Arc, req: Request, ) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; json_response(StatusCode::OK, service.tenant_locate(tenant_id)?) 
} async fn handle_node_register(mut req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + let register_req = json_request::(&mut req).await?; let state = get_state(&req); state.service.node_register(register_req).await?; @@ -305,17 +329,23 @@ async fn handle_node_register(mut req: Request) -> Result, } async fn handle_node_list(req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + let state = get_state(&req); json_response(StatusCode::OK, state.service.node_list().await?) } async fn handle_node_drop(req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + let state = get_state(&req); let node_id: NodeId = parse_request_param(&req, "node_id")?; json_response(StatusCode::OK, state.service.node_drop(node_id).await?) } async fn handle_node_configure(mut req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + let node_id: NodeId = parse_request_param(&req, "node_id")?; let config_req = json_request::(&mut req).await?; if node_id != config_req.node_id { @@ -335,6 +365,8 @@ async fn handle_tenant_shard_split( service: Arc, mut req: Request, ) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; let split_req = json_request::(&mut req).await?; @@ -348,6 +380,8 @@ async fn handle_tenant_shard_migrate( service: Arc, mut req: Request, ) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?; let migrate_req = json_request::(&mut req).await?; json_response( @@ -360,22 +394,30 @@ async fn handle_tenant_shard_migrate( async fn handle_tenant_drop(req: Request) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + check_permissions(&req, Scope::PageServerApi)?; + let state = get_state(&req); json_response(StatusCode::OK, state.service.tenant_drop(tenant_id).await?) 
} async fn handle_tenants_dump(req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + let state = get_state(&req); state.service.tenants_dump() } async fn handle_scheduler_dump(req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + let state = get_state(&req); state.service.scheduler_dump() } async fn handle_consistency_check(req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + let state = get_state(&req); json_response(StatusCode::OK, state.service.consistency_check().await?) @@ -432,6 +474,12 @@ where .await } +fn check_permissions(request: &Request, required_scope: Scope) -> Result<(), ApiError> { + check_permission_with(request, |claims| { + crate::auth::check_permission(claims, required_scope) + }) +} + pub fn make_router( service: Arc, auth: Option>, diff --git a/control_plane/attachment_service/src/lib.rs b/control_plane/attachment_service/src/lib.rs index e950a57e57..ce613e858f 100644 --- a/control_plane/attachment_service/src/lib.rs +++ b/control_plane/attachment_service/src/lib.rs @@ -1,6 +1,7 @@ use serde::{Deserialize, Serialize}; use utils::seqwait::MonotonicCounter; +mod auth; mod compute_hook; pub mod http; pub mod metrics; diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs index 4a1d316fe7..f0bee1ce08 100644 --- a/control_plane/src/attachment_service.rs +++ b/control_plane/src/attachment_service.rs @@ -11,12 +11,12 @@ use pageserver_api::{ use pageserver_client::mgmt_api::ResponseErrorMessageExt; use postgres_backend::AuthType; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::str::FromStr; +use std::{fs, str::FromStr}; use tokio::process::Command; use tracing::instrument; use url::Url; use utils::{ - auth::{Claims, Scope}, + auth::{encode_from_key_file, Claims, Scope}, id::{NodeId, TenantId}, }; @@ -24,7 +24,7 @@ pub struct AttachmentService { env: LocalEnv, listen: String, path: Utf8PathBuf, - jwt_token: 
Option, + private_key: Option>, public_key: Option, postgres_port: u16, client: reqwest::Client, @@ -204,12 +204,11 @@ impl AttachmentService { .pageservers .first() .expect("Config is validated to contain at least one pageserver"); - let (jwt_token, public_key) = match ps_conf.http_auth_type { + let (private_key, public_key) = match ps_conf.http_auth_type { AuthType::Trust => (None, None), AuthType::NeonJWT => { - let jwt_token = env - .generate_auth_token(&Claims::new(None, Scope::PageServerApi)) - .unwrap(); + let private_key_path = env.get_private_key_path(); + let private_key = fs::read(private_key_path).expect("failed to read private key"); // If pageserver auth is enabled, this implicitly enables auth for this service, // using the same credentials. @@ -235,7 +234,7 @@ impl AttachmentService { } else { std::fs::read_to_string(&public_key_path).expect("Can't read public key") }; - (Some(jwt_token), Some(public_key)) + (Some(private_key), Some(public_key)) } }; @@ -243,7 +242,7 @@ impl AttachmentService { env: env.clone(), path, listen, - jwt_token, + private_key, public_key, postgres_port, client: reqwest::ClientBuilder::new() @@ -397,7 +396,10 @@ impl AttachmentService { .into_iter() .map(|s| s.to_string()) .collect::>(); - if let Some(jwt_token) = &self.jwt_token { + if let Some(private_key) = &self.private_key { + let claims = Claims::new(None, Scope::PageServerApi); + let jwt_token = + encode_from_key_file(&claims, private_key).expect("failed to generate jwt token"); args.push(format!("--jwt-token={jwt_token}")); } @@ -468,6 +470,20 @@ impl AttachmentService { Ok(()) } + fn get_claims_for_path(path: &str) -> anyhow::Result> { + let category = match path.find('/') { + Some(idx) => &path[..idx], + None => path, + }; + + match category { + "status" | "ready" => Ok(None), + "control" | "debug" => Ok(Some(Claims::new(None, Scope::Admin))), + "v1" => Ok(Some(Claims::new(None, Scope::PageServerApi))), + _ => Err(anyhow::anyhow!("Failed to determine claims for 
{}", path)), + } + } + /// Simple HTTP request wrapper for calling into attachment service async fn dispatch( &self, @@ -493,11 +509,16 @@ impl AttachmentService { if let Some(body) = body { builder = builder.json(&body) } - if let Some(jwt_token) = &self.jwt_token { - builder = builder.header( - reqwest::header::AUTHORIZATION, - format!("Bearer {jwt_token}"), - ); + if let Some(private_key) = &self.private_key { + println!("Getting claims for path {}", path); + if let Some(required_claims) = Self::get_claims_for_path(&path)? { + println!("Got claims {:?} for path {}", required_claims, path); + let jwt_token = encode_from_key_file(&required_claims, private_key)?; + builder = builder.header( + reqwest::header::AUTHORIZATION, + format!("Bearer {jwt_token}"), + ); + } } let response = builder.send().await?; diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 786ea6d098..a5e1325cfe 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -412,14 +412,17 @@ impl LocalEnv { // this function is used only for testing purposes in CLI e g generate tokens during init pub fn generate_auth_token(&self, claims: &Claims) -> anyhow::Result { - let private_key_path = if self.private_key_path.is_absolute() { + let private_key_path = self.get_private_key_path(); + let key_data = fs::read(private_key_path)?; + encode_from_key_file(claims, &key_data) + } + + pub fn get_private_key_path(&self) -> PathBuf { + if self.private_key_path.is_absolute() { self.private_key_path.to_path_buf() } else { self.base_data_dir.join(&self.private_key_path) - }; - - let key_data = fs::read(private_key_path)?; - encode_from_key_file(claims, &key_data) + } } // diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index a52fcb4a3f..2c5cac327a 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -115,7 +115,7 @@ impl PageServerNode { if matches!(self.conf.http_auth_type, AuthType::NeonJWT) { 
let jwt_token = self .env - .generate_auth_token(&Claims::new(None, Scope::PageServerApi)) + .generate_auth_token(&Claims::new(None, Scope::GenerationsApi)) .unwrap(); overrides.push(format!("control_plane_api_token='{}'", jwt_token)); } diff --git a/docs/authentication.md b/docs/authentication.md index f768b04c5b..faac7aa28e 100644 --- a/docs/authentication.md +++ b/docs/authentication.md @@ -70,6 +70,9 @@ Should only be used e.g. for status check/tenant creation/list. Should only be used e.g. for status check. Currently also used for connection from any pageserver to any safekeeper. +"generations_api": Provides access to the upcall APIs served by the attachment service or the control plane. + +"admin": Provides access to the control plane and admin APIs of the attachment service. ### CLI CLI generates a key pair during call to `neon_local init` with the following commands: diff --git a/libs/utils/src/auth.rs b/libs/utils/src/auth.rs index e031699cfb..51ab238d77 100644 --- a/libs/utils/src/auth.rs +++ b/libs/utils/src/auth.rs @@ -32,6 +32,8 @@ pub enum Scope { // The scope used by pageservers in upcalls to storage controller and cloud control plane #[serde(rename = "generations_api")] GenerationsApi, + // Allows access to control plane management API and some storage controller endpoints. + Admin, } /// JWT payload. 
See docs/authentication.md for the format diff --git a/pageserver/src/auth.rs b/pageserver/src/auth.rs index 4dee61d3ea..4785c8c4c5 100644 --- a/pageserver/src/auth.rs +++ b/pageserver/src/auth.rs @@ -14,7 +14,7 @@ pub fn check_permission(claims: &Claims, tenant_id: Option) -> Result< } (Scope::PageServerApi, None) => Ok(()), // access to management api for PageServerApi scope (Scope::PageServerApi, Some(_)) => Ok(()), // access to tenant api using PageServerApi scope - (Scope::SafekeeperData | Scope::GenerationsApi, _) => Err(AuthError( + (Scope::Admin | Scope::SafekeeperData | Scope::GenerationsApi, _) => Err(AuthError( format!( "JWT scope '{:?}' is ineligible for Pageserver auth", claims.scope diff --git a/safekeeper/src/auth.rs b/safekeeper/src/auth.rs index 96676be04d..dd9058c468 100644 --- a/safekeeper/src/auth.rs +++ b/safekeeper/src/auth.rs @@ -12,7 +12,7 @@ pub fn check_permission(claims: &Claims, tenant_id: Option) -> Result< } Ok(()) } - (Scope::PageServerApi | Scope::GenerationsApi, _) => Err(AuthError( + (Scope::Admin | Scope::PageServerApi | Scope::GenerationsApi, _) => Err(AuthError( format!( "JWT scope '{:?}' is ineligible for Safekeeper auth", claims.scope diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 6cb7656660..55c16f73b0 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -17,6 +17,7 @@ import uuid from contextlib import closing, contextmanager from dataclasses import dataclass, field from datetime import datetime +from enum import Enum from fcntl import LOCK_EX, LOCK_UN, flock from functools import cached_property from itertools import chain, product @@ -388,7 +389,8 @@ class PgProtocol: class AuthKeys: priv: str - def generate_token(self, *, scope: str, **token_data: str) -> str: + def generate_token(self, *, scope: TokenScope, **token_data: Any) -> str: + token_data = {key: str(val) for key, val in token_data.items()} token = jwt.encode({"scope": 
scope, **token_data}, self.priv, algorithm="EdDSA") # cast(Any, self.priv) @@ -401,14 +403,23 @@ class AuthKeys: return token def generate_pageserver_token(self) -> str: - return self.generate_token(scope="pageserverapi") + return self.generate_token(scope=TokenScope.PAGE_SERVER_API) def generate_safekeeper_token(self) -> str: - return self.generate_token(scope="safekeeperdata") + return self.generate_token(scope=TokenScope.SAFEKEEPER_DATA) # generate token giving access to only one tenant def generate_tenant_token(self, tenant_id: TenantId) -> str: - return self.generate_token(scope="tenant", tenant_id=str(tenant_id)) + return self.generate_token(scope=TokenScope.TENANT, tenant_id=str(tenant_id)) + + +# TODO: Replace with `StrEnum` when we upgrade to python 3.11 +class TokenScope(str, Enum): + ADMIN = "admin" + PAGE_SERVER_API = "pageserverapi" + GENERATIONS_API = "generations_api" + SAFEKEEPER_DATA = "safekeeperdata" + TENANT = "tenant" class NeonEnvBuilder: @@ -1922,6 +1933,13 @@ class Pagectl(AbstractNeonCli): return IndexPartDump.from_json(parsed) +class AttachmentServiceApiException(Exception): + def __init__(self, message, status_code: int): + super().__init__(message) + self.message = message + self.status_code = status_code + + class NeonAttachmentService(MetricsGetter): def __init__(self, env: NeonEnv, auth_enabled: bool): self.env = env @@ -1940,39 +1958,60 @@ class NeonAttachmentService(MetricsGetter): self.running = False return self + @staticmethod + def raise_api_exception(res: requests.Response): + try: + res.raise_for_status() + except requests.RequestException as e: + try: + msg = res.json()["msg"] + except: # noqa: E722 + msg = "" + raise AttachmentServiceApiException(msg, res.status_code) from e + def pageserver_api(self) -> PageserverHttpClient: """ The attachment service implements a subset of the pageserver REST API, for mapping per-tenant actions into per-shard actions (e.g. timeline creation). 
Tests should invoke those functions via the HttpClient, as an implicit check that these APIs remain compatible. """ - return PageserverHttpClient(self.env.attachment_service_port, lambda: True) + auth_token = None + if self.auth_enabled: + auth_token = self.env.auth_keys.generate_token(scope=TokenScope.PAGE_SERVER_API) + return PageserverHttpClient(self.env.attachment_service_port, lambda: True, auth_token) def request(self, method, *args, **kwargs) -> requests.Response: - kwargs["headers"] = self.headers() - return requests.request(method, *args, **kwargs) + resp = requests.request(method, *args, **kwargs) + NeonAttachmentService.raise_api_exception(resp) - def headers(self) -> Dict[str, str]: + return resp + + def headers(self, scope: Optional[TokenScope]) -> Dict[str, str]: headers = {} - if self.auth_enabled: - jwt_token = self.env.auth_keys.generate_pageserver_token() + if self.auth_enabled and scope is not None: + jwt_token = self.env.auth_keys.generate_token(scope=scope) headers["Authorization"] = f"Bearer {jwt_token}" return headers def get_metrics(self) -> Metrics: res = self.request("GET", f"{self.env.attachment_service_api}/metrics") - res.raise_for_status() return parse_metrics(res.text) def ready(self) -> bool: - resp = self.request("GET", f"{self.env.attachment_service_api}/ready") - if resp.status_code == 503: + status = None + try: + resp = self.request("GET", f"{self.env.attachment_service_api}/ready") + status = resp.status_code + except AttachmentServiceApiException as e: + status = e.status_code + + if status == 503: return False - elif resp.status_code == 200: + elif status == 200: return True else: - raise RuntimeError(f"Unexpected status {resp.status_code} from readiness endpoint") + raise RuntimeError(f"Unexpected status {status} from readiness endpoint") def attach_hook_issue( self, tenant_shard_id: Union[TenantId, TenantShardId], pageserver_id: int @@ -1981,21 +2020,19 @@ class NeonAttachmentService(MetricsGetter): "POST", 
f"{self.env.attachment_service_api}/debug/v1/attach-hook", json={"tenant_shard_id": str(tenant_shard_id), "node_id": pageserver_id}, - headers=self.headers(), + headers=self.headers(TokenScope.ADMIN), ) - response.raise_for_status() gen = response.json()["gen"] assert isinstance(gen, int) return gen def attach_hook_drop(self, tenant_shard_id: Union[TenantId, TenantShardId]): - response = self.request( + self.request( "POST", f"{self.env.attachment_service_api}/debug/v1/attach-hook", json={"tenant_shard_id": str(tenant_shard_id), "node_id": None}, - headers=self.headers(), + headers=self.headers(TokenScope.ADMIN), ) - response.raise_for_status() def inspect(self, tenant_shard_id: Union[TenantId, TenantShardId]) -> Optional[tuple[int, int]]: """ @@ -2005,9 +2042,8 @@ class NeonAttachmentService(MetricsGetter): "POST", f"{self.env.attachment_service_api}/debug/v1/inspect", json={"tenant_shard_id": str(tenant_shard_id)}, - headers=self.headers(), + headers=self.headers(TokenScope.ADMIN), ) - response.raise_for_status() json = response.json() log.info(f"Response: {json}") if json["attachment"]: @@ -2027,14 +2063,15 @@ class NeonAttachmentService(MetricsGetter): "POST", f"{self.env.attachment_service_api}/control/v1/node", json=body, - headers=self.headers(), - ).raise_for_status() + headers=self.headers(TokenScope.ADMIN), + ) def node_list(self): response = self.request( - "GET", f"{self.env.attachment_service_api}/control/v1/node", headers=self.headers() + "GET", + f"{self.env.attachment_service_api}/control/v1/node", + headers=self.headers(TokenScope.ADMIN), ) - response.raise_for_status() return response.json() def node_configure(self, node_id, body: dict[str, Any]): @@ -2044,8 +2081,8 @@ class NeonAttachmentService(MetricsGetter): "PUT", f"{self.env.attachment_service_api}/control/v1/node/{node_id}/config", json=body, - headers=self.headers(), - ).raise_for_status() + headers=self.headers(TokenScope.ADMIN), + ) def tenant_create( self, @@ -2070,8 +2107,12 @@ class 
NeonAttachmentService(MetricsGetter): for k, v in tenant_config.items(): body[k] = v - response = self.request("POST", f"{self.env.attachment_service_api}/v1/tenant", json=body) - response.raise_for_status() + response = self.request( + "POST", + f"{self.env.attachment_service_api}/v1/tenant", + json=body, + headers=self.headers(TokenScope.PAGE_SERVER_API), + ) log.info(f"tenant_create success: {response.json()}") def locate(self, tenant_id: TenantId) -> list[dict[str, Any]]: @@ -2079,9 +2120,10 @@ class NeonAttachmentService(MetricsGetter): :return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr: str, "listen_http_port: int} """ response = self.request( - "GET", f"{self.env.attachment_service_api}/control/v1/tenant/{tenant_id}/locate" + "GET", + f"{self.env.attachment_service_api}/control/v1/tenant/{tenant_id}/locate", + headers=self.headers(TokenScope.ADMIN), ) - response.raise_for_status() body = response.json() shards: list[dict[str, Any]] = body["shards"] return shards @@ -2091,20 +2133,20 @@ class NeonAttachmentService(MetricsGetter): "PUT", f"{self.env.attachment_service_api}/control/v1/tenant/{tenant_id}/shard_split", json={"new_shard_count": shard_count}, + headers=self.headers(TokenScope.ADMIN), ) - response.raise_for_status() body = response.json() log.info(f"tenant_shard_split success: {body}") shards: list[TenantShardId] = body["new_shards"] return shards def tenant_shard_migrate(self, tenant_shard_id: TenantShardId, dest_ps_id: int): - response = self.request( + self.request( "PUT", f"{self.env.attachment_service_api}/control/v1/tenant/{tenant_shard_id}/migrate", json={"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id}, + headers=self.headers(TokenScope.ADMIN), ) - response.raise_for_status() log.info(f"Migrated tenant {tenant_shard_id} to pageserver {dest_ps_id}") assert self.env.get_tenant_pageserver(tenant_shard_id).id == dest_ps_id @@ -2112,11 +2154,11 @@ class 
NeonAttachmentService(MetricsGetter): """ Throw an exception if the service finds any inconsistencies in its state """ - response = self.request( + self.request( "POST", f"{self.env.attachment_service_api}/debug/v1/consistency_check", + headers=self.headers(TokenScope.ADMIN), ) - response.raise_for_status() log.info("Attachment service passed consistency check") def __enter__(self) -> "NeonAttachmentService": @@ -2894,7 +2936,6 @@ class NeonProxy(PgProtocol): def get_metrics(self) -> str: request_result = requests.get(f"http://{self.host}:{self.http_port}/metrics") - request_result.raise_for_status() return request_result.text @staticmethod diff --git a/test_runner/regress/test_sharding_service.py b/test_runner/regress/test_sharding_service.py index 00c3a1628e..b4f1f49543 100644 --- a/test_runner/regress/test_sharding_service.py +++ b/test_runner/regress/test_sharding_service.py @@ -1,13 +1,16 @@ import time from collections import defaultdict from datetime import datetime, timezone -from typing import List +from typing import Any, Dict, List +import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import ( + AttachmentServiceApiException, NeonEnv, NeonEnvBuilder, PgBin, + TokenScope, ) from fixtures.pageserver.http import PageserverHttpClient from fixtures.pageserver.utils import ( @@ -457,37 +460,40 @@ def test_sharding_service_debug_apis(neon_env_builder: NeonEnvBuilder): # Initial tenant (1 shard) and the one we just created (2 shards) should be visible response = env.attachment_service.request( - "GET", f"{env.attachment_service_api}/debug/v1/tenant" + "GET", + f"{env.attachment_service_api}/debug/v1/tenant", + headers=env.attachment_service.headers(TokenScope.ADMIN), ) - response.raise_for_status() assert len(response.json()) == 3 # Scheduler should report the expected nodes and shard counts response = env.attachment_service.request( "GET", f"{env.attachment_service_api}/debug/v1/scheduler" ) - response.raise_for_status() # Two nodes, in 
a dict of node_id->node assert len(response.json()["nodes"]) == 2 assert sum(v["shard_count"] for v in response.json()["nodes"].values()) == 3 assert all(v["may_schedule"] for v in response.json()["nodes"].values()) response = env.attachment_service.request( - "POST", f"{env.attachment_service_api}/debug/v1/node/{env.pageservers[1].id}/drop" + "POST", + f"{env.attachment_service_api}/debug/v1/node/{env.pageservers[1].id}/drop", + headers=env.attachment_service.headers(TokenScope.ADMIN), ) - response.raise_for_status() assert len(env.attachment_service.node_list()) == 1 response = env.attachment_service.request( - "POST", f"{env.attachment_service_api}/debug/v1/tenant/{tenant_id}/drop" + "POST", + f"{env.attachment_service_api}/debug/v1/tenant/{tenant_id}/drop", + headers=env.attachment_service.headers(TokenScope.ADMIN), ) - response.raise_for_status() # Tenant drop should be reflected in dump output response = env.attachment_service.request( - "GET", f"{env.attachment_service_api}/debug/v1/tenant" + "GET", + f"{env.attachment_service_api}/debug/v1/tenant", + headers=env.attachment_service.headers(TokenScope.ADMIN), ) - response.raise_for_status() assert len(response.json()) == 1 # Check that the 'drop' APIs didn't leave things in a state that would fail a consistency check: they're @@ -603,3 +609,64 @@ def test_sharding_service_s3_time_travel_recovery( endpoint.safe_psql("SELECT * FROM created_foo;") env.attachment_service.consistency_check() + + +def test_sharding_service_auth(neon_env_builder: NeonEnvBuilder): + neon_env_builder.auth_enabled = True + env = neon_env_builder.init_start() + svc = env.attachment_service + api = env.attachment_service_api + + tenant_id = TenantId.generate() + body: Dict[str, Any] = {"new_tenant_id": str(tenant_id)} + + # No token + with pytest.raises( + AttachmentServiceApiException, + match="Unauthorized: missing authorization header", + ): + svc.request("POST", f"{env.attachment_service_api}/v1/tenant", json=body) + + # Token with 
incorrect scope + with pytest.raises( + AttachmentServiceApiException, + match="Forbidden: JWT authentication error", + ): + svc.request("POST", f"{api}/v1/tenant", json=body, headers=svc.headers(TokenScope.ADMIN)) + + # Token with correct scope + svc.request( + "POST", f"{api}/v1/tenant", json=body, headers=svc.headers(TokenScope.PAGE_SERVER_API) + ) + + # No token + with pytest.raises( + AttachmentServiceApiException, + match="Unauthorized: missing authorization header", + ): + svc.request("GET", f"{api}/debug/v1/tenant") + + # Token with incorrect scope + with pytest.raises( + AttachmentServiceApiException, + match="Forbidden: JWT authentication error", + ): + svc.request( + "GET", f"{api}/debug/v1/tenant", headers=svc.headers(TokenScope.GENERATIONS_API) + ) + + # No token + with pytest.raises( + AttachmentServiceApiException, + match="Unauthorized: missing authorization header", + ): + svc.request("POST", f"{api}/upcall/v1/re-attach") + + # Token with incorrect scope + with pytest.raises( + AttachmentServiceApiException, + match="Forbidden: JWT authentication error", + ): + svc.request( + "POST", f"{api}/upcall/v1/re-attach", headers=svc.headers(TokenScope.PAGE_SERVER_API) + )