standardise logging

This commit is contained in:
Conrad Ludgate
2025-06-24 18:14:08 +01:00
parent 010cd34635
commit c0e1e1dd74
9 changed files with 64 additions and 71 deletions

View File

@@ -350,7 +350,7 @@ impl CancellationHandler {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CancelClosure {
socket_addr: SocketAddr,
cancel_token: RawCancelToken,
pub cancel_token: RawCancelToken,
hostname: String, // for pg_sni router
user_info: ComputeUserInfo,
}

View File

@@ -264,6 +264,19 @@ impl AuthInfo {
.await?;
drop(pause);
// TODO: lots of useful info but maybe we can move it elsewhere (eg traces?)
info!(
compute_id = %compute.aux.compute_id,
pid = connection.process_id,
cold_start_info = ctx.cold_start_info().as_str(),
query_id = ctx.get_testodrome_id().as_deref(),
sslmode = ?compute.ssl_mode,
"connected to compute node at {} ({}) latency={}",
compute.hostname,
compute.socket_addr,
ctx.get_proxy_latency(),
);
let RawConnection {
stream: _,
parameters,
@@ -272,8 +285,6 @@ impl AuthInfo {
secret_key,
} = connection;
tracing::Span::current().record("pid", tracing::field::display(process_id));
// NB: CancelToken is supposed to hold socket_addr, but we use connect_raw.
// Yet another reason to rework the connection establishing code.
let cancel_closure = CancelClosure::new(
@@ -392,18 +403,6 @@ impl ConnectInfo {
let (socket_addr, stream) = self.connect_raw(config, direct).await?;
drop(pause);
tracing::Span::current().record("compute_id", tracing::field::display(&aux.compute_id));
// TODO: lots of useful info but maybe we can move it elsewhere (eg traces?)
info!(
cold_start_info = ctx.cold_start_info().as_str(),
"connected to compute node at {} ({socket_addr}) sslmode={:?}, latency={}, query_id={}",
self.host,
self.ssl_mode,
ctx.get_proxy_latency(),
ctx.get_testodrome_id().unwrap_or_default(),
);
let connection = ComputeConnection {
stream,
socket_addr,

View File

@@ -1,4 +1,3 @@
use async_trait::async_trait;
use tokio::time;
use tracing::{debug, info, warn};
@@ -33,7 +32,6 @@ pub(crate) fn invalidate_cache(node_info: control_plane::CachedNodeInfo) -> Node
node_info.invalidate()
}
#[async_trait]
pub(crate) trait ConnectMechanism {
type Connection;
async fn connect_once(
@@ -51,14 +49,9 @@ pub(crate) struct TcpMechanism {
pub(crate) direct: bool,
}
#[async_trait]
impl ConnectMechanism for TcpMechanism {
type Connection = ComputeConnection;
#[tracing::instrument(skip_all, fields(
pid = tracing::field::Empty,
compute_id = tracing::field::Empty
))]
async fn connect_once(
&self,
ctx: &RequestContext,

View File

@@ -424,7 +424,6 @@ impl TestConnectMechanism {
#[derive(Debug)]
struct TestConnection;
#[async_trait]
impl ConnectMechanism for TestConnectMechanism {
type Connection = TestConnection;

View File

@@ -150,11 +150,6 @@ impl PoolingBackend {
// Wake up the destination if needed. Code here is a bit involved because
// we reuse the code from the usual proxy and we need to prepare few structures
// that this code expects.
#[tracing::instrument(skip_all, fields(
pid = tracing::field::Empty,
compute_id = tracing::field::Empty,
conn_id = tracing::field::Empty,
))]
pub(crate) async fn connect_to_compute(
&self,
ctx: &RequestContext,
@@ -174,7 +169,6 @@ impl PoolingBackend {
return Ok(client);
}
let conn_id = uuid::Uuid::new_v4();
tracing::Span::current().record("conn_id", display(conn_id));
info!(%conn_id, "pool: opening a new connection '{conn_info}'");
let backend = self.auth_backend.as_ref().map(|()| keys.info);
let connection = crate::proxy::connect_compute::connect_to_compute(
@@ -193,10 +187,6 @@ impl PoolingBackend {
}
// Wake up the destination if needed
#[tracing::instrument(skip_all, fields(
compute_id = tracing::field::Empty,
conn_id = tracing::field::Empty,
))]
pub(crate) async fn connect_to_local_proxy(
&self,
ctx: &RequestContext,
@@ -208,7 +198,6 @@ impl PoolingBackend {
}
let conn_id = uuid::Uuid::new_v4();
tracing::Span::current().record("conn_id", display(conn_id));
debug!(%conn_id, "pool: opening a new connection '{conn_info}'");
let backend = self.auth_backend.as_ref().map(|()| ComputeUserInfo {
user: conn_info.user_info.user.clone(),
@@ -240,10 +229,6 @@ impl PoolingBackend {
/// # Panics
///
/// Panics if called with a non-local_proxy backend.
#[tracing::instrument(skip_all, fields(
pid = tracing::field::Empty,
conn_id = tracing::field::Empty,
))]
pub(crate) async fn connect_to_local_postgres(
&self,
ctx: &RequestContext,
@@ -464,6 +449,20 @@ async fn authenticate(
let connection = config.authenticate(compute.stream).await?;
drop(pause);
// TODO: lots of useful info but maybe we can move it elsewhere (eg traces?)
info!(
compute_id = %compute.aux.compute_id,
pid = connection.process_id,
cold_start_info = ctx.cold_start_info().as_str(),
query_id = ctx.get_testodrome_id().as_deref(),
sslmode = ?compute.ssl_mode,
%conn_id,
"connected to compute node at {} ({}) latency={}",
compute.hostname,
compute.socket_addr,
ctx.get_proxy_latency(),
);
let (client, connection) = connection.into_managed_conn(
SocketConfig {
host_addr: Some(compute.socket_addr.ip()),
@@ -474,8 +473,6 @@ async fn authenticate(
compute.ssl_mode,
);
tracing::Span::current().record("pid", tracing::field::display(client.get_process_id()));
Ok(poll_client(
pool.clone(),
ctx,
@@ -510,15 +507,19 @@ async fn h2handshake(
.map_err(LocalProxyConnError::H2)?;
drop(pause);
tracing::Span::current().record(
"compute_id",
tracing::field::display(&compute.aux.compute_id),
// TODO: lots of useful info but maybe we can move it elsewhere (eg traces?)
info!(
compute_id = %compute.aux.compute_id,
cold_start_info = ctx.cold_start_info().as_str(),
query_id = ctx.get_testodrome_id().as_deref(),
sslmode = ?compute.ssl_mode,
%conn_id,
"connected to compute node at {} ({}) latency={}",
compute.hostname,
compute.socket_addr,
ctx.get_proxy_latency(),
);
if let Some(query_id) = ctx.get_testodrome_id() {
info!("latency={}, query_id={}", ctx.get_proxy_latency(), query_id);
}
Ok(poll_http2_client(
pool.clone(),
ctx,

View File

@@ -69,7 +69,7 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
let mut session_id = ctx.session_id();
let (tx, mut rx) = tokio::sync::watch::channel(session_id);
let span = info_span!(parent: None, "connection", %conn_id);
let span = info_span!(parent: None, "connection", %conn_id, pid=client.get_process_id(), compute_id=%aux.compute_id);
let cold_start_info = ctx.cold_start_info();
span.in_scope(|| {
info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");

View File

@@ -518,15 +518,14 @@ impl<C: ClientInnerExt> GlobalConnPool<C, EndpointConnPool<C>> {
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
return Ok(None);
}
tracing::Span::current()
.record("conn_id", tracing::field::display(client.get_conn_id()));
tracing::Span::current().record(
"pid",
tracing::field::display(client.inner.get_process_id()),
);
debug!(
info!(
conn_id = %client.get_conn_id(),
pid = client.inner.get_process_id(),
compute_id = &*client.aux.compute_id,
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"pool: reusing connection '{conn_info}'"
query_id = ctx.get_testodrome_id().as_deref(),
"reusing connection: latency={}",
ctx.get_proxy_latency(),
);
match client.get_data() {

View File

@@ -6,7 +6,7 @@ use hyper::client::conn::http2;
use hyper_util::rt::{TokioExecutor, TokioIo};
use parking_lot::RwLock;
use smol_str::ToSmolStr;
use tracing::{Instrument, debug, error, info, info_span};
use tracing::{Instrument, error, info, info_span};
use super::AsyncRW;
use super::backend::HttpConnError;
@@ -115,7 +115,6 @@ impl<C: ClientInnerExt + Clone> Drop for HttpConnPool<C> {
}
impl<C: ClientInnerExt + Clone> GlobalConnPool<C, HttpConnPool<C>> {
#[expect(unused_results)]
pub(crate) fn get(
self: &Arc<Self>,
ctx: &RequestContext,
@@ -132,10 +131,13 @@ impl<C: ClientInnerExt + Clone> GlobalConnPool<C, HttpConnPool<C>> {
return result;
};
tracing::Span::current().record("conn_id", tracing::field::display(client.conn.conn_id));
debug!(
info!(
conn_id = %client.conn.conn_id,
compute_id = &*client.conn.aux.compute_id,
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"pool: reusing connection '{conn_info}'"
query_id = ctx.get_testodrome_id().as_deref(),
"reusing connection: latency={}",
ctx.get_proxy_latency(),
);
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
ctx.success();
@@ -197,7 +199,7 @@ pub(crate) fn poll_http2_client(
let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
let session_id = ctx.session_id();
let span = info_span!(parent: None, "connection", %conn_id);
let span = info_span!(parent: None, "connection", %conn_id, compute_id=%aux.compute_id);
let cold_start_info = ctx.cold_start_info();
span.in_scope(|| {
info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
@@ -229,6 +231,8 @@ pub(crate) fn poll_http2_client(
tokio::spawn(
async move {
info!("new local proxy connection");
let _conn_gauge = conn_gauge;
let res = connection.await;
match res {

View File

@@ -30,7 +30,7 @@ use serde_json::value::RawValue;
use tokio::net::TcpStream;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, info_span, warn};
use tracing::{Instrument, error, info, info_span, warn};
use super::backend::HttpConnError;
use super::conn_pool_lib::{
@@ -107,15 +107,13 @@ impl<C: ClientInnerExt> LocalConnPool<C> {
return Ok(None);
}
tracing::Span::current()
.record("conn_id", tracing::field::display(client.get_conn_id()));
tracing::Span::current().record(
"pid",
tracing::field::display(client.inner.get_process_id()),
);
debug!(
info!(
pid = client.inner.get_process_id(),
conn_id = %client.get_conn_id(),
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"local_pool: reusing connection '{conn_info}'"
query_id = ctx.get_testodrome_id().as_deref(),
"reusing connection: latency={}",
ctx.get_proxy_latency(),
);
match client.get_data() {