diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 1d824ac846..49928a9036 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -50,6 +50,7 @@ use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, Bu use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tonic::service::Interceptor as _; +use tonic::transport::server::TcpConnectInfo; use tracing::*; use utils::auth::{Claims, Scope, SwappableJwtAuth}; use utils::id::{TenantId, TenantTimelineId, TimelineId}; @@ -3685,8 +3686,15 @@ impl proto::PageService for GrpcPageServiceHandler { yield match result { Ok(resp) => resp, // Convert per-request errors to GetPageResponses as appropriate, or terminate - // the stream with a tonic::Status. - Err(err) => page_api::GetPageResponse::try_from_status(err, req_id)?.into(), + // the stream with a tonic::Status. Log the error regardless, since + // ObservabilityLayer can't automatically log stream errors. + Err(status) => { + // TODO: it would be nice if we could propagate the get_page() fields here. + span.in_scope(|| { + warn!("request failed with {:?}: {}", status.code(), status.message()); + }); + page_api::GetPageResponse::try_from_status(status, req_id)?.into() + } } } }; @@ -3824,40 +3832,85 @@ impl tonic::server::NamedService for Observabili const NAME: &'static str = S::NAME; // propagate inner service name } -impl tower::Service> for ObservabilityLayerService +impl tower::Service> for ObservabilityLayerService where - S: tower::Service>, + S: tower::Service, Response = http::Response> + Send, S::Future: Send + 'static, { type Response = S::Response; type Error = S::Error; type Future = BoxFuture<'static, Result>; - fn call(&mut self, mut req: http::Request) -> Self::Future { + fn call(&mut self, mut req: http::Request) -> Self::Future { // Record the request start time as a request extension. // // TODO: we should start a timer here instead, but it currently requires a timeline handle // and SmgrQueryType, which we don't have yet. Refactor it to provide it later. req.extensions_mut().insert(ReceivedAt(Instant::now())); - // Create a basic tracing span. Enter the span for the current thread (to use it for inner - // sync code like interceptors), and instrument the future (to use it for inner async code - // like the page service itself). + // Extract the peer address and gRPC method. + let peer = req + .extensions() + .get::() + .and_then(|info| info.remote_addr()) + .map(|addr| addr.to_string()) + .unwrap_or_default(); + + let method = req + .uri() + .path() + .split('/') + .nth(2) + .unwrap_or(req.uri().path()) + .to_string(); + + // Create a basic tracing span. // - // The instrument() call below is not sufficient. It only affects the returned future, and - // only takes effect when the caller polls it. Any sync code executed when we call - // self.inner.call() below (such as interceptors) runs outside of the returned future, and - // is not affected by it. We therefore have to enter the span on the current thread too. + // Enter the span for the current thread and instrument the future. It is not sufficient to + // only instrument the future, since it only takes effect after the future is returned and + // polled, not when the inner service is called below (e.g. during interceptor execution). let span = info_span!( "grpc:pageservice", - // Set by TenantMetadataInterceptor. + // These will be populated by TenantMetadataInterceptor. tenant_id = field::Empty, timeline_id = field::Empty, shard_id = field::Empty, + // NB: empty fields must be listed first above. Otherwise, the field names will be + // clobbered when the empty fields are populated. They will be output last regardless. + %peer, + %method, ); let _guard = span.enter(); - Box::pin(self.inner.call(req).instrument(span.clone())) + // Construct a future for calling the inner service, but don't await it. This avoids having + // to clone the inner service into the future below. + let call = self.inner.call(req); + + async move { + // Await the inner service call. + let result = call.await; + + // Log gRPC error statuses. This won't include request info from handler spans, but it + // will catch all errors (even those emitted before handler spans are constructed). Only + // unary request errors are logged here, not streaming response errors. + if let Ok(ref resp) = result + && let Some(status) = tonic::Status::from_header_map(resp.headers()) + && status.code() != tonic::Code::Ok + { + // TODO: it would be nice if we could propagate the handler span's request fields + // here. This could e.g. be done by attaching the request fields to + // tonic::Status::metadata via a proc macro. + warn!( + "request failed with {:?}: {}", + status.code(), + status.message() + ); + } + + result + } + .instrument(span.clone()) + .boxed() } fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> {