diff --git a/Cargo.lock b/Cargo.lock index 89351432c1..4f7378e95d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4305,6 +4305,7 @@ dependencies = [ "hashlink", "hex", "hex-literal", + "http 1.1.0", "http-utils", "humantime", "humantime-serde", @@ -4367,6 +4368,7 @@ dependencies = [ "toml_edit", "tonic 0.13.1", "tonic-reflection", + "tower 0.5.2", "tracing", "tracing-utils", "twox-hash", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index c4d6d58945..9591c729e8 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -34,6 +34,7 @@ fail.workspace = true futures.workspace = true hashlink.workspace = true hex.workspace = true +http.workspace = true http-utils.workspace = true humantime-serde.workspace = true humantime.workspace = true @@ -93,6 +94,7 @@ tokio-util.workspace = true toml_edit = { workspace = true, features = [ "serde" ] } tonic.workspace = true tonic-reflection.workspace = true +tower.workspace = true tracing.workspace = true tracing-utils.workspace = true url.workspace = true diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index c07c940b9d..0cabc42a67 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -8,12 +8,14 @@ use std::os::fd::AsRawFd; use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; +use std::task::{Context, Poll}; use std::time::{Duration, Instant, SystemTime}; use std::{io, str}; -use anyhow::{Context, bail}; +use anyhow::{Context as _, bail}; use async_compression::tokio::write::GzipEncoder; use bytes::Buf; +use futures::future::BoxFuture; use futures::{FutureExt, Stream}; use itertools::Itertools; use jsonwebtoken::TokenData; @@ -115,6 +117,22 @@ const GRPC_MAX_CONCURRENT_STREAMS: u32 = 256; /////////////////////////////////////////////////////////////////////////////// +/// Records all of the given fields in the current span, as a single call. The fields must already +/// have been declared for the span with empty values. +macro_rules! span_record { + ($($tokens:tt)*) => {span_record_in!(::tracing::Span::current(); $($tokens)*)}; +} + +/// Records all of the given fields in the given span, as a single call. The fields must already +/// have been declared for the span with empty values. +macro_rules! span_record_in { + ($span:expr; $($tokens:tt)*) => { + if let Some(meta) = $span.metadata() { + $span.record_all(&tracing::valueset!(meta.fields(), $($tokens)*)); + } + }; +} + pub struct Listener { cancel: CancellationToken, /// Cancel the listener task through `listen_cancel` to shut down the listener @@ -204,24 +222,37 @@ pub fn spawn_grpc( .http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT)) .max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS)); - // Main page service. + // Main page service stack. Uses a mix of Tonic interceptors and Tower layers: + // + // * Interceptors: can inspect and modify the gRPC request. Sync code only, runs before service. + // + // * Layers: allow async code, can run code after the service response. However, only has access + // to the raw HTTP request/response. let page_service_handler = GrpcPageServiceHandler { tenant_manager, ctx, }; - let mut received_at_interceptor = ReceivedAtInterceptor; + let observability_layer = ObservabilityLayer; let mut tenant_interceptor = TenantMetadataInterceptor; let mut auth_interceptor = TenantAuthInterceptor::new(auth); - let interceptors = move |mut req: tonic::Request<()>| { - req = received_at_interceptor.call(req)?; - req = tenant_interceptor.call(req)?; - req = auth_interceptor.call(req)?; - Ok(req) - }; - let page_service = - proto::PageServiceServer::with_interceptor(page_service_handler, interceptors); + let page_service = tower::ServiceBuilder::new() + // Create tracing span and start timing. + .layer(observability_layer) + // Intercept gRPC requests. + .layer(tonic::service::InterceptorLayer::new( + move |mut req: tonic::Request<()>| { + // Extract tenant metadata. + req = tenant_interceptor.call(req)?; + // Authenticate tenant JWT token. + req = auth_interceptor.call(req)?; + Ok(req) + }, + )) + // Obtain timeline handle. + .service(proto::PageServiceServer::new(page_service_handler)); + let server = server.add_service(page_service); // Reflection service for use with e.g. grpcurl. @@ -813,6 +844,22 @@ impl From for PageStreamError { } } +impl From for QueryError { + fn from(e: GetActiveTenantError) -> Self { + match e { + GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected( + ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())), + ), + GetActiveTenantError::Cancelled + | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => { + QueryError::Shutdown + } + e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()), + e => QueryError::Other(anyhow::anyhow!(e)), + } + } +} + impl From for PageStreamError { fn from(value: WaitLsnError) -> Self { match value { @@ -3382,9 +3429,11 @@ pub struct GrpcPageServiceHandler { impl GrpcPageServiceHandler { /// Errors if the request is executed on a non-zero shard. Only shard 0 has a complete view of /// relations and their sizes, as well as SLRU segments and other data. + /// + /// TODO: take the timeline handle instead. #[allow(clippy::result_large_err)] fn ensure_shard_zero(req: &tonic::Request) -> Result<(), tonic::Status> { - match Self::extract::(req).shard_number.0 { + match extract::(req).shard_number.0 { 0 => Ok(()), shard => Err(tonic::Status::invalid_argument(format!( "request must execute on shard zero (is shard {shard})", @@ -3392,14 +3441,6 @@ impl GrpcPageServiceHandler { } } - /// Extracts the given type from the request extensions. It must have been set by an - /// interceptor. - fn extract(req: &tonic::Request) -> &T { - req.extensions() - .get::() - .expect("extension should be set by interceptor") - } - /// Generates a PagestreamRequest header from a ReadLsn and request ID. fn make_hdr(read_lsn: page_api::ReadLsn, req_id: u64) -> PagestreamRequest { PagestreamRequest { @@ -3415,8 +3456,8 @@ impl GrpcPageServiceHandler { &self, req: &tonic::Request, ) -> Result, GetActiveTimelineError> { - let ttid = *Self::extract::(req); - let shard_index = *Self::extract::(req); + let ttid = *extract::(req); + let shard_index = *extract::(req); let shard_selector = ShardSelector::Known(shard_index); // TODO: untangle this from TenantManagerWrapper::resolve() and Cache::get(), to avoid the @@ -3451,7 +3492,8 @@ impl GrpcPageServiceHandler { /// /// NB: errors will terminate the stream. Per-request errors should return a GetPageResponse /// with an appropriate status code instead. - async fn handle_get_page_request( + #[instrument(skip_all, fields(req_id, rel, blkno))] + async fn get_page( ctx: &RequestContext, timeline: &WeakHandle, req: proto::GetPageRequest, @@ -3461,11 +3503,15 @@ impl GrpcPageServiceHandler { let timeline = timeline.upgrade().map_err(|err| match err { HandleUpgradeError::ShutDown => tonic::Status::unavailable("timeline is shutting down"), })?; - let ctx = ctx.with_scope_page_service_pagestream(&timeline); // Validate the request and convert it to a Pagestream request. let req: page_api::GetPageRequest = req.try_into()?; + span_record!(req_id = %req.request_id, rel = %req.rel, blkno = %req.block_numbers[0]); + + info!("XXX"); + + let ctx = ctx.with_scope_page_service_pagestream(&timeline); let effective_lsn = match PageServerHandler::effective_request_lsn( &timeline, timeline.get_last_record_lsn(), @@ -3539,10 +3585,8 @@ impl GrpcPageServiceHandler { /// Implements the gRPC page service. /// -/// TODO: when the libpq impl is removed, simplify this: -/// * Add Tower middleware for timeline handle, rate limiting, and timing. -/// * Remove the intermediate Pagestream types. -/// * Inline the handler code. +/// TODO: when libpq impl is removed, remove intermediate Pagestream types and inline the handlers. +/// TODO: Tower middleware for timeline handle, rate limiting, and timing. #[tonic::async_trait] impl proto::PageService for GrpcPageServiceHandler { type GetBaseBackupStream = Pin< @@ -3552,17 +3596,21 @@ impl proto::PageService for GrpcPageServiceHandler { type GetPagesStream = Pin> + Send>>; + #[instrument(skip_all, fields(rel, req_lsn))] async fn check_rel_exists( &self, req: tonic::Request, ) -> Result, tonic::Status> { - let received_at = Self::extract::(&req).0; + let received_at = extract::(&req).0; let timeline = self.get_request_timeline(&req).await?; let ctx = self.ctx.with_scope_page_service_pagestream(&timeline); // Validate the request and convert it to a Pagestream request. Self::ensure_shard_zero(&req)?; + let req: page_api::CheckRelExistsRequest = req.into_inner().try_into()?; + span_record!(rel = %req.rel, req_lsn = %req.read_lsn.request_lsn); + let req = PagestreamExistsRequest { hdr: Self::make_hdr(req.read_lsn, 0), rel: req.rel, @@ -3581,6 +3629,7 @@ impl proto::PageService for GrpcPageServiceHandler { Ok(tonic::Response::new(resp.into())) } + #[instrument(skip_all)] async fn get_base_backup( &self, _: tonic::Request, @@ -3592,7 +3641,7 @@ impl proto::PageService for GrpcPageServiceHandler { &self, req: tonic::Request, ) -> Result, tonic::Status> { - let received_at = Self::extract::(&req).0; + let received_at = extract::(&req).0; let timeline = self.get_request_timeline(&req).await?; let ctx = self.ctx.with_scope_page_service_pagestream(&timeline); @@ -3622,8 +3671,8 @@ impl proto::PageService for GrpcPageServiceHandler { req: tonic::Request>, ) -> Result, tonic::Status> { // Extract the timeline from the request and check that it exists. - let ttid = *Self::extract::(&req); - let shard_index = *Self::extract::(&req); + let ttid = *extract::(&req); + let shard_index = *extract::(&req); let shard_selector = ShardSelector::Known(shard_index); let mut handles = TimelineHandles::new(self.tenant_manager.clone()); @@ -3632,6 +3681,7 @@ impl proto::PageService for GrpcPageServiceHandler { .await?; let ctx = self.ctx.attached_child(); + let span = tracing::Span::current(); // propagate span into the stream future let mut reqs = req.into_inner(); let resps = async_stream::try_stream! { @@ -3641,7 +3691,14 @@ impl proto::PageService for GrpcPageServiceHandler { .downgrade(); while let Some(req) = reqs.message().await? { // TODO: implement IoConcurrency sidecar. - yield Self::handle_get_page_request(&ctx, &timeline, req, IoConcurrency::Sequential).await? + yield Self::get_page( + &ctx, + &timeline, + req, + IoConcurrency::Sequential, + ) + .instrument(span.clone()) + .await? } }; @@ -3652,7 +3709,7 @@ impl proto::PageService for GrpcPageServiceHandler { &self, req: tonic::Request, ) -> Result, tonic::Status> { - let received_at = Self::extract::(&req).0; + let received_at = extract::(&req).0; let timeline = self.get_request_timeline(&req).await?; let ctx = self.ctx.with_scope_page_service_pagestream(&timeline); @@ -3681,7 +3738,7 @@ impl proto::PageService for GrpcPageServiceHandler { &self, req: tonic::Request, ) -> Result, tonic::Status> { - let received_at = Self::extract::(&req).0; + let received_at = extract::(&req).0; let timeline = self.get_request_timeline(&req).await?; let ctx = self.ctx.with_scope_page_service_pagestream(&timeline); @@ -3709,39 +3766,77 @@ impl proto::PageService for GrpcPageServiceHandler { } } -impl From for QueryError { - fn from(e: GetActiveTenantError) -> Self { - match e { - GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected( - ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())), - ), - GetActiveTenantError::Cancelled - | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => { - QueryError::Shutdown - } - e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()), - e => QueryError::Other(anyhow::anyhow!(e)), - } +/// Extracts the given type from the request extensions, or panics if it is missing. +fn extract(req: &tonic::Request) -> &T { + let Some(value) = req.extensions().get::() else { + let name = std::any::type_name::(); + panic!("extension {name} should be set by interceptor or layer"); + }; + value +} + +/// gRPC layer that handles observability concerns: +/// +/// * Records the initial request timestamp as a ReceivedAt extension. +/// * Creates a tracing span with request metadata and instruments the future. +#[derive(Clone)] +struct ObservabilityLayer; + +impl tower::Layer for ObservabilityLayer { + type Service = ObservabilityService; + + fn layer(&self, inner: S) -> Self::Service { + ObservabilityService { inner } } } -/// gRPC interceptor that records the start time of request processing as a ReceivedAt extension. -/// -/// TODO: generalize this for other observability information. #[derive(Clone)] -struct ReceivedAtInterceptor; +struct ObservabilityService { + inner: S, +} + +impl tower::Service> for ObservabilityService +where + S: tower::Service>, + S::Future: Send + 'static, +{ + type Response = S::Response; + type Error = S::Error; + type Future = BoxFuture<'static, Result>; + + fn call(&mut self, mut req: http::Request) -> Self::Future { + // Stash the request timestamp as a ReceivedAt extension. + // TODO: start a timer here instead. + req.extensions_mut().insert(ReceivedAt(Instant::now())); + + // Create a basic tracing span. Enter the span for the current thread (to use it for inner + // non-async code like interceptors), and instrument the future (to use it for inner async + // code like the service itself). + let span = info_span!( + "grpc:pageservice", + // These are set later by TenantMetadataInterceptor. + tenant_id = field::Empty, + timeline_id = field::Empty, + shard_id = field::Empty, + ); + let _guard = span.enter(); + + Box::pin(self.inner.call(req).instrument(span.clone())) + } + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } +} + +impl tonic::server::NamedService for ObservabilityService { + const NAME: &'static str = S::NAME; // propagate service name +} #[derive(Clone)] struct ReceivedAt(Instant); -impl tonic::service::Interceptor for ReceivedAtInterceptor { - fn call(&mut self, mut req: tonic::Request<()>) -> Result, tonic::Status> { - req.extensions_mut().insert(ReceivedAt(Instant::now())); - Ok(req) - } -} - -/// gRPC interceptor that decodes tenant metadata and stores it as request extensions of type +/// gRPC interceptor that decodes tenant metadata and stores it as extensions of type /// TenantTimelineId and ShardIndex. #[derive(Clone)] struct TenantMetadataInterceptor; @@ -3783,11 +3878,24 @@ impl tonic::service::Interceptor for TenantMetadataInterceptor { extensions.insert(TenantTimelineId::new(tenant_id, timeline_id)); extensions.insert(shard_index); + // Decorate the tracing span. This doesn't run in an async context, so it can't use + // tracing::Span::current(). + let tsid = TenantShardId { + tenant_id, + shard_number: shard_index.shard_number, + shard_count: shard_index.shard_count, + }; + let shard_id = tsid.shard_slug(); + + span_record!(%tenant_id, %timeline_id, %shard_id); + + info!("YYY interceptor"); + Ok(req) } } -/// Authenticates gRPC page service requests. Must run after TenantMetadataInterceptor. +/// Authenticates gRPC page service requests. #[derive(Clone)] struct TenantAuthInterceptor { auth: Option>, @@ -3806,11 +3914,7 @@ impl tonic::service::Interceptor for TenantAuthInterceptor { return Ok(req); }; - // Fetch the tenant ID that's been set by TenantMetadataInterceptor. - let ttid = req - .extensions() - .get::() - .expect("TenantMetadataInterceptor must run before TenantAuthInterceptor"); + let TenantTimelineId { tenant_id, .. } = *extract::(&req); // Fetch and decode the JWT token. let jwt = req @@ -3828,7 +3932,7 @@ impl tonic::service::Interceptor for TenantAuthInterceptor { let claims = jwtdata.claims; // Check if the token is valid for this tenant. - check_permission(&claims, Some(ttid.tenant_id)) + check_permission(&claims, Some(tenant_id)) .map_err(|err| tonic::Status::permission_denied(err.to_string()))?; // TODO: consider stashing the claims in the request extensions, if needed.