diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index 4b72a66e63..74415f1ffe 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -1,7 +1,8 @@ -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use dashmap::DashMap; +use ipnet::{IpNet, Ipv4Net, Ipv6Net}; use pq_proto::CancelKeyData; use thiserror::Error; use tokio::net::TcpStream; @@ -17,9 +18,6 @@ use crate::rate_limiter::LeakyBucketRateLimiter; use crate::redis::cancellation_publisher::{ CancellationPublisher, CancellationPublisherMut, RedisPublisherClient, }; -use std::net::IpAddr; - -use ipnet::{IpNet, Ipv4Net, Ipv6Net}; pub type CancelMap = Arc>>; pub type CancellationHandlerMain = CancellationHandler>>>; diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs index fbd0c8e5c5..b910b524b1 100644 --- a/proxy/src/console_redirect_proxy.rs +++ b/proxy/src/console_redirect_proxy.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use futures::TryFutureExt; +use futures::{FutureExt, TryFutureExt}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, Instrument}; @@ -88,40 +88,37 @@ pub async fn task_main( crate::metrics::Protocol::Tcp, &config.region, ); - let span = ctx.span(); - let startup = Box::pin( - handle_client( - config, - backend, - &ctx, - cancellation_handler, - socket, - conn_gauge, - ) - .instrument(span.clone()), - ); - let res = startup.await; + let res = handle_client( + config, + backend, + &ctx, + cancellation_handler, + socket, + conn_gauge, + ) + .instrument(ctx.span()) + .boxed() + .await; match res { Err(e) => { - // todo: log and push to ctx the error kind ctx.set_error_kind(e.get_error_kind()); - error!(parent: &span, "per-client task finished with an error: {e:#}"); + error!(parent: &ctx.span(), "per-client task finished with an error: {e:#}"); } Ok(None) => { ctx.set_success(); } Ok(Some(p)) => { ctx.set_success(); - ctx.log_connect(); - match p.proxy_pass().instrument(span.clone()).await { + let _disconnect = ctx.log_connect(); + match p.proxy_pass().await { Ok(()) => {} Err(ErrorSource::Client(e)) => { - error!(parent: &span, "per-client task finished with an IO error from the client: {e:#}"); + error!(?session_id, "per-client task finished with an IO error from the client: {e:#}"); } Err(ErrorSource::Compute(e)) => { - error!(parent: &span, "per-client task finished with an IO error from the compute: {e:#}"); + error!(?session_id, "per-client task finished with an IO error from the compute: {e:#}"); } } } @@ -219,6 +216,7 @@ pub(crate) async fn handle_client( client: stream, aux: node.aux.clone(), compute: node, + session_id: ctx.session_id(), _req: request_gauge, _conn: conn_gauge, _cancel: session, diff --git a/proxy/src/context/mod.rs b/proxy/src/context/mod.rs index 6d2d2d51ce..4ec04deb25 100644 --- a/proxy/src/context/mod.rs +++ b/proxy/src/context/mod.rs @@ -272,11 +272,14 @@ impl RequestContext { this.success = true; } - pub fn log_connect(&self) { - self.0 - .try_lock() - .expect("should not deadlock") - .log_connect(); + pub fn log_connect(self) -> DisconnectLogger { + let mut this = self.0.into_inner(); + this.log_connect(); + + // close current span. + this.span = Span::none(); + + DisconnectLogger(this) } pub(crate) fn protocol(&self) -> Protocol { @@ -434,8 +437,14 @@ impl Drop for RequestContextInner { fn drop(&mut self) { if self.sender.is_some() { self.log_connect(); - } else { - self.log_disconnect(); } } } + +pub struct DisconnectLogger(RequestContextInner); + +impl Drop for DisconnectLogger { + fn drop(&mut self) { + self.0.log_disconnect(); + } +} diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index 5d9468d89a..7fe67e43de 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -10,7 +10,7 @@ pub(crate) mod wake_compute; use std::sync::Arc; pub use copy_bidirectional::{copy_bidirectional_client_compute, ErrorSource}; -use futures::TryFutureExt; +use futures::{FutureExt, TryFutureExt}; use itertools::Itertools; use once_cell::sync::OnceCell; use pq_proto::{BeMessage as Be, StartupMessageParams}; @@ -123,42 +123,39 @@ pub async fn task_main( crate::metrics::Protocol::Tcp, &config.region, ); - let span = ctx.span(); - let startup = Box::pin( - handle_client( - config, - auth_backend, - &ctx, - cancellation_handler, - socket, - ClientMode::Tcp, - endpoint_rate_limiter2, - conn_gauge, - ) - .instrument(span.clone()), - ); - let res = startup.await; + let res = handle_client( + config, + auth_backend, + &ctx, + cancellation_handler, + socket, + ClientMode::Tcp, + endpoint_rate_limiter2, + conn_gauge, + ) + .instrument(ctx.span()) + .boxed() + .await; match res { Err(e) => { - // todo: log and push to ctx the error kind ctx.set_error_kind(e.get_error_kind()); - warn!(parent: &span, "per-client task finished with an error: {e:#}"); + warn!(parent: &ctx.span(), "per-client task finished with an error: {e:#}"); } Ok(None) => { ctx.set_success(); } Ok(Some(p)) => { ctx.set_success(); - ctx.log_connect(); - match p.proxy_pass().instrument(span.clone()).await { + let _disconnect = ctx.log_connect(); + match p.proxy_pass().await { Ok(()) => {} Err(ErrorSource::Client(e)) => { - warn!(parent: &span, "per-client task finished with an IO error from the client: {e:#}"); + warn!(?session_id, "per-client task finished with an IO error from the client: {e:#}"); } Err(ErrorSource::Compute(e)) => { - error!(parent: &span, "per-client task finished with an IO error from the compute: {e:#}"); + error!(?session_id, "per-client task finished with an IO error from the compute: {e:#}"); } } } @@ -352,6 +349,7 @@ pub(crate) async fn handle_client( client: stream, aux: node.aux.clone(), compute: node, + session_id: ctx.session_id(), _req: request_gauge, _conn: conn_gauge, _cancel: session, diff --git a/proxy/src/proxy/passthrough.rs b/proxy/src/proxy/passthrough.rs index 5e07c8eeae..dcaa81e5cd 100644 --- a/proxy/src/proxy/passthrough.rs +++ b/proxy/src/proxy/passthrough.rs @@ -59,6 +59,7 @@ pub(crate) struct ProxyPassthrough { pub(crate) client: Stream, pub(crate) compute: PostgresConnection, pub(crate) aux: MetricsAuxInfo, + pub(crate) session_id: uuid::Uuid, pub(crate) _req: NumConnectionRequestsGuard<'static>, pub(crate) _conn: NumClientConnectionsGuard<'static>, @@ -69,7 +70,7 @@ impl ProxyPassthrough { pub(crate) async fn proxy_pass(self) -> Result<(), ErrorSource> { let res = proxy_pass(self.client, self.compute.stream, self.aux).await; if let Err(err) = self.compute.cancel_closure.try_cancel_query().await { - tracing::warn!(?err, "could not cancel the query in the database"); + tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database"); } res } diff --git a/proxy/src/redis/cancellation_publisher.rs b/proxy/src/redis/cancellation_publisher.rs index 633a2f1b81..228dbb7f64 100644 --- a/proxy/src/redis/cancellation_publisher.rs +++ b/proxy/src/redis/cancellation_publisher.rs @@ -1,6 +1,6 @@ +use core::net::IpAddr; use std::sync::Arc; -use core::net::IpAddr; use pq_proto::CancelKeyData; use redis::AsyncCommands; use tokio::sync::Mutex;