mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 06:52:55 +00:00
pageserver: add gRPC authentication (#12010)
## Problem We need authentication for the gRPC server. Requires #11972. Touches #11728. ## Summary of changes Add two request interceptors that decode the tenant/timeline/shard metadata and authenticate the JWT token against them.
This commit is contained in:
@@ -43,12 +43,14 @@ use strum_macros::IntoStaticStr;
|
||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tonic::service::Interceptor as _;
|
||||
use tracing::*;
|
||||
use utils::auth::{Claims, Scope, SwappableJwtAuth};
|
||||
use utils::failpoint_support;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
use utils::logging::log_slow;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::ShardIndex;
|
||||
use utils::simple_rcu::RcuReadGuard;
|
||||
use utils::sync::gate::{Gate, GateGuard};
|
||||
use utils::sync::spsc_fold;
|
||||
@@ -200,9 +202,9 @@ pub fn spawn_grpc(
|
||||
.max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS));
|
||||
|
||||
// Main page service.
|
||||
let page_service = proto::PageServiceServer::new(PageServerHandler::new(
|
||||
let page_service_handler = PageServerHandler::new(
|
||||
tenant_manager,
|
||||
auth,
|
||||
auth.clone(),
|
||||
PageServicePipeliningConfig::Serial, // TODO: unused with gRPC
|
||||
conf.get_vectored_concurrent_io,
|
||||
ConnectionPerfSpanFields::default(),
|
||||
@@ -210,7 +212,18 @@ pub fn spawn_grpc(
|
||||
ctx,
|
||||
cancel.clone(),
|
||||
gate.enter().expect("just created"),
|
||||
));
|
||||
);
|
||||
|
||||
let mut tenant_interceptor = TenantMetadataInterceptor;
|
||||
let mut auth_interceptor = TenantAuthInterceptor::new(auth);
|
||||
let interceptors = move |mut req: tonic::Request<()>| {
|
||||
req = tenant_interceptor.call(req)?;
|
||||
req = auth_interceptor.call(req)?;
|
||||
Ok(req)
|
||||
};
|
||||
|
||||
let page_service =
|
||||
proto::PageServiceServer::with_interceptor(page_service_handler, interceptors);
|
||||
let server = server.add_service(page_service);
|
||||
|
||||
// Reflection service for use with e.g. grpcurl.
|
||||
@@ -3290,6 +3303,104 @@ impl From<GetActiveTenantError> for QueryError {
|
||||
}
|
||||
}
|
||||
|
||||
/// gRPC interceptor that decodes tenant metadata and stores it as request extensions of type
|
||||
/// TenantTimelineId and ShardIndex.
|
||||
///
|
||||
/// TODO: consider looking up the timeline handle here and storing it.
|
||||
#[derive(Clone)]
|
||||
struct TenantMetadataInterceptor;
|
||||
|
||||
impl tonic::service::Interceptor for TenantMetadataInterceptor {
|
||||
fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
|
||||
// Decode the tenant ID.
|
||||
let tenant_id = req
|
||||
.metadata()
|
||||
.get("neon-tenant-id")
|
||||
.ok_or_else(|| tonic::Status::invalid_argument("missing neon-tenant-id"))?
|
||||
.to_str()
|
||||
.map_err(|_| tonic::Status::invalid_argument("invalid neon-tenant-id"))?;
|
||||
let tenant_id = TenantId::from_str(tenant_id)
|
||||
.map_err(|_| tonic::Status::invalid_argument("invalid neon-tenant-id"))?;
|
||||
|
||||
// Decode the timeline ID.
|
||||
let timeline_id = req
|
||||
.metadata()
|
||||
.get("neon-timeline-id")
|
||||
.ok_or_else(|| tonic::Status::invalid_argument("missing neon-timeline-id"))?
|
||||
.to_str()
|
||||
.map_err(|_| tonic::Status::invalid_argument("invalid neon-timeline-id"))?;
|
||||
let timeline_id = TimelineId::from_str(timeline_id)
|
||||
.map_err(|_| tonic::Status::invalid_argument("invalid neon-timeline-id"))?;
|
||||
|
||||
// Decode the shard ID.
|
||||
let shard_index = req
|
||||
.metadata()
|
||||
.get("neon-shard-id")
|
||||
.ok_or_else(|| tonic::Status::invalid_argument("missing neon-shard-id"))?
|
||||
.to_str()
|
||||
.map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?;
|
||||
let shard_index = ShardIndex::from_str(shard_index)
|
||||
.map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?;
|
||||
|
||||
// Stash them in the request.
|
||||
let extensions = req.extensions_mut();
|
||||
extensions.insert(TenantTimelineId::new(tenant_id, timeline_id));
|
||||
extensions.insert(shard_index);
|
||||
|
||||
Ok(req)
|
||||
}
|
||||
}
|
||||
|
||||
/// Authenticates gRPC page service requests. Must run after TenantMetadataInterceptor.
|
||||
#[derive(Clone)]
|
||||
struct TenantAuthInterceptor {
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
}
|
||||
|
||||
impl TenantAuthInterceptor {
|
||||
fn new(auth: Option<Arc<SwappableJwtAuth>>) -> Self {
|
||||
Self { auth }
|
||||
}
|
||||
}
|
||||
|
||||
impl tonic::service::Interceptor for TenantAuthInterceptor {
|
||||
fn call(&mut self, req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
|
||||
// Do nothing if auth is disabled.
|
||||
let Some(auth) = self.auth.as_ref() else {
|
||||
return Ok(req);
|
||||
};
|
||||
|
||||
// Fetch the tenant ID that's been set by TenantMetadataInterceptor.
|
||||
let ttid = req
|
||||
.extensions()
|
||||
.get::<TenantTimelineId>()
|
||||
.expect("TenantMetadataInterceptor must run before TenantAuthInterceptor");
|
||||
|
||||
// Fetch and decode the JWT token.
|
||||
let jwt = req
|
||||
.metadata()
|
||||
.get("authorization")
|
||||
.ok_or_else(|| tonic::Status::unauthenticated("no authorization header"))?
|
||||
.to_str()
|
||||
.map_err(|_| tonic::Status::invalid_argument("invalid authorization header"))?
|
||||
.strip_prefix("Bearer ")
|
||||
.ok_or_else(|| tonic::Status::invalid_argument("invalid authorization header"))?
|
||||
.trim();
|
||||
let jwtdata: TokenData<Claims> = auth
|
||||
.decode(jwt)
|
||||
.map_err(|err| tonic::Status::invalid_argument(format!("invalid JWT token: {err}")))?;
|
||||
let claims = jwtdata.claims;
|
||||
|
||||
// Check if the token is valid for this tenant.
|
||||
check_permission(&claims, Some(ttid.tenant_id))
|
||||
.map_err(|err| tonic::Status::permission_denied(err.to_string()))?;
|
||||
|
||||
// TODO: consider stashing the claims in the request extensions, if needed.
|
||||
|
||||
Ok(req)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum GetActiveTimelineError {
|
||||
#[error(transparent)]
|
||||
|
||||
Reference in New Issue
Block a user