From 5c15f00d8645deeae6c9eb08f5a7f2ccd9f91ed6 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Wed, 1 May 2024 13:01:34 +0100 Subject: [PATCH] refactor timeout --- proxy/src/serverless/conn_pool.rs | 73 ++++++++++++++++--------------- 1 file changed, 37 insertions(+), 36 deletions(-) diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 99c6fdb534..76c9f44e0d 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -16,7 +16,7 @@ use std::{ sync::atomic::{self, AtomicUsize}, }; use tokio::sync::mpsc::error::TrySendError; -use tokio::time::{Instant, Sleep}; +use tokio::time::Sleep; use tokio_postgres::tls::NoTlsStream; use tokio_postgres::{AsyncMessage, ReadyForQueryStatus, Socket}; @@ -448,7 +448,6 @@ type ConnTypes = dyn pin_list::Types< Protected = ConnPoolEntry, // session ID Removed = uuid::Uuid, - // conn ID Unprotected = (), >; @@ -528,7 +527,7 @@ pub fn poll_client + Send + 'static>( let (send_client, recv_client) = tokio::sync::mpsc::channel(1); let db_conn = DbConnection { - idle_timeout: tokio::time::sleep(idle), + idle_timeout: None, idle, node: Node::>::new(), @@ -539,7 +538,6 @@ pub fn poll_client + Send + 'static>( session_span, conn_gauge, - conn_id, connection, }; @@ -558,7 +556,7 @@ pin_project! { struct DbConnection { // Used to close the current conn if it's idle #[pin] - idle_timeout: Sleep, + idle_timeout: Option, idle: tokio::time::Duration, // Used to add/remove conn from the conn pool @@ -573,7 +571,6 @@ pin_project! { // Static connection state conn_gauge: NumDbConnectionsGuard<'static>, - conn_id: uuid::Uuid, #[pin] connection: Inner, } @@ -612,27 +609,48 @@ impl> Future for DbConnection { .pool .as_ref() .expect("node cannot be init without pool"); - let pool = pool.read(); - let pool = pool + + let mut pool_lock = pool.write(); + let db_pool = pool_lock .pools .get(this.db_user) .expect("node cannot be init without pool"); - if let Ok((session_id, _)) = init.take_removed(&pool.conns) { - *this.session_span = info_span!("", %session_id); - let _span = this.session_span.enter(); - info!("changed session"); - this.idle_timeout - .as_mut() - .reset(Instant::now() + *this.idle); - }; + match init.take_removed(&db_pool.conns) { + Ok((session_id, _)) => { + *this.session_span = info_span!("", %session_id); + let _span = this.session_span.enter(); + info!("changed session"); + + // this connection is no longer idle + this.idle_timeout.set(None); + } + Err(init) => { + let idle = this + .idle_timeout + .as_mut() + .as_pin_mut() + .expect("timer must be set if node is init"); + + if idle.poll(cx).is_ready() { + info!("connection idle"); + + // remove client from pool - should close the connection if it's idle. + // does nothing if the client is currently checked-out and in-use + if pool_lock.remove_client(this.db_user.clone(), init) { + info!("closed connection removed"); + } + } + } + } } + let _span = this.session_span.enter(); + // The client has been returned. We will insert it into the linked list for this database. if let Poll::Ready(client) = this.recv_client.poll_recv(cx) { // if the send_client is dropped, then the client is dropped let Some(client) = client else { - let _span = this.session_span.enter(); info!("connection dropped"); return Poll::Ready(()); }; @@ -642,29 +660,12 @@ impl> Future for DbConnection { return Poll::Ready(()); }; - let _span = this.session_span.enter(); if !EndpointConnPool::put(pool, this.node.as_mut(), this.db_user, client) { return Poll::Ready(()); } - } - let _span = this.session_span.enter(); - - // 5 minute idle connection timeout - if this.idle_timeout.as_mut().poll(cx).is_ready() { - this.idle_timeout - .as_mut() - .reset(Instant::now() + *this.idle); - info!("connection idle"); - if let Some(pool) = &this.pool { - // remove client from pool - should close the connection if it's idle. - // does nothing if the client is currently checked-out and in-use - if let Some(init) = this.node.as_mut().initialized_mut() { - if pool.write().remove_client(this.db_user.clone(), init) { - info!("closed connection removed"); - } - } - } + // this connection is now idle + this.idle_timeout.set(Some(tokio::time::sleep(*this.idle))); } this.connection.poll(cx)