diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index ffc0cf43f1..60d453af14 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -350,7 +350,7 @@ impl CancellationHandler { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CancelClosure { socket_addr: SocketAddr, - cancel_token: RawCancelToken, + pub cancel_token: RawCancelToken, hostname: String, // for pg_sni router user_info: ComputeUserInfo, } diff --git a/proxy/src/compute/mod.rs b/proxy/src/compute/mod.rs index 0516cfa613..e84b839cc8 100644 --- a/proxy/src/compute/mod.rs +++ b/proxy/src/compute/mod.rs @@ -264,6 +264,19 @@ impl AuthInfo { .await?; drop(pause); + // TODO: lots of useful info but maybe we can move it elsewhere (eg traces?) + info!( + compute_id = %compute.aux.compute_id, + pid = connection.process_id, + cold_start_info = ctx.cold_start_info().as_str(), + query_id = ctx.get_testodrome_id().as_deref(), + sslmode = ?compute.ssl_mode, + "connected to compute node at {} ({}) latency={}", + compute.hostname, + compute.socket_addr, + ctx.get_proxy_latency(), + ); + let RawConnection { stream: _, parameters, @@ -272,8 +285,6 @@ impl AuthInfo { secret_key, } = connection; - tracing::Span::current().record("pid", tracing::field::display(process_id)); - // NB: CancelToken is supposed to hold socket_addr, but we use connect_raw. // Yet another reason to rework the connection establishing code. let cancel_closure = CancelClosure::new( @@ -392,18 +403,6 @@ impl ConnectInfo { let (socket_addr, stream) = self.connect_raw(config, direct).await?; drop(pause); - tracing::Span::current().record("compute_id", tracing::field::display(&aux.compute_id)); - - // TODO: lots of useful info but maybe we can move it elsewhere (eg traces?) - info!( - cold_start_info = ctx.cold_start_info().as_str(), - "connected to compute node at {} ({socket_addr}) sslmode={:?}, latency={}, query_id={}", - self.host, - self.ssl_mode, - ctx.get_proxy_latency(), - ctx.get_testodrome_id().unwrap_or_default(), - ); - let connection = ComputeConnection { stream, socket_addr, diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs index 0b944bb7f0..d8a0af4a1f 100644 --- a/proxy/src/proxy/connect_compute.rs +++ b/proxy/src/proxy/connect_compute.rs @@ -1,4 +1,3 @@ -use async_trait::async_trait; use tokio::time; use tracing::{debug, info, warn}; @@ -33,7 +32,6 @@ pub(crate) fn invalidate_cache(node_info: control_plane::CachedNodeInfo) -> Node node_info.invalidate() } -#[async_trait] pub(crate) trait ConnectMechanism { type Connection; async fn connect_once( @@ -51,14 +49,9 @@ pub(crate) struct TcpMechanism { pub(crate) direct: bool, } -#[async_trait] impl ConnectMechanism for TcpMechanism { type Connection = ComputeConnection; - #[tracing::instrument(skip_all, fields( - pid = tracing::field::Empty, - compute_id = tracing::field::Empty - ))] async fn connect_once( &self, ctx: &RequestContext, diff --git a/proxy/src/proxy/tests/mod.rs b/proxy/src/proxy/tests/mod.rs index c3c8e4d431..f67a995eb1 100644 --- a/proxy/src/proxy/tests/mod.rs +++ b/proxy/src/proxy/tests/mod.rs @@ -424,7 +424,6 @@ impl TestConnectMechanism { #[derive(Debug)] struct TestConnection; -#[async_trait] impl ConnectMechanism for TestConnectMechanism { type Connection = TestConnection; diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index ff2c20c9ea..f62658475f 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -150,11 +150,6 @@ impl PoolingBackend { // Wake up the destination if needed. Code here is a bit involved because // we reuse the code from the usual proxy and we need to prepare few structures // that this code expects. - #[tracing::instrument(skip_all, fields( - pid = tracing::field::Empty, - compute_id = tracing::field::Empty, - conn_id = tracing::field::Empty, - ))] pub(crate) async fn connect_to_compute( &self, ctx: &RequestContext, @@ -174,7 +169,6 @@ impl PoolingBackend { return Ok(client); } let conn_id = uuid::Uuid::new_v4(); - tracing::Span::current().record("conn_id", display(conn_id)); info!(%conn_id, "pool: opening a new connection '{conn_info}'"); let backend = self.auth_backend.as_ref().map(|()| keys.info); let connection = crate::proxy::connect_compute::connect_to_compute( @@ -193,10 +187,6 @@ impl PoolingBackend { } // Wake up the destination if needed - #[tracing::instrument(skip_all, fields( - compute_id = tracing::field::Empty, - conn_id = tracing::field::Empty, - ))] pub(crate) async fn connect_to_local_proxy( &self, ctx: &RequestContext, @@ -208,7 +198,6 @@ impl PoolingBackend { } let conn_id = uuid::Uuid::new_v4(); - tracing::Span::current().record("conn_id", display(conn_id)); debug!(%conn_id, "pool: opening a new connection '{conn_info}'"); let backend = self.auth_backend.as_ref().map(|()| ComputeUserInfo { user: conn_info.user_info.user.clone(), @@ -240,10 +229,6 @@ impl PoolingBackend { /// # Panics /// /// Panics if called with a non-local_proxy backend. - #[tracing::instrument(skip_all, fields( - pid = tracing::field::Empty, - conn_id = tracing::field::Empty, - ))] pub(crate) async fn connect_to_local_postgres( &self, ctx: &RequestContext, @@ -464,6 +449,20 @@ async fn authenticate( let connection = config.authenticate(compute.stream).await?; drop(pause); + // TODO: lots of useful info but maybe we can move it elsewhere (eg traces?) + info!( + compute_id = %compute.aux.compute_id, + pid = connection.process_id, + cold_start_info = ctx.cold_start_info().as_str(), + query_id = ctx.get_testodrome_id().as_deref(), + sslmode = ?compute.ssl_mode, + %conn_id, + "connected to compute node at {} ({}) latency={}", + compute.hostname, + compute.socket_addr, + ctx.get_proxy_latency(), + ); + let (client, connection) = connection.into_managed_conn( SocketConfig { host_addr: Some(compute.socket_addr.ip()), @@ -474,8 +473,6 @@ async fn authenticate( compute.ssl_mode, ); - tracing::Span::current().record("pid", tracing::field::display(client.get_process_id())); - Ok(poll_client( pool.clone(), ctx, @@ -510,15 +507,19 @@ async fn h2handshake( .map_err(LocalProxyConnError::H2)?; drop(pause); - tracing::Span::current().record( - "compute_id", - tracing::field::display(&compute.aux.compute_id), + // TODO: lots of useful info but maybe we can move it elsewhere (eg traces?) + info!( + compute_id = %compute.aux.compute_id, + cold_start_info = ctx.cold_start_info().as_str(), + query_id = ctx.get_testodrome_id().as_deref(), + sslmode = ?compute.ssl_mode, + %conn_id, + "connected to compute node at {} ({}) latency={}", + compute.hostname, + compute.socket_addr, + ctx.get_proxy_latency(), ); - if let Some(query_id) = ctx.get_testodrome_id() { - info!("latency={}, query_id={}", ctx.get_proxy_latency(), query_id); - } - Ok(poll_http2_client( pool.clone(), ctx, diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index dd8cf052c5..0e1b669073 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -69,7 +69,7 @@ pub(crate) fn poll_client( let mut session_id = ctx.session_id(); let (tx, mut rx) = tokio::sync::watch::channel(session_id); - let span = info_span!(parent: None, "connection", %conn_id); + let span = info_span!(parent: None, "connection", %conn_id, pid=client.get_process_id(), compute_id=%aux.compute_id); let cold_start_info = ctx.cold_start_info(); span.in_scope(|| { info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection"); diff --git a/proxy/src/serverless/conn_pool_lib.rs b/proxy/src/serverless/conn_pool_lib.rs index 42a3ea17a2..b458b7e309 100644 --- a/proxy/src/serverless/conn_pool_lib.rs +++ b/proxy/src/serverless/conn_pool_lib.rs @@ -518,15 +518,14 @@ impl GlobalConnPool> { info!("pool: cached connection '{conn_info}' is closed, opening a new one"); return Ok(None); } - tracing::Span::current() - .record("conn_id", tracing::field::display(client.get_conn_id())); - tracing::Span::current().record( - "pid", - tracing::field::display(client.inner.get_process_id()), - ); - debug!( + info!( + conn_id = %client.get_conn_id(), + pid = client.inner.get_process_id(), + compute_id = &*client.aux.compute_id, cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), - "pool: reusing connection '{conn_info}'" + query_id = ctx.get_testodrome_id().as_deref(), + "reusing connection: latency={}", + ctx.get_proxy_latency(), ); match client.get_data() { diff --git a/proxy/src/serverless/http_conn_pool.rs b/proxy/src/serverless/http_conn_pool.rs index 1c6574e57e..a38418f279 100644 --- a/proxy/src/serverless/http_conn_pool.rs +++ b/proxy/src/serverless/http_conn_pool.rs @@ -6,7 +6,7 @@ use hyper::client::conn::http2; use hyper_util::rt::{TokioExecutor, TokioIo}; use parking_lot::RwLock; use smol_str::ToSmolStr; -use tracing::{Instrument, debug, error, info, info_span}; +use tracing::{Instrument, error, info, info_span}; use super::AsyncRW; use super::backend::HttpConnError; @@ -115,7 +115,6 @@ impl Drop for HttpConnPool { } impl GlobalConnPool> { - #[expect(unused_results)] pub(crate) fn get( self: &Arc, ctx: &RequestContext, @@ -132,10 +131,13 @@ impl GlobalConnPool> { return result; }; - tracing::Span::current().record("conn_id", tracing::field::display(client.conn.conn_id)); - debug!( + info!( + conn_id = %client.conn.conn_id, + compute_id = &*client.conn.aux.compute_id, cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), - "pool: reusing connection '{conn_info}'" + query_id = ctx.get_testodrome_id().as_deref(), + "reusing connection: latency={}", + ctx.get_proxy_latency(), ); ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit); ctx.success(); @@ -197,7 +199,7 @@ pub(crate) fn poll_http2_client( let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol()); let session_id = ctx.session_id(); - let span = info_span!(parent: None, "connection", %conn_id); + let span = info_span!(parent: None, "connection", %conn_id, compute_id=%aux.compute_id); let cold_start_info = ctx.cold_start_info(); span.in_scope(|| { info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection"); @@ -229,6 +231,8 @@ pub(crate) fn poll_http2_client( tokio::spawn( async move { + info!("new local proxy connection"); + let _conn_gauge = conn_gauge; let res = connection.await; match res { diff --git a/proxy/src/serverless/local_conn_pool.rs b/proxy/src/serverless/local_conn_pool.rs index c367615fb8..cf98c5f515 100644 --- a/proxy/src/serverless/local_conn_pool.rs +++ b/proxy/src/serverless/local_conn_pool.rs @@ -30,7 +30,7 @@ use serde_json::value::RawValue; use tokio::net::TcpStream; use tokio::time::Instant; use tokio_util::sync::CancellationToken; -use tracing::{Instrument, debug, error, info, info_span, warn}; +use tracing::{Instrument, error, info, info_span, warn}; use super::backend::HttpConnError; use super::conn_pool_lib::{ @@ -107,15 +107,13 @@ impl LocalConnPool { return Ok(None); } - tracing::Span::current() - .record("conn_id", tracing::field::display(client.get_conn_id())); - tracing::Span::current().record( - "pid", - tracing::field::display(client.inner.get_process_id()), - ); - debug!( + info!( + pid = client.inner.get_process_id(), + conn_id = %client.get_conn_id(), cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), - "local_pool: reusing connection '{conn_info}'" + query_id = ctx.get_testodrome_id().as_deref(), + "reusing connection: latency={}", + ctx.get_proxy_latency(), ); match client.get_data() {