diff --git a/proxy/src/auth/backend.rs b/proxy/src/auth/backend.rs
index 5cb8074cd5..11af85caa4 100644
--- a/proxy/src/auth/backend.rs
+++ b/proxy/src/auth/backend.rs
@@ -194,7 +194,6 @@ async fn auth_quirks(
         let res = hacks::password_hack_no_authentication(ctx, info, client).await?;

         ctx.set_endpoint_id(res.info.endpoint.clone());
-        tracing::Span::current().record("ep", &tracing::field::display(&res.info.endpoint));
         let password = match res.keys {
             ComputeCredentialKeys::Password(p) => p,
             _ => unreachable!("password hack should return a password"),
diff --git a/proxy/src/auth/backend/classic.rs b/proxy/src/auth/backend/classic.rs
index d075331846..b98fa63120 100644
--- a/proxy/src/auth/backend/classic.rs
+++ b/proxy/src/auth/backend/classic.rs
@@ -44,7 +44,7 @@ pub(super) async fn authenticate(
     )
     .await
     .map_err(|e| {
-        warn!("error processing scram messages error = authentication timed out, execution time exeeded {} seconds", config.scram_protocol_timeout.as_secs());
+        warn!("error processing scram messages error = authentication timed out, execution time exceeded {} seconds", config.scram_protocol_timeout.as_secs());
         auth::AuthError::user_timeout(e)
     })??;
diff --git a/proxy/src/auth/backend/link.rs b/proxy/src/auth/backend/link.rs
index bf9ebf4c18..ec7d891247 100644
--- a/proxy/src/auth/backend/link.rs
+++ b/proxy/src/auth/backend/link.rs
@@ -102,7 +102,6 @@ pub(super) async fn authenticate(
     ctx.set_user(db_info.user.into());
     ctx.set_project(db_info.aux.clone());
-    tracing::Span::current().record("ep", &tracing::field::display(&db_info.aux.endpoint_id));

     // Backwards compatibility. pg_sni_proxy uses "--" in domain names
     // while direct connections do not. Once we migrate to pg_sni_proxy
diff --git a/proxy/src/auth/credentials.rs b/proxy/src/auth/credentials.rs
index d318b3be54..89773aa1ff 100644
--- a/proxy/src/auth/credentials.rs
+++ b/proxy/src/auth/credentials.rs
@@ -142,10 +142,9 @@ impl ComputeUserInfoMaybeEndpoint {
         if let Some(ep) = &endpoint {
             ctx.set_endpoint_id(ep.clone());
-            tracing::Span::current().record("ep", &tracing::field::display(ep));
         }

-        info!(%user, project = endpoint.as_deref(), "credentials");
+        info!(%user, "credentials");

         if sni.is_some() {
             info!("Connection with sni");
             NUM_CONNECTION_ACCEPTED_BY_SNI
diff --git a/proxy/src/context.rs b/proxy/src/context.rs
index 4d8ced6f8f..abad8a6412 100644
--- a/proxy/src/context.rs
+++ b/proxy/src/context.rs
@@ -5,6 +5,7 @@ use once_cell::sync::OnceCell;
 use smol_str::SmolStr;
 use std::net::IpAddr;
 use tokio::sync::mpsc;
+use tracing::{field::display, info_span, Span};
 use uuid::Uuid;

 use crate::{
@@ -29,6 +30,7 @@ pub struct RequestMonitoring {
     pub protocol: &'static str,
     first_packet: chrono::DateTime,
     region: &'static str,
+    pub span: Span,

     // filled in as they are discovered
     project: Option,
@@ -64,12 +66,21 @@ impl RequestMonitoring {
         protocol: &'static str,
         region: &'static str,
     ) -> Self {
+        let span = info_span!(
+            "connect_request",
+            %protocol,
+            ?session_id,
+            %peer_addr,
+            ep = tracing::field::Empty,
+        );
+
         Self {
             peer_addr,
             session_id,
             protocol,
             first_packet: Utc::now(),
             region,
+            span,

             project: None,
             branch: None,
@@ -101,8 +112,8 @@ impl RequestMonitoring {
     }

     pub fn set_project(&mut self, x: MetricsAuxInfo) {
+        self.set_endpoint_id(x.endpoint_id);
         self.branch = Some(x.branch_id);
-        self.endpoint_id = Some(x.endpoint_id);
         self.project = Some(x.project_id);
         self.is_cold_start = x.is_cold_start;
     }
@@ -112,6 +123,7 @@ impl RequestMonitoring {
     }

     pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
+        self.span.record("ep", display(&endpoint_id));
         crate::metrics::CONNECTING_ENDPOINTS
             .with_label_values(&[self.protocol])
             .measure(&endpoint_id);
diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs
index 8a9445303a..d94fc67491 100644
--- a/proxy/src/proxy.rs
+++ b/proxy/src/proxy.rs
@@ -22,7 +22,6 @@ use crate::{
     stream::{PqStream, Stream},
     EndpointCacheKey,
 };
-use anyhow::{bail, Context};
 use futures::TryFutureExt;
 use itertools::Itertools;
 use once_cell::sync::OnceCell;
@@ -33,7 +32,7 @@ use std::sync::Arc;
 use thiserror::Error;
 use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
 use tokio_util::sync::CancellationToken;
-use tracing::{error, info, info_span, Instrument};
+use tracing::{error, info, Instrument};

 use self::{
     connect_compute::{connect_to_compute, TcpMechanism},
@@ -83,68 +82,67 @@ pub async fn task_main(
         let cancellation_handler = Arc::clone(&cancellation_handler);
         let endpoint_rate_limiter = endpoint_rate_limiter.clone();

-        let session_span = info_span!(
-            "handle_client",
-            ?session_id,
-            peer_addr = tracing::field::Empty,
-            ep = tracing::field::Empty,
-        );
-
-        connections.spawn(
-            async move {
-                info!("accepted postgres client connection");
-
-                let mut socket = WithClientIp::new(socket);
-                let mut peer_addr = peer_addr.ip();
-                if let Some(addr) = socket.wait_for_addr().await? {
-                    peer_addr = addr.ip();
-                    tracing::Span::current().record("peer_addr", &tracing::field::display(addr));
-                } else if config.require_client_ip {
-                    bail!("missing required client IP");
+        connections.spawn(async move {
+            let mut socket = WithClientIp::new(socket);
+            let mut peer_addr = peer_addr.ip();
+            match socket.wait_for_addr().await {
+                Ok(Some(addr)) => peer_addr = addr.ip(),
+                Err(e) => {
+                    error!("per-client task finished with an error: {e:#}");
+                    return;
                 }
+                Ok(None) if config.require_client_ip => {
+                    error!("missing required client IP");
+                    return;
+                }
+                Ok(None) => {}
+            }

-                socket
-                    .inner
-                    .set_nodelay(true)
-                    .context("failed to set socket option")?;
+            match socket.inner.set_nodelay(true) {
+                Ok(()) => {},
+                Err(e) => {
+                    error!("per-client task finished with an error: failed to set socket option: {e:#}");
+                    return;
+                },
+            };

-                let mut ctx = RequestMonitoring::new(session_id, peer_addr, "tcp", &config.region);
+            let mut ctx = RequestMonitoring::new(session_id, peer_addr, "tcp", &config.region);
+            let span = ctx.span.clone();

-                let res = handle_client(
-                    config,
-                    &mut ctx,
-                    cancellation_handler,
-                    socket,
-                    ClientMode::Tcp,
-                    endpoint_rate_limiter,
-                )
-                .await;
+            let res = handle_client(
+                config,
+                &mut ctx,
+                cancellation_handler,
+                socket,
+                ClientMode::Tcp,
+                endpoint_rate_limiter,
+            )
+            .instrument(span.clone())
+            .await;

-                match res {
-                    Err(e) => {
-                        // todo: log and push to ctx the error kind
-                        ctx.set_error_kind(e.get_error_kind());
-                        ctx.log();
-                        Err(e.into())
-                    }
-                    Ok(None) => {
-                        ctx.set_success();
-                        ctx.log();
-                        Ok(())
-                    }
-                    Ok(Some(p)) => {
-                        ctx.set_success();
-                        ctx.log();
-                        p.proxy_pass().await
+            match res {
+                Err(e) => {
+                    // todo: log and push to ctx the error kind
+                    ctx.set_error_kind(e.get_error_kind());
+                    ctx.log();
+                    error!(parent: &span, "per-client task finished with an error: {e:#}");
+                }
+                Ok(None) => {
+                    ctx.set_success();
+                    ctx.log();
+                }
+                Ok(Some(p)) => {
+                    ctx.set_success();
+                    ctx.log();
+                    match p.proxy_pass().instrument(span.clone()).await {
+                        Ok(()) => {}
+                        Err(e) => {
+                            error!(parent: &span, "per-client task finished with an error: {e:#}");
+                        }
                     }
                 }
            }
-            .unwrap_or_else(move |e| {
-                // Acknowledge that the task has finished with an error.
- error!("per-client task finished with an error: {e:#}"); - }) - .instrument(session_span), - ); + }); } connections.close(); @@ -232,10 +230,7 @@ pub async fn handle_client( mode: ClientMode, endpoint_rate_limiter: Arc, ) -> Result>, ClientRequestError> { - info!( - protocol = ctx.protocol, - "handling interactive connection from client" - ); + info!("handling interactive connection from client"); let proto = ctx.protocol; let _client_gauge = NUM_CLIENT_CONNECTION_GAUGE diff --git a/proxy/src/proxy/tests.rs b/proxy/src/proxy/tests.rs index c407a5572a..595d9c4979 100644 --- a/proxy/src/proxy/tests.rs +++ b/proxy/src/proxy/tests.rs @@ -17,6 +17,7 @@ use crate::console::{self, CachedNodeInfo, NodeInfo}; use crate::error::ErrorKind; use crate::proxy::retry::{retry_after, NUM_RETRIES_CONNECT}; use crate::{auth, http, sasl, scram}; +use anyhow::{bail, Context}; use async_trait::async_trait; use rstest::rstest; use tokio_postgres::config::SslMode; diff --git a/proxy/src/serverless.rs b/proxy/src/serverless.rs index dbf4f9cc74..b5806aec53 100644 --- a/proxy/src/serverless.rs +++ b/proxy/src/serverless.rs @@ -34,13 +34,14 @@ use hyper::{ Body, Method, Request, Response, }; +use std::convert::Infallible; use std::net::IpAddr; use std::task::Poll; use std::{future::ready, sync::Arc}; use tls_listener::TlsListener; use tokio::net::TcpListener; use tokio_util::sync::CancellationToken; -use tracing::{error, info, info_span, warn, Instrument}; +use tracing::{error, info, warn, Instrument}; use utils::http::{error::ApiError, json::json_response}; pub const SERVERLESS_DRIVER_SNI: &str = "api"; @@ -134,24 +135,19 @@ pub async fn task_main( let cancellation_handler = cancellation_handler.clone(); async move { - let session_id = uuid::Uuid::new_v4(); - - request_handler( - req, - config, - backend, - ws_connections, - cancellation_handler, - session_id, - peer_addr.ip(), - endpoint_rate_limiter, + Ok::<_, Infallible>( + request_handler( + req, + config, + backend, + ws_connections, + cancellation_handler, + peer_addr.ip(), + endpoint_rate_limiter, + ) + .await + .map_or_else(|e| e.into_response(), |r| r), ) - .instrument(info_span!( - "serverless", - session = %session_id, - %peer_addr, - )) - .await } }, ))) @@ -210,10 +206,11 @@ async fn request_handler( backend: Arc, ws_connections: TaskTracker, cancellation_handler: Arc, - session_id: uuid::Uuid, peer_addr: IpAddr, endpoint_rate_limiter: Arc, ) -> Result, ApiError> { + let session_id = uuid::Uuid::new_v4(); + let host = request .headers() .get("host") @@ -223,15 +220,15 @@ async fn request_handler( // Check if the request is a websocket upgrade request. if hyper_tungstenite::is_upgrade_request(&request) { - info!(session_id = ?session_id, "performing websocket upgrade"); + let ctx = RequestMonitoring::new(session_id, peer_addr, "ws", &config.region); + let span = ctx.span.clone(); + info!(parent: &span, "performing websocket upgrade"); let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None) .map_err(|e| ApiError::BadRequest(e.into()))?; ws_connections.spawn( async move { - let ctx = RequestMonitoring::new(session_id, peer_addr, "ws", &config.region); - if let Err(e) = websocket::serve_websocket( config, ctx, @@ -242,18 +239,21 @@ async fn request_handler( ) .await { - error!(session_id = ?session_id, "error in websocket connection: {e:#}"); + error!("error in websocket connection: {e:#}"); } } - .in_current_span(), + .instrument(span), ); // Return the response so the spawned future can continue. 
         Ok(response)
     } else if request.uri().path() == "/sql" && request.method() == Method::POST {
         let ctx = RequestMonitoring::new(session_id, peer_addr, "http", &config.region);
+        let span = ctx.span.clone();

-        sql_over_http::handle(config, ctx, request, backend).await
+        sql_over_http::handle(config, ctx, request, backend)
+            .instrument(span)
+            .await
     } else if request.uri().path() == "/sql" && request.method() == Method::OPTIONS {
         Response::builder()
             .header("Allow", "OPTIONS, POST")
diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs
index 63fe87eade..7f51ba82cc 100644
--- a/proxy/src/serverless/sql_over_http.rs
+++ b/proxy/src/serverless/sql_over_http.rs
@@ -21,7 +21,6 @@ use tokio_postgres::ReadyForQueryStatus;
 use tokio_postgres::Transaction;
 use tracing::error;
 use tracing::info;
-use tracing::instrument;
 use url::Url;
 use utils::http::error::ApiError;
 use utils::http::json::json_response;
@@ -291,7 +290,7 @@ pub async fn handle(
         // ctx.set_error_kind(crate::error::ErrorKind::RateLimit);
         let message = format!(
-            "HTTP-Connection timed out, execution time exeeded {} seconds",
+            "HTTP-Connection timed out, execution time exceeded {} seconds",
            config.http_config.request_timeout.as_secs()
         );
         error!(message);
@@ -309,14 +308,6 @@
     Ok(response)
 }

-#[instrument(
-    name = "sql-over-http",
-    skip_all,
-    fields(
-        pid = tracing::field::Empty,
-        conn_id = tracing::field::Empty
-    )
-)]
 async fn handle_inner(
     config: &'static ProxyConfig,
     ctx: &mut RequestMonitoring,
@@ -326,10 +317,7 @@
     let _request_gauge = NUM_CONNECTION_REQUESTS_GAUGE
         .with_label_values(&[ctx.protocol])
         .guard();
-    info!(
-        protocol = ctx.protocol,
-        "handling interactive connection from client"
-    );
+    info!("handling interactive connection from client");

     //
     // Determine the destination and connection params
@@ -337,11 +325,7 @@
     let headers = request.headers();
     // TLS config should be there.
     let conn_info = get_conn_info(ctx, headers, config.tls_config.as_ref().unwrap())?;
-    info!(
-        user = conn_info.user_info.user.as_str(),
-        project = conn_info.user_info.endpoint.as_str(),
-        "credentials"
-    );
+    info!(user = conn_info.user_info.user.as_str(), "credentials");

     // Determine the output options. Default behaviour is 'false'. Anything that is not
     // strictly 'true' assumed to be false.
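
Note on the pattern: the patch moves span ownership into RequestMonitoring itself. The constructor opens a "connect_request" span with protocol, session_id, peer_addr and an empty ep field, set_endpoint_id() records ep later, and callers clone ctx.span and attach it to their futures with .instrument() instead of creating ad-hoc spans per call site. A minimal, self-contained sketch of that idea follows; it is illustrative only (simplified struct and field names, and it assumes the tokio, tracing, tracing-subscriber and uuid crates), not the proxy's actual code.

// Sketch only: a request-context object that owns its tracing span.
use tracing::{field::display, info_span, Instrument, Span};

struct RequestMonitoring {
    span: Span,
}

impl RequestMonitoring {
    fn new(session_id: uuid::Uuid, protocol: &'static str) -> Self {
        // Open the span once; `ep` stays empty until the endpoint is known.
        let span = info_span!(
            "connect_request",
            %protocol,
            ?session_id,
            ep = tracing::field::Empty,
        );
        Self { span }
    }

    fn set_endpoint_id(&mut self, endpoint_id: &str) {
        // Recording the field later makes it appear on every event emitted
        // inside this span from that point on.
        self.span.record("ep", display(endpoint_id));
    }
}

async fn handle(ctx: &mut RequestMonitoring) {
    ctx.set_endpoint_id("my-endpoint");
    // This event is emitted inside "connect_request", so subscribers see
    // protocol/session_id/ep as span context without repeating them here.
    tracing::info!("credentials");
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    let mut ctx = RequestMonitoring::new(uuid::Uuid::new_v4(), "tcp");
    let span = ctx.span.clone();
    // The caller instruments the future with the context's span instead of
    // entering a hand-rolled span around the whole task.
    handle(&mut ctx).instrument(span).await;
}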