From a082f9814ad85248c8fcd152d34b20fc0fa1855a Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 26 May 2025 12:24:45 +0200 Subject: [PATCH] pageserver: add gRPC authentication (#12010) ## Problem We need authentication for the gRPC server. Requires #11972. Touches #11728. ## Summary of changes Add two request interceptors that decode the tenant/timeline/shard metadata and authenticate the JWT token against them. --- pageserver/src/page_service.rs | 119 +++++++++++++++++++++++++++++++-- 1 file changed, 115 insertions(+), 4 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 34dc158694..06aa207f82 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -43,12 +43,14 @@ use strum_macros::IntoStaticStr; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter}; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; +use tonic::service::Interceptor as _; use tracing::*; use utils::auth::{Claims, Scope, SwappableJwtAuth}; use utils::failpoint_support; -use utils::id::{TenantId, TimelineId}; +use utils::id::{TenantId, TenantTimelineId, TimelineId}; use utils::logging::log_slow; use utils::lsn::Lsn; +use utils::shard::ShardIndex; use utils::simple_rcu::RcuReadGuard; use utils::sync::gate::{Gate, GateGuard}; use utils::sync::spsc_fold; @@ -200,9 +202,9 @@ pub fn spawn_grpc( .max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS)); // Main page service. 
- let page_service = proto::PageServiceServer::new(PageServerHandler::new( + let page_service_handler = PageServerHandler::new( tenant_manager, - auth, + auth.clone(), PageServicePipeliningConfig::Serial, // TODO: unused with gRPC conf.get_vectored_concurrent_io, ConnectionPerfSpanFields::default(), @@ -210,7 +212,18 @@ pub fn spawn_grpc( ctx, cancel.clone(), gate.enter().expect("just created"), - )); + ); + + let mut tenant_interceptor = TenantMetadataInterceptor; + let mut auth_interceptor = TenantAuthInterceptor::new(auth); + let interceptors = move |mut req: tonic::Request<()>| { + req = tenant_interceptor.call(req)?; + req = auth_interceptor.call(req)?; + Ok(req) + }; + + let page_service = + proto::PageServiceServer::with_interceptor(page_service_handler, interceptors); let server = server.add_service(page_service); // Reflection service for use with e.g. grpcurl. @@ -3290,6 +3303,104 @@ impl From for QueryError { } } +/// gRPC interceptor that decodes tenant metadata and stores it as request extensions of type +/// TenantTimelineId and ShardIndex. +/// +/// TODO: consider looking up the timeline handle here and storing it. +#[derive(Clone)] +struct TenantMetadataInterceptor; + +impl tonic::service::Interceptor for TenantMetadataInterceptor { + fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> { + // Decode the tenant ID. + let tenant_id = req + .metadata() + .get("neon-tenant-id") + .ok_or_else(|| tonic::Status::invalid_argument("missing neon-tenant-id"))? + .to_str() + .map_err(|_| tonic::Status::invalid_argument("invalid neon-tenant-id"))?; + let tenant_id = TenantId::from_str(tenant_id) + .map_err(|_| tonic::Status::invalid_argument("invalid neon-tenant-id"))?; + + // Decode the timeline ID. + let timeline_id = req + .metadata() + .get("neon-timeline-id") + .ok_or_else(|| tonic::Status::invalid_argument("missing neon-timeline-id"))? 
+ .to_str() + .map_err(|_| tonic::Status::invalid_argument("invalid neon-timeline-id"))?; + let timeline_id = TimelineId::from_str(timeline_id) + .map_err(|_| tonic::Status::invalid_argument("invalid neon-timeline-id"))?; + + // Decode the shard ID. + let shard_index = req + .metadata() + .get("neon-shard-id") + .ok_or_else(|| tonic::Status::invalid_argument("missing neon-shard-id"))? + .to_str() + .map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?; + let shard_index = ShardIndex::from_str(shard_index) + .map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?; + + // Stash them in the request. + let extensions = req.extensions_mut(); + extensions.insert(TenantTimelineId::new(tenant_id, timeline_id)); + extensions.insert(shard_index); + + Ok(req) + } +} + +/// Authenticates gRPC page service requests. Must run after TenantMetadataInterceptor. +#[derive(Clone)] +struct TenantAuthInterceptor { + auth: Option<Arc<SwappableJwtAuth>>, +} + +impl TenantAuthInterceptor { + fn new(auth: Option<Arc<SwappableJwtAuth>>) -> Self { + Self { auth } + } +} + +impl tonic::service::Interceptor for TenantAuthInterceptor { + fn call(&mut self, req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> { + // Do nothing if auth is disabled. + let Some(auth) = self.auth.as_ref() else { + return Ok(req); + }; + + // Fetch the tenant ID that's been set by TenantMetadataInterceptor. + let ttid = req + .extensions() + .get::<TenantTimelineId>() + .expect("TenantMetadataInterceptor must run before TenantAuthInterceptor"); + + // Fetch and decode the JWT token. + let jwt = req + .metadata() + .get("authorization") + .ok_or_else(|| tonic::Status::unauthenticated("no authorization header"))? + .to_str() + .map_err(|_| tonic::Status::invalid_argument("invalid authorization header"))? + .strip_prefix("Bearer ") + .ok_or_else(|| tonic::Status::invalid_argument("invalid authorization header"))? 
+ .trim(); + let jwtdata: TokenData<Claims> = auth + .decode(jwt) + .map_err(|err| tonic::Status::invalid_argument(format!("invalid JWT token: {err}")))?; + let claims = jwtdata.claims; + + // Check if the token is valid for this tenant. + check_permission(&claims, Some(ttid.tenant_id)) + .map_err(|err| tonic::Status::permission_denied(err.to_string()))?; + + // TODO: consider stashing the claims in the request extensions, if needed. + + Ok(req) + } +} + #[derive(Debug, thiserror::Error)] pub(crate) enum GetActiveTimelineError { #[error(transparent)]