From e2d4d07139b6392845379b133e8cdca3bebe3c15 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Thu, 18 Apr 2024 12:15:06 +0100 Subject: [PATCH] improve remove --- proxy/src/serverless/conn_pool.rs | 49 +++++++++++++------------------ 1 file changed, 21 insertions(+), 28 deletions(-) diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index abb9549185..2725e83c2e 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -1,7 +1,7 @@ use dashmap::DashMap; use futures::Future; use parking_lot::RwLock; -use pin_list::Node; +use pin_list::{InitializedNode, Node}; use pin_project_lite::pin_project; use rand::Rng; use smallvec::SmallVec; @@ -104,7 +104,11 @@ impl EndpointConnPool { }) } - fn remove_client(&mut self, db_user: (DbName, RoleName), conn_id: uuid::Uuid) -> bool { + fn remove_client<'a>( + &mut self, + db_user: (DbName, RoleName), + node: Pin<&'a mut InitializedNode<'a, ConnTypes>>, + ) -> bool { let Self { pools, total_conns, @@ -112,27 +116,18 @@ impl EndpointConnPool { .. } = self; if let Some(pool) = pools.get_mut(&db_user) { - let mut removed = 0; - let mut cursor = pool.conns.cursor_front_mut(); - while let Some(client) = cursor.protected() { - if client.conn.conn_id != conn_id { - let _ = cursor.remove_current(uuid::Uuid::nil()); - removed += 1; - } else { - cursor.move_next() - } - } - - if removed > 0 { - global_connections_count.fetch_sub(removed, atomic::Ordering::Relaxed); + if node.unlink(&mut pool.conns).is_ok() { + global_connections_count.fetch_sub(1, atomic::Ordering::Relaxed); Metrics::get() .proxy .http_pool_opened_connections .get_metric() - .dec_by(removed as i64); + .dec_by(1); + *total_conns -= 1; + true + } else { + false } - *total_conns -= removed; - removed > 0 } else { false } @@ -704,11 +699,10 @@ impl> Future for DbConnection { if let Some(pool) = this.pool.upgrade() { // remove client from pool - should close the connection if it's idle. // does nothing if the client is currently checked-out and in-use - if pool - .write() - .remove_client(this.db_user.clone(), *this.conn_id) - { - info!("idle connection removed"); + if let Some(init) = this.node.as_mut().initialized_mut() { + if pool.write().remove_client(this.db_user.clone(), init) { + info!("closed connection removed"); + } } } } @@ -716,11 +710,10 @@ impl> Future for DbConnection { ready!(this.connection.poll(cx)); // remove from connection pool - if pool - .write() - .remove_client(this.db_user.clone(), *this.conn_id) - { - info!("closed connection removed"); + if let Some(init) = this.node.as_mut().initialized_mut() { + if pool.write().remove_client(this.db_user.clone(), init) { + info!("closed connection removed"); + } } Poll::Ready(())