diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index ccf89f9b05..f7f162a075 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -200,8 +200,9 @@ impl LatencyTimer { pub fn success(&mut self) { // stop the stopwatch and record the time that we have accumulated - let start = self.start.take().expect("latency timer should be started"); - self.accumulated += start.elapsed(); + if let Some(start) = self.start.take() { + self.accumulated += start.elapsed(); + } // success self.outcome = "success"; diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index 8285da68d7..156002006d 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -1,7 +1,7 @@ use std::{sync::Arc, time::Duration}; use async_trait::async_trait; -use tracing::info; +use tracing::{field::display, info}; use crate::{ auth::{backend::ComputeCredentialKeys, check_peer_addr_is_in_list, AuthError}, @@ -15,7 +15,7 @@ use crate::{ proxy::connect_compute::ConnectMechanism, }; -use super::conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool, APP_NAME}; +use super::conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool}; pub struct PoolingBackend { pub pool: Arc<GlobalConnPool<tokio_postgres::Client>>, @@ -81,8 +81,8 @@ impl PoolingBackend { return Ok(client); } let conn_id = uuid::Uuid::new_v4(); - info!(%conn_id, "pool: opening a new connection '{conn_info}'"); - ctx.set_application(Some(APP_NAME)); + tracing::Span::current().record("conn_id", display(conn_id)); + info!("pool: opening a new connection '{conn_info}'"); let backend = self .config .auth_backend diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index f4e5b145c5..53e7c1c2ee 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -4,7 +4,6 @@ use metrics::IntCounterPairGuard; use parking_lot::RwLock; use rand::Rng; use smallvec::SmallVec; -use smol_str::SmolStr; use std::{collections::HashMap, pin::pin, sync::Arc, sync::Weak,
time::Duration}; use std::{ fmt, @@ -31,8 +30,6 @@ use tracing::{info, info_span, Instrument}; use super::backend::HttpConnError; -pub const APP_NAME: SmolStr = SmolStr::new_inline("/sql_over_http"); - #[derive(Debug, Clone)] pub struct ConnInfo { pub user_info: ComputeUserInfo, @@ -379,12 +376,13 @@ impl<C: ClientInnerExt> GlobalConnPool<C> { info!("pool: cached connection '{conn_info}' is closed, opening a new one"); return Ok(None); } else { - info!("pool: reusing connection '{conn_info}'"); - client.session.send(ctx.session_id)?; + tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id)); tracing::Span::current().record( "pid", &tracing::field::display(client.inner.get_process_id()), ); + info!("pool: reusing connection '{conn_info}'"); + client.session.send(ctx.session_id)?; ctx.latency_timer.pool_hit(); ctx.latency_timer.success(); return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool))); @@ -577,7 +575,6 @@ pub struct Client<C: ClientInnerExt> { } pub struct Discard<'a, C: ClientInnerExt> { - conn_id: uuid::Uuid, conn_info: &'a ConnInfo, pool: &'a mut Weak<RwLock<EndpointConnPool<C>>>, } @@ -603,14 +600,7 @@ impl<C: ClientInnerExt> Client<C> { span: _, } = self; let inner = inner.as_mut().expect("client inner should not be removed"); - ( - &mut inner.inner, - Discard { - pool, - conn_info, - conn_id: inner.conn_id, - }, - ) + (&mut inner.inner, Discard { pool, conn_info }) } pub fn check_idle(&mut self, status: ReadyForQueryStatus) { @@ -625,13 +615,13 @@ impl<C: ClientInnerExt> Discard<'_, C> { pub fn check_idle(&mut self, status: ReadyForQueryStatus) { let conn_info = &self.conn_info; if status != ReadyForQueryStatus::Idle && std::mem::take(self.pool).strong_count() > 0 { - info!(conn_id = %self.conn_id, "pool: throwing away connection '{conn_info}' because connection is not idle") + info!("pool: throwing away connection '{conn_info}' because connection is not idle") } } pub fn discard(&mut self) { let conn_info = &self.conn_info; if std::mem::take(self.pool).strong_count() > 0 { - info!(conn_id = %self.conn_id, "pool:
throwing away connection '{conn_info}' because connection is potentially in a broken state") + info!("pool: throwing away connection '{conn_info}' because connection is potentially in a broken state") } } } diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index e9f868d51e..ecb72abe73 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -36,6 +36,7 @@ use crate::error::ReportableError; use crate::metrics::HTTP_CONTENT_LENGTH; use crate::metrics::NUM_CONNECTION_REQUESTS_GAUGE; use crate::proxy::NeonOptions; +use crate::serverless::backend::HttpConnError; use crate::DbName; use crate::RoleName; @@ -305,7 +306,14 @@ pub async fn handle( Ok(response) } -#[instrument(name = "sql-over-http", fields(pid = tracing::field::Empty), skip_all)] +#[instrument( + name = "sql-over-http", + skip_all, + fields( + pid = tracing::field::Empty, + conn_id = tracing::field::Empty + ) +)] async fn handle_inner( config: &'static ProxyConfig, ctx: &mut RequestMonitoring, @@ -359,12 +367,10 @@ async fn handle_inner( let txn_read_only = headers.get(&TXN_READ_ONLY) == Some(&HEADER_VALUE_TRUE); let txn_deferrable = headers.get(&TXN_DEFERRABLE) == Some(&HEADER_VALUE_TRUE); - let paused = ctx.latency_timer.pause(); let request_content_length = match request.body().size_hint().upper() { Some(v) => v, None => MAX_REQUEST_SIZE + 1, }; - drop(paused); info!(request_content_length, "request size in bytes"); HTTP_CONTENT_LENGTH.observe(request_content_length as f64); @@ -380,15 +386,20 @@ async fn handle_inner( let body = hyper::body::to_bytes(request.into_body()) .await .map_err(anyhow::Error::from)?; + info!(length = body.len(), "request payload read"); let payload: Payload = serde_json::from_slice(&body)?; Ok::<Payload, anyhow::Error>(payload) // Adjust error type accordingly }; let authenticate_and_connect = async { let keys = backend.authenticate(ctx, &conn_info).await?; - backend + let client = backend .connect_to_compute(ctx,
conn_info, keys, !allow_pool) - .await + .await?; + // not strictly necessary to mark success here, + // but it's just insurance for if we forget it somewhere else + ctx.latency_timer.success(); + Ok::<_, HttpConnError>(client) }; // Run both operations in parallel @@ -420,6 +431,7 @@ results } Payload::Batch(statements) => { + info!("starting transaction"); let (inner, mut discard) = client.inner(); let mut builder = inner.build_transaction(); if let Some(isolation_level) = txn_isolation_level { @@ -449,6 +461,7 @@ .await { Ok(results) => { + info!("commit"); let status = transaction.commit().await.map_err(|e| { // if we cannot commit - for now don't return connection to pool // TODO: get a query status from the error @@ -459,6 +472,7 @@ results } Err(err) => { + info!("rollback"); let status = transaction.rollback().await.map_err(|e| { // if we cannot rollback - for now don't return connection to pool // TODO: get a query status from the error @@ -533,8 +547,10 @@ async fn query_to_json( raw_output: bool, default_array_mode: bool, ) -> anyhow::Result<(ReadyForQueryStatus, Value)> { + info!("executing query"); let query_params = data.params; let row_stream = client.query_raw_txt(&data.query, query_params).await?; + info!("finished executing query"); // Manually drain the stream into a vector to leave row_stream hanging // around to get a command tag. Also check that the response is not too @@ -569,6 +585,13 @@ } .and_then(|s| s.parse::<i64>().ok()); + info!( + rows = rows.len(), + ?ready, + command_tag, + "finished reading rows" + ); + let mut fields = vec![]; let mut columns = vec![];