diff --git a/Cargo.lock b/Cargo.lock index 89351432c1..4f7378e95d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4305,6 +4305,7 @@ dependencies = [ "hashlink", "hex", "hex-literal", + "http 1.1.0", "http-utils", "humantime", "humantime-serde", @@ -4367,6 +4368,7 @@ dependencies = [ "toml_edit", "tonic 0.13.1", "tonic-reflection", + "tower 0.5.2", "tracing", "tracing-utils", "twox-hash", diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 206b8bbd8f..11f787562c 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -73,6 +73,7 @@ pub mod error; /// async timeout helper pub mod timeout; +pub mod span; pub mod sync; pub mod failpoint_support; diff --git a/libs/utils/src/span.rs b/libs/utils/src/span.rs new file mode 100644 index 0000000000..4dbc99044b --- /dev/null +++ b/libs/utils/src/span.rs @@ -0,0 +1,19 @@ +//! Tracing span helpers. + +/// Records the given fields in the current span, as a single call. The fields must already have +/// been declared for the span (typically with empty values). +#[macro_export] +macro_rules! span_record { + ($($tokens:tt)*) => {$crate::span_record_in!(::tracing::Span::current(), $($tokens)*)}; +} + +/// Records the given fields in the given span, as a single call. The fields must already have been +/// declared for the span (typically with empty values). +#[macro_export] +macro_rules! span_record_in { + ($span:expr, $($tokens:tt)*) => { + if let Some(meta) = $span.metadata() { + $span.record_all(&tracing::valueset!(meta.fields(), $($tokens)*)); + } + }; +} diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index c4d6d58945..9591c729e8 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -34,6 +34,7 @@ fail.workspace = true futures.workspace = true hashlink.workspace = true hex.workspace = true +http.workspace = true http-utils.workspace = true humantime-serde.workspace = true humantime.workspace = true @@ -93,6 +94,7 @@ tokio-util.workspace = true toml_edit = { workspace = true, features = [ "serde" ] } tonic.workspace = true tonic-reflection.workspace = true +tower.workspace = true tracing.workspace = true tracing-utils.workspace = true url.workspace = true diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index e96787e027..f011ed49d0 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -7,12 +7,14 @@ use std::os::fd::AsRawFd; use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; +use std::task::{Context, Poll}; use std::time::{Duration, Instant, SystemTime}; use std::{io, str}; -use anyhow::{Context, bail}; +use anyhow::{Context as _, bail}; use async_compression::tokio::write::GzipEncoder; use bytes::Buf; +use futures::future::BoxFuture; use futures::{FutureExt, Stream}; use itertools::Itertools; use jsonwebtoken::TokenData; @@ -46,7 +48,6 @@ use tokio_util::sync::CancellationToken; use tonic::service::Interceptor as _; use tracing::*; use utils::auth::{Claims, Scope, SwappableJwtAuth}; -use utils::failpoint_support; use utils::id::{TenantId, TenantTimelineId, TimelineId}; use utils::logging::log_slow; use utils::lsn::Lsn; @@ -54,6 +55,7 @@ use utils::shard::ShardIndex; use utils::simple_rcu::RcuReadGuard; use utils::sync::gate::{Gate, GateGuard}; use utils::sync::spsc_fold; +use utils::{failpoint_support, span_record}; use crate::auth::check_permission; use crate::basebackup::{self, BasebackupError}; @@ -195,13 +197,17 @@ pub fn spawn_grpc( // Set up the gRPC server. // // TODO: consider tuning window sizes. - // TODO: wire up tracing. let mut server = tonic::transport::Server::builder() .http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL)) .http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT)) .max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS)); - // Main page service. + // Main page service stack. Uses a mix of Tonic interceptors and Tower layers: + // + // * Interceptors: can inspect and modify the gRPC request. Sync code only, runs before service. + // + // * Layers: allow async code, can run code after the service response. However, only has access + // to the raw HTTP request/response, not the gRPC types. let page_service_handler = PageServerHandler::new( tenant_manager, auth.clone(), @@ -214,16 +220,22 @@ pub fn spawn_grpc( gate.enter().expect("just created"), ); + let observability_layer = ObservabilityLayer; let mut tenant_interceptor = TenantMetadataInterceptor; let mut auth_interceptor = TenantAuthInterceptor::new(auth); - let interceptors = move |mut req: tonic::Request<()>| { - req = tenant_interceptor.call(req)?; - req = auth_interceptor.call(req)?; - Ok(req) - }; - let page_service = - proto::PageServiceServer::with_interceptor(page_service_handler, interceptors); + let page_service = tower::ServiceBuilder::new() + // Create tracing span. + .layer(observability_layer) + // Intercept gRPC requests. + .layer(tonic::service::InterceptorLayer::new(move |mut req| { + // Extract tenant metadata. + req = tenant_interceptor.call(req)?; + // Authenticate tenant JWT token. + req = auth_interceptor.call(req)?; + Ok(req) + })) + .service(proto::PageServiceServer::new(page_service_handler)); let server = server.add_service(page_service); // Reflection service for use with e.g. grpcurl. @@ -3311,6 +3323,7 @@ impl proto::PageService for PageServerHandler { type GetPagesStream = Pin> + Send>>; + #[instrument(skip_all)] async fn check_rel_exists( &self, _: tonic::Request, @@ -3318,6 +3331,7 @@ impl proto::PageService for PageServerHandler { Err(tonic::Status::unimplemented("not implemented")) } + #[instrument(skip_all)] async fn get_base_backup( &self, _: tonic::Request, @@ -3325,6 +3339,7 @@ impl proto::PageService for PageServerHandler { Err(tonic::Status::unimplemented("not implemented")) } + #[instrument(skip_all)] async fn get_db_size( &self, _: tonic::Request, @@ -3332,6 +3347,7 @@ impl proto::PageService for PageServerHandler { Err(tonic::Status::unimplemented("not implemented")) } + // NB: don't instrument this, instrument each streamed request. async fn get_pages( &self, _: tonic::Request>, @@ -3339,6 +3355,7 @@ impl proto::PageService for PageServerHandler { Err(tonic::Status::unimplemented("not implemented")) } + #[instrument(skip_all)] async fn get_rel_size( &self, _: tonic::Request, @@ -3346,6 +3363,7 @@ impl proto::PageService for PageServerHandler { Err(tonic::Status::unimplemented("not implemented")) } + #[instrument(skip_all)] async fn get_slru_segment( &self, _: tonic::Request, @@ -3354,19 +3372,65 @@ impl proto::PageService for PageServerHandler { } } -impl From for QueryError { - fn from(e: GetActiveTenantError) -> Self { - match e { - GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected( - ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())), - ), - GetActiveTenantError::Cancelled - | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => { - QueryError::Shutdown - } - e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()), - e => QueryError::Other(anyhow::anyhow!(e)), - } +/// gRPC middleware layer that handles observability concerns: +/// +/// * Creates and enters a tracing span. +/// +/// TODO: add perf tracing. +/// TODO: add timing and metrics. +/// TODO: add logging. +#[derive(Clone)] +struct ObservabilityLayer; + +impl tower::Layer for ObservabilityLayer { + type Service = ObservabilityLayerService; + + fn layer(&self, inner: S) -> Self::Service { + Self::Service { inner } + } +} + +#[derive(Clone)] +struct ObservabilityLayerService { + inner: S, +} + +impl tonic::server::NamedService for ObservabilityLayerService { + const NAME: &'static str = S::NAME; // propagate inner service name +} + +impl tower::Service> for ObservabilityLayerService +where + S: tower::Service>, + S::Future: Send + 'static, +{ + type Response = S::Response; + type Error = S::Error; + type Future = BoxFuture<'static, Result>; + + fn call(&mut self, req: http::Request) -> Self::Future { + // Create a basic tracing span. Enter the span for the current thread (to use it for inner + // sync code like interceptors), and instrument the future (to use it for inner async code + // like the page service itself). + // + // The instrument() call below is not sufficient. It only affects the returned future, and + // only takes effect when the caller polls it. Any sync code executed when we call + // self.inner.call() below (such as interceptors) runs outside of the returned future, and + // is not affected by it. We therefore have to enter the span on the current thread too. + let span = info_span!( + "grpc:pageservice", + // Set by TenantMetadataInterceptor. + tenant_id = field::Empty, + timeline_id = field::Empty, + shard_id = field::Empty, + ); + let _guard = span.enter(); + + Box::pin(self.inner.call(req).instrument(span.clone())) + } + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) } } @@ -3400,19 +3464,22 @@ impl tonic::service::Interceptor for TenantMetadataInterceptor { .map_err(|_| tonic::Status::invalid_argument("invalid neon-timeline-id"))?; // Decode the shard ID. - let shard_index = req + let shard_id = req .metadata() .get("neon-shard-id") .ok_or_else(|| tonic::Status::invalid_argument("missing neon-shard-id"))? .to_str() .map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?; - let shard_index = ShardIndex::from_str(shard_index) + let shard_id = ShardIndex::from_str(shard_id) .map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?; // Stash them in the request. let extensions = req.extensions_mut(); extensions.insert(TenantTimelineId::new(tenant_id, timeline_id)); - extensions.insert(shard_index); + extensions.insert(shard_id); + + // Decorate the tracing span. + span_record!(%tenant_id, %timeline_id, %shard_id); Ok(req) } @@ -3486,6 +3553,22 @@ impl From for QueryError { } } +impl From for QueryError { + fn from(e: GetActiveTenantError) -> Self { + match e { + GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected( + ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())), + ), + GetActiveTenantError::Cancelled + | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => { + QueryError::Shutdown + } + e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()), + e => QueryError::Other(anyhow::anyhow!(e)), + } + } +} + impl From for QueryError { fn from(e: crate::tenant::timeline::handle::HandleUpgradeError) -> Self { match e {