From ec10838aa4cbc0ece2d7874c7d55151dfbd50118 Mon Sep 17 00:00:00 2001
From: Conrad Ludgate
Date: Fri, 18 Aug 2023 11:44:08 +0100
Subject: [PATCH] proxy: pool connection logs (#5020)

## Problem

Errors and notices that happen during a pooled connection lifecycle have no session identifiers.

## Summary of changes

Using a watch channel, we set the session ID whenever it changes. This way we can see the status of a connection for that session.

Also, adding a connection id to be able to search the entire connection lifecycle.
---
 proxy/src/auth/backend/classic.rs |   1 -
 proxy/src/http/conn_pool.rs       | 107 +++++++++++++++++++++++-------
 proxy/src/http/sql_over_http.rs   |  17 +++--
 proxy/src/http/websocket.rs       |   4 +-
 4 files changed, 95 insertions(+), 34 deletions(-)

diff --git a/proxy/src/auth/backend/classic.rs b/proxy/src/auth/backend/classic.rs
index 15d6f88203..9a056f1445 100644
--- a/proxy/src/auth/backend/classic.rs
+++ b/proxy/src/auth/backend/classic.rs
@@ -62,7 +62,6 @@ pub(super) async fn authenticate(
         }
     };
 
-    info!("compute node's state has likely changed; requesting a wake-up");
     let mut num_retries = 0;
     let mut node = loop {
         let wake_res = api.wake_compute(extra, creds).await;
diff --git a/proxy/src/http/conn_pool.rs b/proxy/src/http/conn_pool.rs
index 9bba846d57..180d10940c 100644
--- a/proxy/src/http/conn_pool.rs
+++ b/proxy/src/http/conn_pool.rs
@@ -1,16 +1,21 @@
 use anyhow::Context;
 use async_trait::async_trait;
 use dashmap::DashMap;
+use futures::future::poll_fn;
 use parking_lot::RwLock;
 use pbkdf2::{
     password_hash::{PasswordHashString, PasswordHasher, PasswordVerifier, SaltString},
     Params, Pbkdf2,
 };
 use pq_proto::StartupMessageParams;
-use std::fmt;
 use std::sync::atomic::{self, AtomicUsize};
 use std::{collections::HashMap, sync::Arc};
+use std::{
+    fmt,
+    task::{ready, Poll},
+};
 use tokio::time;
+use tokio_postgres::AsyncMessage;
 
 use crate::{auth, console};
 use crate::{compute, config};
@@ -19,8 +24,8 @@ use super::sql_over_http::MAX_RESPONSE_SIZE;
 
 use crate::proxy::ConnectMechanism;
 
-use tracing::error;
-use tracing::info;
+use tracing::{error, warn};
+use tracing::{info, info_span, Instrument};
 
 pub const APP_NAME: &str = "sql_over_http";
 const MAX_CONNS_PER_ENDPOINT: usize = 20;
@@ -48,7 +53,7 @@ impl fmt::Display for ConnInfo {
 }
 
 struct ConnPoolEntry {
-    conn: tokio_postgres::Client,
+    conn: Client,
     _last_access: std::time::Instant,
 }
 
@@ -110,8 +115,9 @@ impl GlobalConnPool {
         &self,
         conn_info: &ConnInfo,
         force_new: bool,
-    ) -> anyhow::Result<tokio_postgres::Client> {
-        let mut client: Option<tokio_postgres::Client> = None;
+        session_id: uuid::Uuid,
+    ) -> anyhow::Result<Client> {
+        let mut client: Option<Client> = None;
         let mut hash_valid = false;
 
         if !force_new {
@@ -153,16 +159,17 @@
 
         // ok return cached connection if found and establish a new one otherwise
         let new_client = if let Some(client) = client {
-            if client.is_closed() {
+            if client.inner.is_closed() {
                 info!("pool: cached connection '{conn_info}' is closed, opening a new one");
-                connect_to_compute(self.proxy_config, conn_info).await
+                connect_to_compute(self.proxy_config, conn_info, session_id).await
             } else {
                 info!("pool: reusing connection '{conn_info}'");
+                client.session.send(session_id)?;
                 return Ok(client);
             }
         } else {
             info!("pool: opening a new connection '{conn_info}'");
-            connect_to_compute(self.proxy_config, conn_info).await
+            connect_to_compute(self.proxy_config, conn_info, session_id).await
         };
 
         match &new_client {
@@ -201,11 +208,7 @@
         new_client
     }
 
-    pub async fn put(
-        &self,
-        conn_info: &ConnInfo,
-        client: tokio_postgres::Client,
-    ) -> anyhow::Result<()> {
+    pub async fn put(&self, conn_info: &ConnInfo, client: Client) -> anyhow::Result<()> {
         let pool = self.get_or_create_endpoint_pool(&conn_info.hostname);
 
         // return connection to the pool
@@ -282,11 +285,12 @@
 
 struct TokioMechanism<'a> {
     conn_info: &'a ConnInfo,
+    session_id: uuid::Uuid,
 }
 
 #[async_trait]
 impl ConnectMechanism for TokioMechanism<'_> {
-    type Connection = tokio_postgres::Client;
+    type Connection = Client;
     type ConnectError = tokio_postgres::Error;
     type Error = anyhow::Error;
 
@@ -295,7 +299,7 @@ impl ConnectMechanism for TokioMechanism<'_> {
         node_info: &console::CachedNodeInfo,
         timeout: time::Duration,
     ) -> Result<Self::Connection, Self::ConnectError> {
-        connect_to_compute_once(node_info, self.conn_info, timeout).await
+        connect_to_compute_once(node_info, self.conn_info, timeout, self.session_id).await
     }
 
     fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
@@ -308,7 +312,8 @@
 async fn connect_to_compute(
     config: &config::ProxyConfig,
     conn_info: &ConnInfo,
-) -> anyhow::Result<tokio_postgres::Client> {
+    session_id: uuid::Uuid,
+) -> anyhow::Result<Client> {
     let tls = config.tls_config.as_ref();
     let common_names = tls.and_then(|tls| tls.common_names.clone());
 
@@ -339,17 +344,27 @@
         .await?
         .context("missing cache entry from wake_compute")?;
 
-    crate::proxy::connect_to_compute(&TokioMechanism { conn_info }, node_info, &extra, &creds).await
+    crate::proxy::connect_to_compute(
+        &TokioMechanism {
+            conn_info,
+            session_id,
+        },
+        node_info,
+        &extra,
+        &creds,
+    )
+    .await
 }
 
 async fn connect_to_compute_once(
     node_info: &console::CachedNodeInfo,
     conn_info: &ConnInfo,
     timeout: time::Duration,
-) -> Result<tokio_postgres::Client, tokio_postgres::Error> {
+    mut session: uuid::Uuid,
+) -> Result<Client, tokio_postgres::Error> {
     let mut config = (*node_info.config).clone();
 
-    let (client, connection) = config
+    let (client, mut connection) = config
         .user(&conn_info.username)
         .password(&conn_info.password)
        .dbname(&conn_info.dbname)
@@ -358,11 +373,53 @@ async fn connect_to_compute_once(
        .connect(tokio_postgres::NoTls)
        .await?;
 
-    tokio::spawn(async move {
-        if let Err(e) = connection.await {
-            error!("connection error: {}", e);
-        }
+    let (tx, mut rx) = tokio::sync::watch::channel(session);
+
+    let conn_id = uuid::Uuid::new_v4();
+    let span = info_span!(parent: None, "connection", %conn_info, %conn_id);
+    span.in_scope(|| {
+        info!(%session, "new connection");
     });
 
-    Ok(client)
+    tokio::spawn(
+        poll_fn(move |cx| {
+            if matches!(rx.has_changed(), Ok(true)) {
+                session = *rx.borrow_and_update();
+                info!(%session, "changed session");
+            }
+
+            let message = ready!(connection.poll_message(cx));
+
+            match message {
+                Some(Ok(AsyncMessage::Notice(notice))) => {
+                    info!(%session, "notice: {}", notice);
+                    Poll::Pending
+                }
+                Some(Ok(AsyncMessage::Notification(notif))) => {
+                    warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
+                    Poll::Pending
+                }
+                Some(Ok(_)) => {
+                    warn!(%session, "unknown message");
+                    Poll::Pending
+                }
+                Some(Err(e)) => {
+                    error!(%session, "connection error: {}", e);
+                    Poll::Ready(())
+                }
+                None => Poll::Ready(()),
+            }
+        })
+        .instrument(span)
+    );
+
+    Ok(Client {
+        inner: client,
+        session: tx,
+    })
+}
+
+pub struct Client {
+    pub inner: tokio_postgres::Client,
+    session: tokio::sync::watch::Sender<uuid::Uuid>,
 }
diff --git a/proxy/src/http/sql_over_http.rs b/proxy/src/http/sql_over_http.rs
index 33375e63e9..4470996c04 100644
--- a/proxy/src/http/sql_over_http.rs
+++ b/proxy/src/http/sql_over_http.rs
@@ -16,6 +16,7 @@ use tokio_postgres::types::Type;
 use tokio_postgres::GenericClient;
 use tokio_postgres::IsolationLevel;
 use tokio_postgres::Row;
+use tracing::Instrument;
 use url::Url;
 
 use super::conn_pool::ConnInfo;
@@ -181,6 +182,7 @@ pub async fn handle(
     request: Request<Body>,
     sni_hostname: Option<String>,
     conn_pool: Arc<GlobalConnPool>,
+    session_id: uuid::Uuid,
 ) -> anyhow::Result<(Value, HashMap<HeaderName, HeaderValue>)> {
     //
     // Determine the destination and connection params
@@ -230,18 +232,18 @@
     let body = hyper::body::to_bytes(request.into_body()).await?;
     let payload: Payload = serde_json::from_slice(&body)?;
 
-    let mut client = conn_pool.get(&conn_info, !allow_pool).await?;
+    let mut client = conn_pool.get(&conn_info, !allow_pool, session_id).await?;
 
     //
     // Now execute the query and return the result
     //
     let result = match payload {
-        Payload::Single(query) => query_to_json(&client, query, raw_output, array_mode)
+        Payload::Single(query) => query_to_json(&client.inner, query, raw_output, array_mode)
            .await
            .map(|x| (x, HashMap::default())),
         Payload::Batch(batch_query) => {
             let mut results = Vec::new();
-            let mut builder = client.build_transaction();
+            let mut builder = client.inner.build_transaction();
             if let Some(isolation_level) = txn_isolation_level {
                 builder = builder.isolation_level(isolation_level);
             }
@@ -285,9 +287,12 @@
 
     if allow_pool {
         // return connection to the pool
-        tokio::task::spawn(async move {
-            let _ = conn_pool.put(&conn_info, client).await;
-        });
+        tokio::task::spawn(
+            async move {
+                let _ = conn_pool.put(&conn_info, client).await;
+            }
+            .in_current_span(),
+        );
     }
 
     result
diff --git a/proxy/src/http/websocket.rs b/proxy/src/http/websocket.rs
index 794da17929..ba158dfca3 100644
--- a/proxy/src/http/websocket.rs
+++ b/proxy/src/http/websocket.rs
@@ -203,7 +203,7 @@ async fn ws_handler(
     // TODO: that deserves a refactor as now this function also handles http json client besides websockets.
     // Right now I don't want to blow up sql-over-http patch with file renames and do that as a follow up instead.
     } else if request.uri().path() == "/sql" && request.method() == Method::POST {
-        let result = sql_over_http::handle(request, sni_hostname, conn_pool)
+        let result = sql_over_http::handle(request, sni_hostname, conn_pool, session_id)
             .instrument(info_span!("sql-over-http"))
             .await;
         let status_code = match result {
@@ -307,7 +307,7 @@ pub async fn task_main(
             ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
                 .instrument(info_span!(
                     "ws-client",
-                    session = format_args!("{session_id}")
+                    session = %session_id
                 ))
                 .await
        }
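
For context on the pattern the diff introduces in `connect_to_compute_once`: the pool keeps the `watch::Sender` half inside the returned `Client` and calls `send` on every checkout, while the spawned driver task keeps the `Receiver` and reads the latest session id before it logs a notice or an error. The sketch below is a self-contained illustration of that idea, not code from this patch: the mpsc channel stands in for the `AsyncMessage` stream that `Connection::poll_message` yields, and the crate setup (`tokio`, `tracing`, `tracing-subscriber`, `uuid` with the `v4` feature), the simulated notices, and all local names are assumptions made for the demo.

```rust
use std::time::Duration;

use tokio::sync::{mpsc, watch};
use tracing::{info, info_span, Instrument};
use uuid::Uuid;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    // One watch channel per pooled connection; the value is the session id of
    // whichever HTTP request currently has the connection checked out.
    let (session_tx, mut session_rx) = watch::channel(Uuid::new_v4());

    // Stand-in for the AsyncMessage stream produced by tokio_postgres.
    let (notice_tx, mut notice_rx) = mpsc::channel::<String>(8);

    // A connection-scoped id, independent of any single request, so the whole
    // lifetime of the connection can be searched in the logs.
    let conn_id = Uuid::new_v4();
    let span = info_span!("connection", %conn_id);

    // Driver task: before logging anything, pick up the latest session id.
    let driver = tokio::spawn(
        async move {
            while let Some(notice) = notice_rx.recv().await {
                if session_rx.has_changed().unwrap_or(false) {
                    let session = *session_rx.borrow_and_update();
                    info!(%session, "changed session");
                }
                let session = *session_rx.borrow();
                info!(%session, "notice: {notice}");
            }
        }
        .instrument(span),
    );

    // Simulate two requests reusing the same pooled connection.
    for _ in 0..2 {
        let session_id = Uuid::new_v4();
        // This mirrors what `client.session.send(session_id)` does on pool reuse.
        session_tx.send(session_id).ok();
        notice_tx.send("relation does not exist".into()).await.ok();
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    drop(notice_tx);
    driver.await.unwrap();
}
```

Running the sketch prints each simulated notice inside the `connection` span, tagged with whichever session most recently checked the connection out, which is the searchability the `conn_id` and `session` fields in the patch aim for.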