pageserver: add gRPC observability middleware (#12093)

## Problem

The page service logic asserts that a tracing span is present with
tenant/timeline/shard IDs. An initial gRPC page service implementation
thus requires a tracing span.

Touches https://github.com/neondatabase/neon/issues/11728.

## Summary of changes

Adds an `ObservabilityLayer` middleware that generates a tracing span
and decorates it with IDs from the gRPC metadata.

This is a minimal implementation to address the tracing span assertion.
It will be extended with additional observability in later PRs.
This commit is contained in:
Erik Grinaker
2025-06-02 13:46:50 +02:00
committed by GitHub
parent 5b62749c42
commit 8d7ed2a4ee
5 changed files with 134 additions and 27 deletions

View File

@@ -7,12 +7,14 @@ use std::os::fd::AsRawFd;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant, SystemTime};
use std::{io, str};
use anyhow::{Context, bail};
use anyhow::{Context as _, bail};
use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
use futures::future::BoxFuture;
use futures::{FutureExt, Stream};
use itertools::Itertools;
use jsonwebtoken::TokenData;
@@ -46,7 +48,6 @@ use tokio_util::sync::CancellationToken;
use tonic::service::Interceptor as _;
use tracing::*;
use utils::auth::{Claims, Scope, SwappableJwtAuth};
use utils::failpoint_support;
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::logging::log_slow;
use utils::lsn::Lsn;
@@ -54,6 +55,7 @@ use utils::shard::ShardIndex;
use utils::simple_rcu::RcuReadGuard;
use utils::sync::gate::{Gate, GateGuard};
use utils::sync::spsc_fold;
use utils::{failpoint_support, span_record};
use crate::auth::check_permission;
use crate::basebackup::{self, BasebackupError};
@@ -195,13 +197,17 @@ pub fn spawn_grpc(
// Set up the gRPC server.
//
// TODO: consider tuning window sizes.
// TODO: wire up tracing.
let mut server = tonic::transport::Server::builder()
.http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL))
.http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT))
.max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS));
// Main page service.
// Main page service stack. Uses a mix of Tonic interceptors and Tower layers:
//
// * Interceptors: can inspect and modify the gRPC request. Sync code only, runs before service.
//
// * Layers: allow async code, can run code after the service response. However, only has access
// to the raw HTTP request/response, not the gRPC types.
let page_service_handler = PageServerHandler::new(
tenant_manager,
auth.clone(),
@@ -214,16 +220,22 @@ pub fn spawn_grpc(
gate.enter().expect("just created"),
);
let observability_layer = ObservabilityLayer;
let mut tenant_interceptor = TenantMetadataInterceptor;
let mut auth_interceptor = TenantAuthInterceptor::new(auth);
let interceptors = move |mut req: tonic::Request<()>| {
req = tenant_interceptor.call(req)?;
req = auth_interceptor.call(req)?;
Ok(req)
};
let page_service =
proto::PageServiceServer::with_interceptor(page_service_handler, interceptors);
let page_service = tower::ServiceBuilder::new()
// Create tracing span.
.layer(observability_layer)
// Intercept gRPC requests.
.layer(tonic::service::InterceptorLayer::new(move |mut req| {
// Extract tenant metadata.
req = tenant_interceptor.call(req)?;
// Authenticate tenant JWT token.
req = auth_interceptor.call(req)?;
Ok(req)
}))
.service(proto::PageServiceServer::new(page_service_handler));
let server = server.add_service(page_service);
// Reflection service for use with e.g. grpcurl.
@@ -3311,6 +3323,7 @@ impl proto::PageService for PageServerHandler {
type GetPagesStream =
Pin<Box<dyn Stream<Item = Result<proto::GetPageResponse, tonic::Status>> + Send>>;
#[instrument(skip_all)]
async fn check_rel_exists(
&self,
_: tonic::Request<proto::CheckRelExistsRequest>,
@@ -3318,6 +3331,7 @@ impl proto::PageService for PageServerHandler {
Err(tonic::Status::unimplemented("not implemented"))
}
#[instrument(skip_all)]
async fn get_base_backup(
&self,
_: tonic::Request<proto::GetBaseBackupRequest>,
@@ -3325,6 +3339,7 @@ impl proto::PageService for PageServerHandler {
Err(tonic::Status::unimplemented("not implemented"))
}
#[instrument(skip_all)]
async fn get_db_size(
&self,
_: tonic::Request<proto::GetDbSizeRequest>,
@@ -3332,6 +3347,7 @@ impl proto::PageService for PageServerHandler {
Err(tonic::Status::unimplemented("not implemented"))
}
// NB: don't instrument this, instrument each streamed request.
async fn get_pages(
&self,
_: tonic::Request<tonic::Streaming<proto::GetPageRequest>>,
@@ -3339,6 +3355,7 @@ impl proto::PageService for PageServerHandler {
Err(tonic::Status::unimplemented("not implemented"))
}
#[instrument(skip_all)]
async fn get_rel_size(
&self,
_: tonic::Request<proto::GetRelSizeRequest>,
@@ -3346,6 +3363,7 @@ impl proto::PageService for PageServerHandler {
Err(tonic::Status::unimplemented("not implemented"))
}
#[instrument(skip_all)]
async fn get_slru_segment(
&self,
_: tonic::Request<proto::GetSlruSegmentRequest>,
@@ -3354,19 +3372,65 @@ impl proto::PageService for PageServerHandler {
}
}
impl From<GetActiveTenantError> for QueryError {
fn from(e: GetActiveTenantError) -> Self {
match e {
GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
),
GetActiveTenantError::Cancelled
| GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
QueryError::Shutdown
}
e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
e => QueryError::Other(anyhow::anyhow!(e)),
}
/// gRPC middleware layer that handles observability concerns:
///
/// * Creates and enters a tracing span.
///
/// TODO: add perf tracing.
/// TODO: add timing and metrics.
/// TODO: add logging.
#[derive(Clone)]
struct ObservabilityLayer;
impl<S: tonic::server::NamedService> tower::Layer<S> for ObservabilityLayer {
type Service = ObservabilityLayerService<S>;
fn layer(&self, inner: S) -> Self::Service {
Self::Service { inner }
}
}
#[derive(Clone)]
struct ObservabilityLayerService<S> {
inner: S,
}
impl<S: tonic::server::NamedService> tonic::server::NamedService for ObservabilityLayerService<S> {
const NAME: &'static str = S::NAME; // propagate inner service name
}
impl<S, B> tower::Service<http::Request<B>> for ObservabilityLayerService<S>
where
S: tower::Service<http::Request<B>>,
S::Future: Send + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn call(&mut self, req: http::Request<B>) -> Self::Future {
// Create a basic tracing span. Enter the span for the current thread (to use it for inner
// sync code like interceptors), and instrument the future (to use it for inner async code
// like the page service itself).
//
// The instrument() call below is not sufficient. It only affects the returned future, and
// only takes effect when the caller polls it. Any sync code executed when we call
// self.inner.call() below (such as interceptors) runs outside of the returned future, and
// is not affected by it. We therefore have to enter the span on the current thread too.
let span = info_span!(
"grpc:pageservice",
// Set by TenantMetadataInterceptor.
tenant_id = field::Empty,
timeline_id = field::Empty,
shard_id = field::Empty,
);
let _guard = span.enter();
Box::pin(self.inner.call(req).instrument(span.clone()))
}
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
}
@@ -3400,19 +3464,22 @@ impl tonic::service::Interceptor for TenantMetadataInterceptor {
.map_err(|_| tonic::Status::invalid_argument("invalid neon-timeline-id"))?;
// Decode the shard ID.
let shard_index = req
let shard_id = req
.metadata()
.get("neon-shard-id")
.ok_or_else(|| tonic::Status::invalid_argument("missing neon-shard-id"))?
.to_str()
.map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?;
let shard_index = ShardIndex::from_str(shard_index)
let shard_id = ShardIndex::from_str(shard_id)
.map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?;
// Stash them in the request.
let extensions = req.extensions_mut();
extensions.insert(TenantTimelineId::new(tenant_id, timeline_id));
extensions.insert(shard_index);
extensions.insert(shard_id);
// Decorate the tracing span.
span_record!(%tenant_id, %timeline_id, %shard_id);
Ok(req)
}
@@ -3486,6 +3553,22 @@ impl From<GetActiveTimelineError> for QueryError {
}
}
impl From<GetActiveTenantError> for QueryError {
fn from(e: GetActiveTenantError) -> Self {
match e {
GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
),
GetActiveTenantError::Cancelled
| GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
QueryError::Shutdown
}
e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
e => QueryError::Other(anyhow::anyhow!(e)),
}
}
}
impl From<crate::tenant::timeline::handle::HandleUpgradeError> for QueryError {
fn from(e: crate::tenant::timeline::handle::HandleUpgradeError) -> Self {
match e {