mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
instrument with cancellation log
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -4666,6 +4666,7 @@ dependencies = [
|
||||
"opentelemetry",
|
||||
"opentelemetry-otlp",
|
||||
"opentelemetry-semantic-conventions",
|
||||
"pin-project-lite",
|
||||
"reqwest",
|
||||
"tokio",
|
||||
"tracing",
|
||||
|
||||
@@ -16,3 +16,4 @@ tracing-opentelemetry.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
pin-project-lite = "0.2"
|
||||
|
||||
74
libs/tracing-utils/src/instrument.rs
Normal file
74
libs/tracing-utils/src/instrument.rs
Normal file
@@ -0,0 +1,74 @@
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use pin_project_lite::pin_project;
|
||||
use tracing::{Level, Span};
|
||||
|
||||
pub trait InstrumentCancel: Sized {
|
||||
fn instrument_with_cancel(self, span: Span) -> InstrumentedCancel<Self> {
|
||||
InstrumentedCancel {
|
||||
inner: self,
|
||||
span,
|
||||
cancel_level: Some(Level::INFO),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl<T: Sized> InstrumentCancel for T {}
|
||||
|
||||
impl<T> InstrumentedCancel<T> {
|
||||
pub fn with_level(mut self, level: Level) -> Self {
|
||||
self.cancel_level = Some(level);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
/// A [`Future`] that has been instrumented with a `tracing` [`Span`]. It will emit a log on cancellation
|
||||
///
|
||||
/// This type is returned by the [`Instrument`] extension trait. See that
|
||||
/// trait's documentation for details.
|
||||
///
|
||||
/// [`Future`]: std::future::Future
|
||||
/// [`Span`]: crate::Span
|
||||
#[derive(Debug, Clone)]
|
||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||
pub struct InstrumentedCancel<T> {
|
||||
#[pin]
|
||||
inner: T,
|
||||
span: Span,
|
||||
cancel_level: Option<Level>,
|
||||
}
|
||||
|
||||
impl<T> PinnedDrop for InstrumentedCancel<T> {
|
||||
fn drop(this: Pin<&mut Self>) {
|
||||
let this = this.project();
|
||||
if let Some(level) = this.cancel_level.take() {
|
||||
let _enter = this.span.enter();
|
||||
match level {
|
||||
Level::TRACE => tracing::event!(Level::TRACE, "task was cancelled"),
|
||||
Level::DEBUG => tracing::event!(Level::DEBUG, "task was cancelled"),
|
||||
Level::INFO => tracing::event!(Level::INFO, "task was cancelled"),
|
||||
Level::WARN => tracing::event!(Level::WARN, "task was cancelled"),
|
||||
Level::ERROR => tracing::event!(Level::ERROR, "task was cancelled"),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Future> Future for InstrumentedCancel<T> {
|
||||
type Output = T::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
let _enter = this.span.enter();
|
||||
let res = this.inner.poll(cx);
|
||||
if res.is_ready() {
|
||||
*this.cancel_level = None;
|
||||
}
|
||||
res
|
||||
}
|
||||
}
|
||||
@@ -41,6 +41,7 @@ use opentelemetry_otlp::{OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_TRACES_
|
||||
pub use tracing_opentelemetry::OpenTelemetryLayer;
|
||||
|
||||
pub mod http;
|
||||
pub mod instrument;
|
||||
|
||||
/// Set up OpenTelemetry exporter, using configuration from environment variables.
|
||||
///
|
||||
|
||||
@@ -8,7 +8,8 @@ use postgres_backend::{self, AuthType, PostgresBackend, PostgresBackendTCP, Quer
|
||||
use pq_proto::{BeMessage, SINGLE_COL_ROWDESC};
|
||||
use std::future;
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tracing::{error, info, info_span, Instrument};
|
||||
use tracing::{error, info, info_span};
|
||||
use tracing_utils::instrument::InstrumentCancel;
|
||||
|
||||
static CPLANE_WAITERS: Lazy<Waiters<ComputeReady>> = Lazy::new(Default::default);
|
||||
|
||||
@@ -50,23 +51,13 @@ pub async fn task_main(listener: TcpListener) -> anyhow::Result<()> {
|
||||
async move {
|
||||
info!("serving a new console management API connection");
|
||||
|
||||
// these might be long running connections, have a separate logging for cancelling
|
||||
// on shutdown and other ways of stopping.
|
||||
let cancelled = scopeguard::guard(tracing::Span::current(), |span| {
|
||||
let _e = span.entered();
|
||||
info!("console management API task cancelled");
|
||||
});
|
||||
|
||||
if let Err(e) = handle_connection(socket).await {
|
||||
error!("serving failed with an error: {e}");
|
||||
} else {
|
||||
info!("serving completed");
|
||||
}
|
||||
|
||||
// we can no longer get dropped
|
||||
scopeguard::ScopeGuard::into_inner(cancelled);
|
||||
}
|
||||
.instrument(span),
|
||||
.instrument_with_cancel(span),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user