diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index 9267449b6f..ea63f26dd9 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -187,7 +187,7 @@ impl ConnectMechanism for TokioMechanism { Ok(poll_tokio_client( self.pool.clone(), ctx, - self.conn_info.clone(), + &self.conn_info, client, connection, self.conn_id, diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index a79bd6f020..466a90133a 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -27,7 +27,7 @@ use crate::{ auth::backend::ComputeUserInfo, context::RequestMonitoring, DbName, EndpointCacheKey, RoleName, }; -use tracing::{debug, error, warn, Span}; +use tracing::{debug, error, warn}; use tracing::{info, info_span, Instrument}; use super::backend::HttpConnError; @@ -136,10 +136,7 @@ impl EndpointConnPool { node: Pin<&mut Node>>, db_user: &(DbName, RoleName), client: ClientInner, - conn_info: ConnInfo, ) -> bool { - let conn_id = client.conn_id; - { let pool = pool.read(); if pool @@ -147,7 +144,7 @@ impl EndpointConnPool { .load(atomic::Ordering::Relaxed) >= pool.global_pool_size_max_conns { - info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full"); + info!("pool: throwing away connection because pool is full"); return false; } } @@ -188,9 +185,9 @@ impl EndpointConnPool { // do logging outside of the mutex if returned { - info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}"); + info!("pool: returning connection back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}"); } else { - info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}"); + info!("pool: throwing away connection because pool is full, total_conns={total_conns}"); } returned @@ -402,7 +399,7 @@ impl GlobalConnPool { ); ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit); ctx.latency_timer.success(); - return Ok(Some(Client::new(client, conn_info.clone()))); + return Ok(Some(Client::new(client))); } } Ok(None) @@ -465,7 +462,7 @@ type ConnTypes = dyn pin_list::Types< pub fn poll_tokio_client( global_pool: Arc>, ctx: &mut RequestMonitoring, - conn_info: ConnInfo, + conn_info: &ConnInfo, client: tokio_postgres::Client, mut connection: tokio_postgres::Connection, conn_id: uuid::Uuid, @@ -514,7 +511,7 @@ pub fn poll_tokio_client( pub fn poll_client + Send + 'static>( global_pool: Arc>, ctx: &mut RequestMonitoring, - conn_info: ConnInfo, + conn_info: &ConnInfo, client: C, connection: I, conn_id: uuid::Uuid, @@ -561,7 +558,7 @@ pub fn poll_client + Send + 'static>( aux, conn_id, }; - Client::new(inner, conn_info) + Client::new(inner) } pin_project! { @@ -574,7 +571,7 @@ pin_project! { // Used to add/remove conn from the conn pool #[pin] node: Node>, - recv_client: tokio::sync::mpsc::Receiver<(tracing::Span, ClientInner, ConnInfo)>, + recv_client: tokio::sync::mpsc::Receiver>, db_user: (DbName, RoleName), pool: Option>>>, @@ -605,11 +602,19 @@ impl> Future for DbConnection { fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { let mut this = self.project(); - // node is initiated via EndpointConnPool::put. - // this is only called in the if statement below. - // this can only occur if pool is set (and pool is never removed). - // when this occurs, it guarantees that the DbUserConnPool is created (it is never removed). + + // Update the session span. + // If the node is initialised, then it is either + // 1. Waiting in the idle pool + // 2. Just removed from the idle pool and this is our first wake up. + // + // In the event of 1, nothing happens. (should not have many wakeups while idle) + // In the event of 2, we remove the session_id that was left in it's place. if let Some(init) = this.node.as_mut().initialized_mut() { + // node is initiated via EndpointConnPool::put. + // this is only called in the if statement below. + // this can only occur if pool is set (and pool is never removed). + // when this occurs, it guarantees that the DbUserConnPool is created (it is never removed). let pool = this .pool .as_ref() @@ -619,6 +624,7 @@ impl> Future for DbConnection { .pools .get(this.db_user) .expect("node cannot be init without pool"); + if let Ok((session_id, _)) = init.take_removed(&pool.conns) { *this.session_span = info_span!("", %session_id); let _span = this.session_span.enter(); @@ -629,9 +635,10 @@ impl> Future for DbConnection { }; } + // The client has been returned. We will insert it into the linked list for this database. if let Poll::Ready(client) = this.recv_client.poll_recv(cx) { // if the send_client is dropped, then the client is dropped - let Some((span, client, conn_info)) = client else { + let Some(client) = client else { let _span = this.session_span.enter(); info!("connection dropped"); return Poll::Ready(()); @@ -642,8 +649,8 @@ impl> Future for DbConnection { return Poll::Ready(()); }; - let _span = span.enter(); - if !EndpointConnPool::put(pool, this.node.as_mut(), this.db_user, client, conn_info) { + let _span = this.session_span.enter(); + if !EndpointConnPool::put(pool, this.node.as_mut(), this.db_user, client) { return Poll::Ready(()); } } @@ -673,7 +680,7 @@ impl> Future for DbConnection { struct ClientInner { inner: C, - pool: tokio::sync::mpsc::Sender<(tracing::Span, ClientInner, ConnInfo)>, + pool: tokio::sync::mpsc::Sender>, aux: MetricsAuxInfo, conn_id: uuid::Uuid, } @@ -709,55 +716,41 @@ impl Client { } pub struct Client { - span: Span, inner: Option>, - conn_info: ConnInfo, discarded: bool, } pub struct Discard<'a> { - conn_info: &'a ConnInfo, + conn_id: uuid::Uuid, discarded: &'a mut bool, } impl Client { - pub(self) fn new(inner: ClientInner, conn_info: ConnInfo) -> Self { + pub(self) fn new(inner: ClientInner) -> Self { Self { inner: Some(inner), - span: Span::current(), - conn_info, discarded: false, } } pub fn inner(&mut self) -> (&mut C, Discard<'_>) { - let Self { - inner, - conn_info, - span: _, - discarded, - } = self; + let Self { inner, discarded } = self; let inner = inner.as_mut().expect("client inner should not be removed"); - ( - &mut inner.inner, - Discard { - discarded, - conn_info, - }, - ) + let conn_id = inner.conn_id; + (&mut inner.inner, Discard { discarded, conn_id }) } } impl Discard<'_> { pub fn check_idle(&mut self, status: ReadyForQueryStatus) { - let conn_info = &self.conn_info; + let conn_id = &self.conn_id; if status != ReadyForQueryStatus::Idle && !*self.discarded { - info!("pool: throwing away connection '{conn_info}' because connection is not idle") + info!(%conn_id, "pool: throwing away connection because connection is not idle") } } pub fn discard(&mut self) { - let conn_info = &self.conn_info; + let conn_id = &self.conn_id; *self.discarded = true; - info!("pool: throwing away connection '{conn_info}' because connection is potentially in a broken state") + info!(%conn_id, "pool: throwing away connection because connection is potentially in a broken state") } } @@ -779,7 +772,6 @@ impl Drop for Client { return; } - let conn_info = self.conn_info.clone(); let client = self .inner .take() @@ -788,12 +780,12 @@ impl Drop for Client { let conn_id = client.conn_id; if client.is_closed() { - info!(%conn_id, "pool: throwing away connection '{conn_info}' because connection is closed"); + info!(%conn_id, "pool: throwing away connection because connection is closed"); return; } let tx = client.pool.clone(); - match tx.try_send((self.span.clone(), client, conn_info)) { + match tx.try_send(client) { Ok(_) => {} Err(TrySendError::Closed(_)) => {} Err(TrySendError::Full(_)) => { @@ -824,7 +816,7 @@ mod tests { fn create_inner( global_pool: Arc>, - conn_info: ConnInfo, + conn_info: &ConnInfo, ) -> (Client, CancellationToken) { let cancelled = CancellationToken::new(); let client = poll_client( @@ -869,7 +861,7 @@ mod tests { password: "password".as_bytes().into(), }; { - let (mut client, _) = create_inner(pool.clone(), conn_info.clone()); + let (mut client, _) = create_inner(pool.clone(), &conn_info); assert_eq!(0, pool.get_global_connections_count()); client.inner().1.discard(); drop(client); @@ -878,13 +870,13 @@ mod tests { assert_eq!(0, pool.get_global_connections_count()); } { - let (client, _) = create_inner(pool.clone(), conn_info.clone()); + let (client, _) = create_inner(pool.clone(), &conn_info); drop(client); yield_now().await; assert_eq!(1, pool.get_global_connections_count()); } { - let (client, cancel) = create_inner(pool.clone(), conn_info.clone()); + let (client, cancel) = create_inner(pool.clone(), &conn_info); cancel.cancel(); drop(client); yield_now().await; @@ -892,7 +884,7 @@ mod tests { assert_eq!(1, pool.get_global_connections_count()); } let cancel = { - let (client, cancel) = create_inner(pool.clone(), conn_info.clone()); + let (client, cancel) = create_inner(pool.clone(), &conn_info); drop(client); yield_now().await; // The client should be added to the pool. @@ -900,7 +892,7 @@ mod tests { cancel }; { - let client = create_inner(pool.clone(), conn_info.clone()); + let client = create_inner(pool.clone(), &conn_info); drop(client); yield_now().await; // The client shouldn't be added to the pool. Because the ep-pool is full. @@ -917,13 +909,13 @@ mod tests { password: "password".as_bytes().into(), }; { - let client = create_inner(pool.clone(), conn_info.clone()); + let client = create_inner(pool.clone(), &conn_info); drop(client); yield_now().await; assert_eq!(3, pool.get_global_connections_count()); } { - let client = create_inner(pool.clone(), conn_info.clone()); + let client = create_inner(pool.clone(), &conn_info); drop(client); yield_now().await; // The client shouldn't be added to the pool. Because the global pool is full.