From f40bf865756cb946e432b2b2b4c4a6a193dc71ba Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Thu, 18 Apr 2024 14:20:05 +0100 Subject: [PATCH] separate connection from poll client --- proxy/src/serverless/backend.rs | 4 +- proxy/src/serverless/conn_pool.rs | 107 +++++++++++++++++++----------- 2 files changed, 72 insertions(+), 39 deletions(-) diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index c89ebc3251..9267449b6f 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -16,7 +16,7 @@ use crate::{ proxy::connect_compute::ConnectMechanism, }; -use super::conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool}; +use super::conn_pool::{poll_tokio_client, Client, ConnInfo, GlobalConnPool}; pub struct PoolingBackend { pub pool: Arc>, @@ -184,7 +184,7 @@ impl ConnectMechanism for TokioMechanism { drop(pause); tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id())); - Ok(poll_client( + Ok(poll_tokio_client( self.pool.clone(), ctx, self.conn_info.clone(), diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 0383cf83c7..6bb0dcd613 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -461,12 +461,61 @@ impl GlobalConnPool { } } -pub fn poll_client( +pub fn poll_tokio_client( + global_pool: Arc>, + ctx: &mut RequestMonitoring, + conn_info: ConnInfo, + client: tokio_postgres::Client, + mut connection: tokio_postgres::Connection, + conn_id: uuid::Uuid, + aux: MetricsAuxInfo, +) -> Client { + let connection = std::future::poll_fn(move |cx| { + loop { + let message = ready!(connection.poll_message(cx)); + match message { + Some(Ok(AsyncMessage::Notice(notice))) => { + info!("notice: {}", notice); + } + Some(Ok(AsyncMessage::Notification(notif))) => { + warn!( + pid = notif.process_id(), + channel = notif.channel(), + "notification received" + ); + } + Some(Ok(_)) => { + warn!("unknown message"); + } + Some(Err(e)) => { + error!("connection error: {}", e); + break; + } + None => { + info!("connection closed"); + break; + } + } + } + Poll::Ready(()) + }); + poll_client( + global_pool, + ctx, + conn_info, + client, + connection, + conn_id, + aux, + ) +} + +pub fn poll_client + Send + 'static>( global_pool: Arc>, ctx: &mut RequestMonitoring, conn_info: ConnInfo, client: C, - mut connection: tokio_postgres::Connection, + connection: I, conn_id: uuid::Uuid, aux: MetricsAuxInfo, ) -> Client { @@ -476,8 +525,9 @@ pub fn poll_client( let span = info_span!(parent: None, "connection", %conn_id); let cold_start_info = ctx.cold_start_info; - span.in_scope(|| { - info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection"); + let session_span = info_span!(parent: span.clone(), "", %session_id); + session_span.in_scope(|| { + info!(cold_start_info = cold_start_info.as_str(), %conn_info, "new connection"); }); let pool = match conn_info.endpoint_cache_key() { Some(endpoint) => Arc::downgrade(&global_pool.get_or_create_endpoint_pool(&endpoint)), @@ -493,16 +543,14 @@ pub fn poll_client( idle, db_user: conn_info.db_and_user(), pool: pool.clone(), - session_id, + session_span, session_rx: rx, conn_gauge, conn_id, connection, }; - tokio::spawn(async move { - db_conn.instrument(span).await; - }); + tokio::spawn(db_conn.instrument(span)); let inner = ClientInner { inner: client, @@ -515,7 +563,7 @@ pub fn poll_client( } pin_project! { - struct DbConnection { + struct DbConnection { #[pin] cancelled: WaitForCancellationFutureOwned, @@ -526,40 +574,47 @@ pin_project! { db_user: (DbName, RoleName), pool: Weak>>, - session_id: uuid::Uuid, + session_span: tracing::Span, session_rx: tokio::sync::watch::Receiver, conn_gauge: NumDbConnectionsGuard<'static>, conn_id: uuid::Uuid, - connection: tokio_postgres::Connection, + #[pin] + connection: Inner, } } -impl Future for DbConnection { +impl> Future for DbConnection { type Output = (); fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { let mut this = self.project(); if this.cancelled.as_mut().poll(cx).is_ready() { + let _span = this.session_span.enter(); info!("connection dropped"); return Poll::Ready(()); } match this.session_rx.has_changed() { Ok(true) => { - *this.session_id = *this.session_rx.borrow_and_update(); - info!(%this.session_id, "changed session"); + let session_id = *this.session_rx.borrow_and_update(); + *this.session_span = info_span!("", %session_id); + let _span = this.session_span.enter(); + info!("changed session"); this.idle_timeout .as_mut() .reset(Instant::now() + *this.idle); } Err(_) => { + let _span = this.session_span.enter(); info!("connection dropped"); return Poll::Ready(()); } _ => {} } + let _span = this.session_span.enter(); + // 5 minute idle connection timeout if this.idle_timeout.as_mut().poll(cx).is_ready() { this.idle_timeout @@ -578,29 +633,7 @@ impl Future for DbConnection { } } - loop { - let message = ready!(this.connection.poll_message(cx)); - - match message { - Some(Ok(AsyncMessage::Notice(notice))) => { - info!(session_id = %this.session_id, "notice: {}", notice); - } - Some(Ok(AsyncMessage::Notification(notif))) => { - warn!(session_id = %this.session_id, pid = notif.process_id(), channel = notif.channel(), "notification received"); - } - Some(Ok(_)) => { - warn!(session_id = %this.session_id, "unknown message"); - } - Some(Err(e)) => { - error!(session_id = %this.session_id, "connection error: {}", e); - break; - } - None => { - info!("connection closed"); - break; - } - } - } + ready!(this.connection.poll(cx)); // remove from connection pool if let Some(pool) = this.pool.upgrade() {