mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 08:52:56 +00:00
Add a few tracing spans, for more fine-grained tracing.
This also splits the 'connect_to_db' function, so that it only establishes the connection, and a new 'handle_connection' function is the equivalent of what 'connect_to_db' used to do. This made it easier to attach a span to specifically to the first part where we establish the connection.
This commit is contained in:
@@ -16,7 +16,7 @@ use crate::{
|
||||
use once_cell::sync::Lazy;
|
||||
use std::borrow::Cow;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{info, warn};
|
||||
use tracing::{info, instrument, warn};
|
||||
|
||||
static CPLANE_WAITERS: Lazy<Waiters<mgmt::ComputeReady>> = Lazy::new(Default::default);
|
||||
|
||||
@@ -220,6 +220,7 @@ impl BackendType<'_, ClientCredentials<'_>> {
|
||||
}
|
||||
|
||||
/// Authenticate the client via the requested backend, possibly using credentials.
|
||||
#[instrument(skip_all)]
|
||||
pub async fn authenticate(
|
||||
mut self,
|
||||
extra: &ConsoleReqExtra<'_>,
|
||||
|
||||
@@ -14,7 +14,7 @@ use once_cell::sync::Lazy;
|
||||
use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{error, info, info_span, Instrument};
|
||||
use tracing::{error, info, instrument};
|
||||
|
||||
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
|
||||
const ERR_PROTO_VIOLATION: &str = "protocol violation";
|
||||
@@ -76,8 +76,7 @@ pub async fn task_main(
|
||||
.unwrap_or_else(|e| {
|
||||
// Acknowledge that the task has finished with an error.
|
||||
error!("per-client task finished with an error: {e:#}");
|
||||
})
|
||||
.instrument(info_span!("client", session = format_args!("{session_id}"))),
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -99,8 +98,7 @@ pub async fn handle_ws_client(
|
||||
let hostname = hostname.as_deref();
|
||||
|
||||
// TLS is None here, because the connection is already encrypted.
|
||||
let do_handshake = handshake(stream, None, cancel_map).instrument(info_span!("handshake"));
|
||||
let (mut stream, params) = match do_handshake.await? {
|
||||
let (mut stream, params) = match handshake(stream, None, cancel_map).await? {
|
||||
Some(x) => x,
|
||||
None => return Ok(()), // it's a cancellation request
|
||||
};
|
||||
@@ -119,10 +117,13 @@ pub async fn handle_ws_client(
|
||||
|
||||
let client = Client::new(stream, creds, ¶ms, session_id);
|
||||
cancel_map
|
||||
.with_session(|session| client.connect_to_db(session))
|
||||
.with_session(|session| client.handle_connection(session))
|
||||
.await
|
||||
}
|
||||
|
||||
/// Handle an incoming client connection, handshake and authentication.
|
||||
/// After that, keeps forwarding all the data. This doesn't return until the
|
||||
/// connection is lost.
|
||||
async fn handle_client(
|
||||
config: &ProxyConfig,
|
||||
cancel_map: &CancelMap,
|
||||
@@ -135,9 +136,9 @@ async fn handle_client(
|
||||
NUM_CONNECTIONS_CLOSED_COUNTER.inc();
|
||||
}
|
||||
|
||||
// Process postgres startup packet and upgrade to TLS (if applicable)
|
||||
let tls = config.tls_config.as_ref();
|
||||
let do_handshake = handshake(stream, tls, cancel_map).instrument(info_span!("handshake"));
|
||||
let (mut stream, params) = match do_handshake.await? {
|
||||
let (mut stream, params) = match handshake(stream, tls, cancel_map).await? {
|
||||
Some(x) => x,
|
||||
None => return Ok(()), // it's a cancellation request
|
||||
};
|
||||
@@ -157,7 +158,7 @@ async fn handle_client(
|
||||
|
||||
let client = Client::new(stream, creds, ¶ms, session_id);
|
||||
cancel_map
|
||||
.with_session(|session| client.connect_to_db(session))
|
||||
.with_session(|session| client.handle_connection(session))
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -165,6 +166,7 @@ async fn handle_client(
|
||||
/// For better testing experience, `stream` can be any object satisfying the traits.
|
||||
/// It's easier to work with owned `stream` here as we need to upgrade it to TLS;
|
||||
/// we also take an extra care of propagating only the select handshake errors to client.
|
||||
#[instrument(skip_all)]
|
||||
async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
stream: S,
|
||||
mut tls: Option<&TlsConfig>,
|
||||
@@ -256,8 +258,21 @@ impl<'a, S> Client<'a, S> {
|
||||
}
|
||||
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {
|
||||
async fn handle_connection(self, session: cancellation::Session<'_>) -> anyhow::Result<()> {
|
||||
let (mut client, mut db) = self.connect_to_db(session).await?;
|
||||
|
||||
// Starting from here we only proxy the client's traffic.
|
||||
info!("performing the proxy pass...");
|
||||
let _ = tokio::io::copy_bidirectional(&mut client, &mut db).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Let the client authenticate and connect to the designated compute node.
|
||||
async fn connect_to_db(self, session: cancellation::Session<'_>) -> anyhow::Result<()> {
|
||||
#[instrument(skip_all)]
|
||||
async fn connect_to_db(
|
||||
self,
|
||||
session: cancellation::Session<'_>,
|
||||
) -> anyhow::Result<(MeasuredStream<S>, MeasuredStream<tokio::net::TcpStream>)> {
|
||||
let Self {
|
||||
mut stream,
|
||||
creds,
|
||||
@@ -275,7 +290,6 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {
|
||||
let res = creds.authenticate(&extra, &mut stream).await;
|
||||
async { res }.or_else(|e| stream.throw_error(e)).await
|
||||
}
|
||||
.instrument(info_span!("auth"))
|
||||
.await?;
|
||||
|
||||
let node = auth_result.value;
|
||||
@@ -311,15 +325,11 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {
|
||||
.await?;
|
||||
|
||||
let m_sent = NUM_BYTES_PROXIED_COUNTER.with_label_values(&node.aux.traffic_labels("tx"));
|
||||
let mut client = MeasuredStream::new(stream.into_inner(), m_sent);
|
||||
let client = MeasuredStream::new(stream.into_inner(), m_sent);
|
||||
|
||||
let m_recv = NUM_BYTES_PROXIED_COUNTER.with_label_values(&node.aux.traffic_labels("rx"));
|
||||
let mut db = MeasuredStream::new(db.stream, m_recv);
|
||||
let db = MeasuredStream::new(db.stream, m_recv);
|
||||
|
||||
// Starting from here we only proxy the client's traffic.
|
||||
info!("performing the proxy pass...");
|
||||
let _ = tokio::io::copy_bidirectional(&mut client, &mut db).await?;
|
||||
|
||||
Ok(())
|
||||
Ok((client, db))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ use std::{io, task};
|
||||
use thiserror::Error;
|
||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
|
||||
use tokio_rustls::server::TlsStream;
|
||||
use tracing::instrument;
|
||||
|
||||
pin_project! {
|
||||
/// Stream wrapper which implements libpq's protocol.
|
||||
@@ -105,6 +106,7 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
|
||||
/// Write the error message using [`Self::write_message`], then re-throw it.
|
||||
/// Allowing string literals is safe under the assumption they might not contain any runtime info.
|
||||
/// This method exists due to `&str` not implementing `Into<anyhow::Error>`.
|
||||
#[instrument(skip_all)]
|
||||
pub async fn throw_error_str<T>(&mut self, error: &'static str) -> anyhow::Result<T> {
|
||||
tracing::info!("forwarding error to user: {error}");
|
||||
self.write_message(&BeMessage::ErrorResponse(error, None))
|
||||
@@ -114,6 +116,7 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
|
||||
|
||||
/// Write the error message using [`Self::write_message`], then re-throw it.
|
||||
/// Trait [`UserFacingError`] acts as an allowlist for error types.
|
||||
#[instrument(skip_all)]
|
||||
pub async fn throw_error<T, E>(&mut self, error: E) -> anyhow::Result<T>
|
||||
where
|
||||
E: UserFacingError + Into<anyhow::Error>,
|
||||
|
||||
Reference in New Issue
Block a user