This commit is contained in:
Erik Grinaker
2025-05-31 18:42:49 +02:00
parent 232591e457
commit 90e3313dbb
3 changed files with 175 additions and 67 deletions

2
Cargo.lock generated
View File

@@ -4305,6 +4305,7 @@ dependencies = [
"hashlink",
"hex",
"hex-literal",
"http 1.1.0",
"http-utils",
"humantime",
"humantime-serde",
@@ -4367,6 +4368,7 @@ dependencies = [
"toml_edit",
"tonic 0.13.1",
"tonic-reflection",
"tower 0.5.2",
"tracing",
"tracing-utils",
"twox-hash",

View File

@@ -34,6 +34,7 @@ fail.workspace = true
futures.workspace = true
hashlink.workspace = true
hex.workspace = true
http.workspace = true
http-utils.workspace = true
humantime-serde.workspace = true
humantime.workspace = true
@@ -93,6 +94,7 @@ tokio-util.workspace = true
toml_edit = { workspace = true, features = [ "serde" ] }
tonic.workspace = true
tonic-reflection.workspace = true
tower.workspace = true
tracing.workspace = true
tracing-utils.workspace = true
url.workspace = true

View File

@@ -8,12 +8,14 @@ use std::os::fd::AsRawFd;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant, SystemTime};
use std::{io, str};
use anyhow::{Context, bail};
use anyhow::{Context as _, bail};
use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
use futures::future::BoxFuture;
use futures::{FutureExt, Stream};
use itertools::Itertools;
use jsonwebtoken::TokenData;
@@ -115,6 +117,22 @@ const GRPC_MAX_CONCURRENT_STREAMS: u32 = 256;
///////////////////////////////////////////////////////////////////////////////
/// Records several fields on the current tracing span (`tracing::Span::current()`) in one call.
/// Every field must already be declared on that span, with an empty value.
///
/// Thin wrapper around `span_record_in!` that supplies the current span.
macro_rules! span_record {
    ($($fields:tt)*) => {
        span_record_in!(::tracing::Span::current(); $($fields)*)
    };
}
/// Records all of the given fields in the given span, as a single call. The fields must already
/// have been declared for the span with empty values.
///
/// The span expression is evaluated exactly once, so passing an expression with a cost or side
/// effect (such as `tracing::Span::current()`) is safe. Nothing is recorded if the span has no
/// metadata.
macro_rules! span_record_in {
    ($span:expr; $($tokens:tt)*) => {{
        // Bind the span once: expanding `$span` at every use would evaluate the caller's
        // expression twice. Borrowing works for owned spans, references, and temporaries alike.
        let span = &$span;
        if let Some(meta) = span.metadata() {
            span.record_all(&::tracing::valueset!(meta.fields(), $($tokens)*));
        }
    }};
}
pub struct Listener {
cancel: CancellationToken,
/// Cancel the listener task through `listen_cancel` to shut down the listener
@@ -204,24 +222,37 @@ pub fn spawn_grpc(
.http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT))
.max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS));
// Main page service.
// Main page service stack. Uses a mix of Tonic interceptors and Tower layers:
//
// * Interceptors: can inspect and modify the gRPC request. Sync code only, runs before service.
//
// * Layers: allow async code, can run code after the service response. However, only has access
// to the raw HTTP request/response.
let page_service_handler = GrpcPageServiceHandler {
tenant_manager,
ctx,
};
let mut received_at_interceptor = ReceivedAtInterceptor;
let observability_layer = ObservabilityLayer;
let mut tenant_interceptor = TenantMetadataInterceptor;
let mut auth_interceptor = TenantAuthInterceptor::new(auth);
let interceptors = move |mut req: tonic::Request<()>| {
req = received_at_interceptor.call(req)?;
req = tenant_interceptor.call(req)?;
req = auth_interceptor.call(req)?;
Ok(req)
};
let page_service =
proto::PageServiceServer::with_interceptor(page_service_handler, interceptors);
let page_service = tower::ServiceBuilder::new()
// Create tracing span and start timing.
.layer(observability_layer)
// Intercept gRPC requests.
.layer(tonic::service::InterceptorLayer::new(
move |mut req: tonic::Request<()>| {
// Extract tenant metadata.
req = tenant_interceptor.call(req)?;
// Authenticate tenant JWT token.
req = auth_interceptor.call(req)?;
Ok(req)
},
))
// Obtain timeline handle.
.service(proto::PageServiceServer::new(page_service_handler));
let server = server.add_service(page_service);
// Reflection service for use with e.g. grpcurl.
@@ -813,6 +844,22 @@ impl From<GetActiveTimelineError> for PageStreamError {
}
}
impl From<GetActiveTenantError> for QueryError {
    /// Translates a tenant activation failure into the corresponding query error:
    ///
    /// * Timeout waiting for activation: `Disconnected` with a `TimedOut` I/O error.
    /// * Cancelled, or the tenant is stopping: `Shutdown`.
    /// * Tenant not found: `NotFound` carrying the error's display text.
    /// * Anything else: `Other`, wrapping the original error.
    fn from(e: GetActiveTenantError) -> Self {
        match e {
            GetActiveTenantError::WaitForActiveTimeout { .. } => {
                let timed_out = io::Error::new(io::ErrorKind::TimedOut, e.to_string());
                Self::Disconnected(ConnectionError::Io(timed_out))
            }
            GetActiveTenantError::Cancelled => Self::Shutdown,
            GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
                Self::Shutdown
            }
            not_found @ GetActiveTenantError::NotFound(_) => {
                Self::NotFound(not_found.to_string().into())
            }
            other => Self::Other(anyhow::anyhow!(other)),
        }
    }
}
impl From<WaitLsnError> for PageStreamError {
fn from(value: WaitLsnError) -> Self {
match value {
@@ -3382,9 +3429,11 @@ pub struct GrpcPageServiceHandler {
impl GrpcPageServiceHandler {
/// Errors if the request is executed on a non-zero shard. Only shard 0 has a complete view of
/// relations and their sizes, as well as SLRU segments and other data.
///
/// TODO: take the timeline handle instead.
#[allow(clippy::result_large_err)]
fn ensure_shard_zero(req: &tonic::Request<impl Any>) -> Result<(), tonic::Status> {
match Self::extract::<ShardIndex>(req).shard_number.0 {
match extract::<ShardIndex>(req).shard_number.0 {
0 => Ok(()),
shard => Err(tonic::Status::invalid_argument(format!(
"request must execute on shard zero (is shard {shard})",
@@ -3392,14 +3441,6 @@ impl GrpcPageServiceHandler {
}
}
/// Extracts the given type from the request extensions. It must have been set by an
/// interceptor.
///
/// # Panics
///
/// Panics if the request carries no extension of type `T`.
fn extract<T: Send + Sync + 'static>(req: &tonic::Request<impl Any>) -> &T {
req.extensions()
.get::<T>()
.expect("extension should be set by interceptor")
}
/// Generates a PagestreamRequest header from a ReadLsn and request ID.
fn make_hdr(read_lsn: page_api::ReadLsn, req_id: u64) -> PagestreamRequest {
PagestreamRequest {
@@ -3415,8 +3456,8 @@ impl GrpcPageServiceHandler {
&self,
req: &tonic::Request<impl Any>,
) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
let ttid = *Self::extract::<TenantTimelineId>(req);
let shard_index = *Self::extract::<ShardIndex>(req);
let ttid = *extract::<TenantTimelineId>(req);
let shard_index = *extract::<ShardIndex>(req);
let shard_selector = ShardSelector::Known(shard_index);
// TODO: untangle this from TenantManagerWrapper::resolve() and Cache::get(), to avoid the
@@ -3451,7 +3492,8 @@ impl GrpcPageServiceHandler {
///
/// NB: errors will terminate the stream. Per-request errors should return a GetPageResponse
/// with an appropriate status code instead.
async fn handle_get_page_request(
#[instrument(skip_all, fields(req_id, rel, blkno))]
async fn get_page(
ctx: &RequestContext,
timeline: &WeakHandle<TenantManagerTypes>,
req: proto::GetPageRequest,
@@ -3461,11 +3503,15 @@ impl GrpcPageServiceHandler {
let timeline = timeline.upgrade().map_err(|err| match err {
HandleUpgradeError::ShutDown => tonic::Status::unavailable("timeline is shutting down"),
})?;
let ctx = ctx.with_scope_page_service_pagestream(&timeline);
// Validate the request and convert it to a Pagestream request.
let req: page_api::GetPageRequest = req.try_into()?;
span_record!(req_id = %req.request_id, rel = %req.rel, blkno = %req.block_numbers[0]);
info!("XXX");
let ctx = ctx.with_scope_page_service_pagestream(&timeline);
let effective_lsn = match PageServerHandler::effective_request_lsn(
&timeline,
timeline.get_last_record_lsn(),
@@ -3539,10 +3585,8 @@ impl GrpcPageServiceHandler {
/// Implements the gRPC page service.
///
/// TODO: when the libpq impl is removed, simplify this:
/// * Add Tower middleware for timeline handle, rate limiting, and timing.
/// * Remove the intermediate Pagestream types.
/// * Inline the handler code.
/// TODO: when libpq impl is removed, remove intermediate Pagestream types and inline the handlers.
/// TODO: Tower middleware for timeline handle, rate limiting, and timing.
#[tonic::async_trait]
impl proto::PageService for GrpcPageServiceHandler {
type GetBaseBackupStream = Pin<
@@ -3552,17 +3596,21 @@ impl proto::PageService for GrpcPageServiceHandler {
type GetPagesStream =
Pin<Box<dyn Stream<Item = Result<proto::GetPageResponse, tonic::Status>> + Send>>;
#[instrument(skip_all, fields(rel, req_lsn))]
async fn check_rel_exists(
&self,
req: tonic::Request<proto::CheckRelExistsRequest>,
) -> Result<tonic::Response<proto::CheckRelExistsResponse>, tonic::Status> {
let received_at = Self::extract::<ReceivedAt>(&req).0;
let received_at = extract::<ReceivedAt>(&req).0;
let timeline = self.get_request_timeline(&req).await?;
let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
// Validate the request and convert it to a Pagestream request.
Self::ensure_shard_zero(&req)?;
let req: page_api::CheckRelExistsRequest = req.into_inner().try_into()?;
span_record!(rel = %req.rel, req_lsn = %req.read_lsn.request_lsn);
let req = PagestreamExistsRequest {
hdr: Self::make_hdr(req.read_lsn, 0),
rel: req.rel,
@@ -3581,6 +3629,7 @@ impl proto::PageService for GrpcPageServiceHandler {
Ok(tonic::Response::new(resp.into()))
}
#[instrument(skip_all)]
async fn get_base_backup(
&self,
_: tonic::Request<proto::GetBaseBackupRequest>,
@@ -3592,7 +3641,7 @@ impl proto::PageService for GrpcPageServiceHandler {
&self,
req: tonic::Request<proto::GetDbSizeRequest>,
) -> Result<tonic::Response<proto::GetDbSizeResponse>, tonic::Status> {
let received_at = Self::extract::<ReceivedAt>(&req).0;
let received_at = extract::<ReceivedAt>(&req).0;
let timeline = self.get_request_timeline(&req).await?;
let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
@@ -3622,8 +3671,8 @@ impl proto::PageService for GrpcPageServiceHandler {
req: tonic::Request<tonic::Streaming<proto::GetPageRequest>>,
) -> Result<tonic::Response<Self::GetPagesStream>, tonic::Status> {
// Extract the timeline from the request and check that it exists.
let ttid = *Self::extract::<TenantTimelineId>(&req);
let shard_index = *Self::extract::<ShardIndex>(&req);
let ttid = *extract::<TenantTimelineId>(&req);
let shard_index = *extract::<ShardIndex>(&req);
let shard_selector = ShardSelector::Known(shard_index);
let mut handles = TimelineHandles::new(self.tenant_manager.clone());
@@ -3632,6 +3681,7 @@ impl proto::PageService for GrpcPageServiceHandler {
.await?;
let ctx = self.ctx.attached_child();
let span = tracing::Span::current(); // propagate span into the stream future
let mut reqs = req.into_inner();
let resps = async_stream::try_stream! {
@@ -3641,7 +3691,14 @@ impl proto::PageService for GrpcPageServiceHandler {
.downgrade();
while let Some(req) = reqs.message().await? {
// TODO: implement IoConcurrency sidecar.
yield Self::handle_get_page_request(&ctx, &timeline, req, IoConcurrency::Sequential).await?
yield Self::get_page(
&ctx,
&timeline,
req,
IoConcurrency::Sequential,
)
.instrument(span.clone())
.await?
}
};
@@ -3652,7 +3709,7 @@ impl proto::PageService for GrpcPageServiceHandler {
&self,
req: tonic::Request<proto::GetRelSizeRequest>,
) -> Result<tonic::Response<proto::GetRelSizeResponse>, tonic::Status> {
let received_at = Self::extract::<ReceivedAt>(&req).0;
let received_at = extract::<ReceivedAt>(&req).0;
let timeline = self.get_request_timeline(&req).await?;
let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
@@ -3681,7 +3738,7 @@ impl proto::PageService for GrpcPageServiceHandler {
&self,
req: tonic::Request<proto::GetSlruSegmentRequest>,
) -> Result<tonic::Response<proto::GetSlruSegmentResponse>, tonic::Status> {
let received_at = Self::extract::<ReceivedAt>(&req).0;
let received_at = extract::<ReceivedAt>(&req).0;
let timeline = self.get_request_timeline(&req).await?;
let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
@@ -3709,39 +3766,77 @@ impl proto::PageService for GrpcPageServiceHandler {
}
}
impl From<GetActiveTenantError> for QueryError {
fn from(e: GetActiveTenantError) -> Self {
match e {
GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
),
GetActiveTenantError::Cancelled
| GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
QueryError::Shutdown
}
e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
e => QueryError::Other(anyhow::anyhow!(e)),
}
/// Returns a reference to the extension of type `T` stored in the request's extensions.
///
/// # Panics
///
/// Panics, naming the missing type, if the request carries no extension of type `T`. Extensions
/// are expected to have been attached earlier by an interceptor or layer.
fn extract<T: Send + Sync + 'static>(req: &tonic::Request<impl Any>) -> &T {
    req.extensions().get::<T>().unwrap_or_else(|| {
        panic!(
            "extension {} should be set by interceptor or layer",
            std::any::type_name::<T>()
        )
    })
}
/// Tower layer for gRPC observability. It wraps a service in an `ObservabilityService`, which:
///
/// * Stores the time the request was received as a `ReceivedAt` request extension.
/// * Opens a tracing span for the request and instruments the inner service's future with it.
#[derive(Clone)]
struct ObservabilityLayer;

impl<S: tonic::server::NamedService> tower::Layer<S> for ObservabilityLayer {
    type Service = ObservabilityService<S>;

    /// Wraps `service` in an `ObservabilityService`.
    fn layer(&self, service: S) -> Self::Service {
        ObservabilityService { inner: service }
    }
}
/// gRPC interceptor that records the start time of request processing as a ReceivedAt extension.
///
/// TODO: generalize this for other observability information.
#[derive(Clone)]
struct ReceivedAtInterceptor;
/// Tower service produced by `ObservabilityLayer`, wrapping the inner service `S`. Its `call`
/// stamps each request with a `ReceivedAt` extension and runs the inner service inside a
/// `grpc:pageservice` tracing span.
struct ObservabilityService<S> {
// The wrapped service that every request is forwarded to.
inner: S,
}
impl<S, B> tower::Service<http::Request<B>> for ObservabilityService<S>
where
    S: tower::Service<http::Request<B>>,
    S::Future: Send + 'static,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    /// Readiness is delegated to the inner service unchanged.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    /// Stamps the request with its arrival time, then dispatches it to the inner service inside
    /// a fresh `grpc:pageservice` tracing span.
    fn call(&mut self, mut req: http::Request<B>) -> Self::Future {
        // Attach the arrival time so that handlers can read it back as a ReceivedAt extension.
        // TODO: start a timer here instead.
        let received_at = ReceivedAt(Instant::now());
        req.extensions_mut().insert(received_at);

        // The tenant/timeline/shard fields start out empty; TenantMetadataInterceptor fills them
        // in once it has decoded the request metadata.
        let span = info_span!(
            "grpc:pageservice",
            tenant_id = field::Empty,
            timeline_id = field::Empty,
            shard_id = field::Empty,
        );

        // The span must cover two things: the synchronous part of the inner call (non-async code
        // such as interceptors), handled by running the call inside the span, and the async
        // part (the service itself), handled by instrumenting the returned future.
        let response = span.in_scope(|| self.inner.call(req));
        Box::pin(response.instrument(span))
    }
}
/// Exposes the wrapped service's name as this wrapper's own, so wrapping a service in
/// `ObservabilityService` does not change the name it is known by.
///
/// NOTE(review): presumably required so the layered stack can still be registered with the
/// tonic server under the inner service's name — confirm against `add_service`'s bounds.
impl<S: tonic::server::NamedService> tonic::server::NamedService for ObservabilityService<S> {
const NAME: &'static str = S::NAME; // propagate service name
}
/// Request extension holding the `Instant` at which a request entered the gRPC service stack.
/// It is inserted into the request extensions with `Instant::now()` and read back by the
/// request handlers.
#[derive(Clone)]
struct ReceivedAt(Instant);
/// Stamps every request with a `ReceivedAt` extension. Never rejects a request.
impl tonic::service::Interceptor for ReceivedAtInterceptor {
fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
// Record the time at which the interceptor saw the request.
req.extensions_mut().insert(ReceivedAt(Instant::now()));
Ok(req)
}
}
/// gRPC interceptor that decodes tenant metadata and stores it as request extensions of type
/// gRPC interceptor that decodes tenant metadata and stores it as extensions of type
/// TenantTimelineId and ShardIndex.
#[derive(Clone)]
struct TenantMetadataInterceptor;
@@ -3783,11 +3878,24 @@ impl tonic::service::Interceptor for TenantMetadataInterceptor {
extensions.insert(TenantTimelineId::new(tenant_id, timeline_id));
extensions.insert(shard_index);
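// Decorate the tracing span. This runs synchronously rather than in an async context, so it
// relies on ObservabilityService having entered the span on the current thread before calling
// the inner service; that is what lets span_record! find it via tracing::Span::current().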
let tsid = TenantShardId {
tenant_id,
shard_number: shard_index.shard_number,
shard_count: shard_index.shard_count,
};
let shard_id = tsid.shard_slug();
span_record!(%tenant_id, %timeline_id, %shard_id);
info!("YYY interceptor");
Ok(req)
}
}
/// Authenticates gRPC page service requests. Must run after TenantMetadataInterceptor.
/// Authenticates gRPC page service requests.
#[derive(Clone)]
struct TenantAuthInterceptor {
auth: Option<Arc<SwappableJwtAuth>>,
@@ -3806,11 +3914,7 @@ impl tonic::service::Interceptor for TenantAuthInterceptor {
return Ok(req);
};
// Fetch the tenant ID that's been set by TenantMetadataInterceptor.
let ttid = req
.extensions()
.get::<TenantTimelineId>()
.expect("TenantMetadataInterceptor must run before TenantAuthInterceptor");
let TenantTimelineId { tenant_id, .. } = *extract::<TenantTimelineId>(&req);
// Fetch and decode the JWT token.
let jwt = req
@@ -3828,7 +3932,7 @@ impl tonic::service::Interceptor for TenantAuthInterceptor {
let claims = jwtdata.claims;
// Check if the token is valid for this tenant.
check_permission(&claims, Some(ttid.tenant_id))
check_permission(&claims, Some(tenant_id))
.map_err(|err| tonic::Status::permission_denied(err.to_string()))?;
// TODO: consider stashing the claims in the request extensions, if needed.