From 1b687409be393d53ebeeb7bf8d15752e222ba90f Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Tue, 11 Jul 2023 15:16:49 +0100 Subject: [PATCH] instrument with cancellation log --- Cargo.lock | 1 + libs/tracing-utils/Cargo.toml | 1 + libs/tracing-utils/src/instrument.rs | 74 ++++++++++++++++++++++++++++ libs/tracing-utils/src/lib.rs | 1 + proxy/src/console/mgmt.rs | 15 ++---- 5 files changed, 80 insertions(+), 12 deletions(-) create mode 100644 libs/tracing-utils/src/instrument.rs diff --git a/Cargo.lock b/Cargo.lock index b163d4fe46..c3d84af9f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4666,6 +4666,7 @@ dependencies = [ "opentelemetry", "opentelemetry-otlp", "opentelemetry-semantic-conventions", + "pin-project-lite", "reqwest", "tokio", "tracing", diff --git a/libs/tracing-utils/Cargo.toml b/libs/tracing-utils/Cargo.toml index b285c9b5b0..f06c5dd9aa 100644 --- a/libs/tracing-utils/Cargo.toml +++ b/libs/tracing-utils/Cargo.toml @@ -16,3 +16,4 @@ tracing-opentelemetry.workspace = true tracing-subscriber.workspace = true workspace_hack.workspace = true +pin-project-lite = "0.2" diff --git a/libs/tracing-utils/src/instrument.rs b/libs/tracing-utils/src/instrument.rs new file mode 100644 index 0000000000..545c2e691b --- /dev/null +++ b/libs/tracing-utils/src/instrument.rs @@ -0,0 +1,74 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use pin_project_lite::pin_project; +use tracing::{Level, Span}; + +pub trait InstrumentCancel: Sized { + fn instrument_with_cancel(self, span: Span) -> InstrumentedCancel { + InstrumentedCancel { + inner: self, + span, + cancel_level: Some(Level::INFO), + } + } +} +impl InstrumentCancel for T {} + +impl InstrumentedCancel { + pub fn with_level(mut self, level: Level) -> Self { + self.cancel_level = Some(level); + self + } +} + +pin_project! { + /// A [`Future`] that has been instrumented with a `tracing` [`Span`]. It will emit a log on cancellation + /// + /// This type is returned by the [`Instrument`] extension trait. See that + /// trait's documentation for details. + /// + /// [`Future`]: std::future::Future + /// [`Span`]: crate::Span + #[derive(Debug, Clone)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct InstrumentedCancel { + #[pin] + inner: T, + span: Span, + cancel_level: Option, + } + + impl PinnedDrop for InstrumentedCancel { + fn drop(this: Pin<&mut Self>) { + let this = this.project(); + if let Some(level) = this.cancel_level.take() { + let _enter = this.span.enter(); + match level { + Level::TRACE => tracing::event!(Level::TRACE, "task was cancelled"), + Level::DEBUG => tracing::event!(Level::DEBUG, "task was cancelled"), + Level::INFO => tracing::event!(Level::INFO, "task was cancelled"), + Level::WARN => tracing::event!(Level::WARN, "task was cancelled"), + Level::ERROR => tracing::event!(Level::ERROR, "task was cancelled"), + } + } + } + } +} + +impl Future for InstrumentedCancel { + type Output = T::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let _enter = this.span.enter(); + let res = this.inner.poll(cx); + if res.is_ready() { + *this.cancel_level = None; + } + res + } +} diff --git a/libs/tracing-utils/src/lib.rs b/libs/tracing-utils/src/lib.rs index de0e2ad799..e38cdd6bb2 100644 --- a/libs/tracing-utils/src/lib.rs +++ b/libs/tracing-utils/src/lib.rs @@ -41,6 +41,7 @@ use opentelemetry_otlp::{OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_TRACES_ pub use tracing_opentelemetry::OpenTelemetryLayer; pub mod http; +pub mod instrument; /// Set up OpenTelemetry exporter, using configuration from environment variables. /// diff --git a/proxy/src/console/mgmt.rs b/proxy/src/console/mgmt.rs index 35d1ff59b7..0284b215eb 100644 --- a/proxy/src/console/mgmt.rs +++ b/proxy/src/console/mgmt.rs @@ -8,7 +8,8 @@ use postgres_backend::{self, AuthType, PostgresBackend, PostgresBackendTCP, Quer use pq_proto::{BeMessage, SINGLE_COL_ROWDESC}; use std::future; use tokio::net::{TcpListener, TcpStream}; -use tracing::{error, info, info_span, Instrument}; +use tracing::{error, info, info_span}; +use tracing_utils::instrument::InstrumentCancel; static CPLANE_WAITERS: Lazy> = Lazy::new(Default::default); @@ -50,23 +51,13 @@ pub async fn task_main(listener: TcpListener) -> anyhow::Result<()> { async move { info!("serving a new console management API connection"); - // these might be long running connections, have a separate logging for cancelling - // on shutdown and other ways of stopping. - let cancelled = scopeguard::guard(tracing::Span::current(), |span| { - let _e = span.entered(); - info!("console management API task cancelled"); - }); - if let Err(e) = handle_connection(socket).await { error!("serving failed with an error: {e}"); } else { info!("serving completed"); } - - // we can no longer get dropped - scopeguard::ScopeGuard::into_inner(cancelled); } - .instrument(span), + .instrument_with_cancel(span), ); } }