From 81efb554f7127289d438b8162d30c48017512947 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Thu, 18 Apr 2024 17:33:58 +0100 Subject: [PATCH] rely on channel cancellation signals on drop --- proxy/src/serverless/conn_pool.rs | 24 +----------------------- 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 0a135504ff..a79bd6f020 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -19,7 +19,6 @@ use tokio::sync::mpsc::error::TrySendError; use tokio::time::{Instant, Sleep}; use tokio_postgres::tls::NoTlsStream; use tokio_postgres::{AsyncMessage, ReadyForQueryStatus, Socket}; -use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned}; use crate::console::messages::{ColdStartInfo, MetricsAuxInfo}; use crate::metrics::{HttpEndpointPoolsGuard, Metrics, NumDbConnectionsGuard}; @@ -536,12 +535,9 @@ pub fn poll_client + Send + 'static>( .map(|endpoint| global_pool.get_or_create_endpoint_pool(&endpoint)); let idle = global_pool.get_idle_timeout(); - let cancel = CancellationToken::new(); let (send_client, recv_client) = tokio::sync::mpsc::channel(1); let db_conn = DbConnection { - cancelled: cancel.clone().cancelled_owned(), - idle_timeout: tokio::time::sleep(idle), idle, @@ -562,7 +558,6 @@ pub fn poll_client + Send + 'static>( let inner = ClientInner { inner: client, pool: send_client, - cancel, aux, conn_id, }; @@ -571,10 +566,6 @@ pub fn poll_client + Send + 'static>( pin_project! { struct DbConnection { - // Used to close the current conn if the client is dropped - #[pin] - cancelled: WaitForCancellationFutureOwned, - // Used to close the current conn if it's idle #[pin] idle_timeout: Sleep, @@ -614,12 +605,6 @@ impl> Future for DbConnection { fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { let mut this = self.project(); - if this.cancelled.as_mut().poll(cx).is_ready() { - let _span = this.session_span.enter(); - info!("connection dropped"); - return Poll::Ready(()); - } - // node is initiated via EndpointConnPool::put. // this is only called in the if statement below. // this can only occur if pool is set (and pool is never removed). @@ -689,18 +674,10 @@ impl> Future for DbConnection { struct ClientInner { inner: C, pool: tokio::sync::mpsc::Sender<(tracing::Span, ClientInner, ConnInfo)>, - cancel: CancellationToken, aux: MetricsAuxInfo, conn_id: uuid::Uuid, } -impl Drop for ClientInner { - fn drop(&mut self) { - // on client drop, tell the conn to shut down - self.cancel.cancel(); - } -} - pub trait ClientInnerExt: Sync + Send + 'static { fn is_closed(&self) -> bool; fn get_process_id(&self) -> i32; @@ -829,6 +806,7 @@ impl Drop for Client { #[cfg(test)] mod tests { use tokio::task::yield_now; + use tokio_util::sync::CancellationToken; use crate::{BranchId, EndpointId, ProjectId};