diff --git a/proxy/src/http/conn_pool.rs b/proxy/src/http/conn_pool.rs index 7a5f00d627..5218a44479 100644 --- a/proxy/src/http/conn_pool.rs +++ b/proxy/src/http/conn_pool.rs @@ -191,22 +191,39 @@ impl GlobalConnPool { // ok return cached connection if found and establish a new one otherwise let new_client = if let Some(client) = client { if client.inner.is_closed() { - info!("pool: cached connection '{conn_info}' is closed, opening a new one"); - connect_to_compute(self.proxy_config, conn_info, session_id, latency_timer).await + let conn_id = uuid::Uuid::new_v4(); + info!(%conn_id, "pool: cached connection '{conn_info}' is closed, opening a new one"); + connect_to_compute( + self.proxy_config, + conn_info, + conn_id, + session_id, + latency_timer, + ) + .await } else { info!("pool: reusing connection '{conn_info}'"); client.session.send(session_id)?; latency_timer.pool_hit(); latency_timer.success(); return Ok(Client { + conn_id: client.conn_id, inner: Some(client), span: Span::current(), pool, }); } } else { - info!("pool: opening a new connection '{conn_info}'"); - connect_to_compute(self.proxy_config, conn_info, session_id, latency_timer).await + let conn_id = uuid::Uuid::new_v4(); + info!(%conn_id, "pool: opening a new connection '{conn_info}'"); + connect_to_compute( + self.proxy_config, + conn_info, + conn_id, + session_id, + latency_timer, + ) + .await }; match &new_client { @@ -243,6 +260,7 @@ impl GlobalConnPool { } new_client.map(|inner| Client { + conn_id: inner.conn_id, inner: Some(inner), span: Span::current(), pool, @@ -250,16 +268,18 @@ impl GlobalConnPool { } fn put(&self, conn_info: &ConnInfo, client: ClientInner) -> anyhow::Result<()> { + let conn_id = client.conn_id; + // We want to hold this open while we return. This ensures that the pool can't close // while we are in the middle of returning the connection. let closed = self.closed.read(); if *closed { - info!("pool: throwing away connection '{conn_info}' because pool is closed"); + info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is closed"); return Ok(()); } if client.inner.is_closed() { - info!("pool: throwing away connection '{conn_info}' because connection is closed"); + info!(%conn_id, "pool: throwing away connection '{conn_info}' because connection is closed"); return Ok(()); } @@ -291,9 +311,9 @@ impl GlobalConnPool { // do logging outside of the mutex if returned { - info!("pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}"); + info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}"); } else { - info!("pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}"); + info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}"); } Ok(()) @@ -340,6 +360,7 @@ impl GlobalConnPool { struct TokioMechanism<'a> { conn_info: &'a ConnInfo, session_id: uuid::Uuid, + conn_id: uuid::Uuid, } #[async_trait] @@ -353,7 +374,14 @@ impl ConnectMechanism for TokioMechanism<'_> { node_info: &console::CachedNodeInfo, timeout: time::Duration, ) -> Result { - connect_to_compute_once(node_info, self.conn_info, timeout, self.session_id).await + connect_to_compute_once( + node_info, + self.conn_info, + timeout, + self.conn_id, + self.session_id, + ) + .await } fn update_connect_config(&self, _config: &mut compute::ConnCfg) {} @@ -366,6 +394,7 @@ impl ConnectMechanism for TokioMechanism<'_> { async fn connect_to_compute( config: &config::ProxyConfig, conn_info: &ConnInfo, + conn_id: uuid::Uuid, session_id: uuid::Uuid, latency_timer: LatencyTimer, ) -> anyhow::Result { @@ -401,6 +430,7 @@ async fn connect_to_compute( crate::proxy::connect_to_compute( &TokioMechanism { + conn_id, conn_info, session_id, }, @@ -416,6 +446,7 @@ async fn connect_to_compute_once( node_info: &console::CachedNodeInfo, conn_info: &ConnInfo, timeout: time::Duration, + conn_id: uuid::Uuid, mut session: uuid::Uuid, ) -> Result { let mut config = (*node_info.config).clone(); @@ -430,7 +461,6 @@ async fn connect_to_compute_once( let (tx, mut rx) = tokio::sync::watch::channel(session); - let conn_id = uuid::Uuid::new_v4(); let span = info_span!(parent: None, "connection", %conn_id); span.in_scope(|| { info!(%conn_info, %session, "new connection"); @@ -484,6 +514,7 @@ async fn connect_to_compute_once( inner: client, session: tx, ids, + conn_id, }) } @@ -491,6 +522,7 @@ struct ClientInner { inner: tokio_postgres::Client, session: tokio::sync::watch::Sender, ids: Ids, + conn_id: uuid::Uuid, } impl Client { @@ -500,12 +532,14 @@ impl Client { } pub struct Client { + conn_id: uuid::Uuid, span: Span, inner: Option, pool: Option<(ConnInfo, Arc)>, } pub struct Discard<'a> { + conn_id: uuid::Uuid, pool: &'a mut Option<(ConnInfo, Arc)>, } @@ -514,6 +548,7 @@ impl Client { let Self { inner, pool, + conn_id, span: _, } = self; ( @@ -521,7 +556,10 @@ impl Client { .as_mut() .expect("client inner should not be removed") .inner, - Discard { pool }, + Discard { + pool, + conn_id: *conn_id, + }, ) } @@ -537,13 +575,13 @@ impl Discard<'_> { pub fn check_idle(&mut self, status: ReadyForQueryStatus) { if status != ReadyForQueryStatus::Idle { if let Some((conn_info, _)) = self.pool.take() { - info!("pool: throwing away connection '{conn_info}' because connection is not idle") + info!(conn_id = %self.conn_id, "pool: throwing away connection '{conn_info}' because connection is not idle") } } } pub fn discard(&mut self) { if let Some((conn_info, _)) = self.pool.take() { - info!("pool: throwing away connection '{conn_info}' because connection is potentially in a broken state") + info!(conn_id = %self.conn_id, "pool: throwing away connection '{conn_info}' because connection is potentially in a broken state") } } } diff --git a/test_runner/regress/test_proxy.py b/test_runner/regress/test_proxy.py index a33b29549c..6388e9f365 100644 --- a/test_runner/regress/test_proxy.py +++ b/test_runner/regress/test_proxy.py @@ -1,5 +1,6 @@ import json import subprocess +import time from typing import Any, List, Optional, Tuple import psycopg2 @@ -364,10 +365,14 @@ def test_sql_over_http_pool(static_proxy: NeonProxy): pid1 = get_pid(200, "http")["rows"][0]["pid"] + time.sleep(0.02) + # query should be on the same connection rows = get_pid(200, "http")["rows"] assert rows == [{"pid": pid1}] + time.sleep(0.02) + # incorrect password should not work res = get_pid(400, "foobar") assert "password authentication failed for user" in res["message"] @@ -378,10 +383,14 @@ def test_sql_over_http_pool(static_proxy: NeonProxy): pid2 = get_pid(200, "http2")["rows"][0]["pid"] assert pid1 != pid2 + time.sleep(0.02) + # query should be on an existing connection pid = get_pid(200, "http2")["rows"][0]["pid"] assert pid in [pid1, pid2] + time.sleep(0.02) + # old password should not work res = get_pid(400, "http") assert "password authentication failed for user" in res["message"] @@ -419,6 +428,7 @@ def test_sql_over_http_pool_idle(static_proxy: NeonProxy): ) pid1 = query(200, GET_CONNECTION_PID_QUERY)["rows"][0]["pid"] + time.sleep(0.02) query(200, "BEGIN") pid2 = query(200, GET_CONNECTION_PID_QUERY)["rows"][0]["pid"] assert pid1 != pid2