use instrument internally

This commit is contained in:
Conrad Ludgate
2023-07-11 15:27:03 +01:00
parent 1b687409be
commit 0cb37311c2
2 changed files with 20 additions and 21 deletions

View File

@@ -5,23 +5,23 @@ use std::{
};
use pin_project_lite::pin_project;
use tracing::{Level, Span};
use tracing::{instrument::Instrumented, Level};
pub trait InstrumentCancel: Sized {
fn instrument_with_cancel(self, span: Span) -> InstrumentedCancel<Self> {
InstrumentedCancel {
inner: self,
span,
cancel_level: Some(Level::INFO),
}
type Inner;
fn with_cancel_info(self) -> CancelLog<Self::Inner> {
self.with_cancel_log(Level::INFO)
}
fn with_cancel_log(self, level: Level) -> CancelLog<Self::Inner>;
}
impl<T: Sized> InstrumentCancel for T {}
impl<T> InstrumentedCancel<T> {
pub fn with_level(mut self, level: Level) -> Self {
self.cancel_level = Some(level);
self
impl<T: Sized> InstrumentCancel for Instrumented<T> {
type Inner = T;
fn with_cancel_log(self, level: Level) -> CancelLog<T> {
CancelLog {
inner: self,
cancel_level: Some(level),
}
}
}
@@ -35,18 +35,17 @@ pin_project! {
/// [`Span`]: crate::Span
#[derive(Debug, Clone)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct InstrumentedCancel<T> {
pub struct CancelLog<T> {
#[pin]
inner: T,
span: Span,
inner: Instrumented<T>,
cancel_level: Option<Level>,
}
impl<T> PinnedDrop for InstrumentedCancel<T> {
impl<T> PinnedDrop for CancelLog<T> {
fn drop(this: Pin<&mut Self>) {
let this = this.project();
if let Some(level) = this.cancel_level.take() {
let _enter = this.span.enter();
let _enter = this.inner.span().enter();
match level {
Level::TRACE => tracing::event!(Level::TRACE, "task was cancelled"),
Level::DEBUG => tracing::event!(Level::DEBUG, "task was cancelled"),
@@ -59,12 +58,11 @@ pin_project! {
}
}
impl<T: Future> Future for InstrumentedCancel<T> {
impl<T: Future> Future for CancelLog<T> {
type Output = T::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let _enter = this.span.enter();
let res = this.inner.poll(cx);
if res.is_ready() {
*this.cancel_level = None;

View File

@@ -8,7 +8,7 @@ use postgres_backend::{self, AuthType, PostgresBackend, PostgresBackendTCP, Quer
use pq_proto::{BeMessage, SINGLE_COL_ROWDESC};
use std::future;
use tokio::net::{TcpListener, TcpStream};
use tracing::{error, info, info_span};
use tracing::{error, info, info_span, Instrument};
use tracing_utils::instrument::InstrumentCancel;
static CPLANE_WAITERS: Lazy<Waiters<ComputeReady>> = Lazy::new(Default::default);
@@ -57,7 +57,8 @@ pub async fn task_main(listener: TcpListener) -> anyhow::Result<()> {
info!("serving completed");
}
}
.instrument_with_cancel(span),
.instrument(span)
.with_cancel_info(),
);
}
}