From 4099fe6ea6f315a3dc04c1ff4a26f56b29cc75d6 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Thu, 18 Apr 2024 14:16:53 +0100 Subject: [PATCH] ensure client is removed from pool if conn is dropped --- proxy/src/serverless/conn_pool.rs | 91 +++++++++---------------------- 1 file changed, 25 insertions(+), 66 deletions(-) diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 2725e83c2e..5211b0c62c 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -228,48 +228,25 @@ impl Default for DbUserConnPool { } impl DbUserConnPool { - fn clear_closed_clients(&mut self, conns: &mut usize) -> usize { - let mut removed = 0; - - let mut cursor = self.conns.cursor_front_mut(); - while let Some(client) = cursor.protected() { - if client.conn.is_closed() { - let _ = cursor.remove_current(uuid::Uuid::nil()); - removed += 1; - } else { - cursor.move_next() - } - } - - *conns -= removed; - removed - } - fn get_conn_entry( &mut self, conns: &mut usize, global_connections_count: &AtomicUsize, session_id: uuid::Uuid, ) -> Option> { - let mut removed = self.clear_closed_clients(conns); + let mut cursor = self.conns.cursor_front_mut(); + let conn = loop { + let client = cursor.remove_current(session_id).ok()?; + if !client.conn.is_closed() { + break client; + } + }; - let conn = self - .conns - .cursor_front_mut() - .remove_current(session_id) - .ok(); + *conns -= 1; + global_connections_count.fetch_sub(1, atomic::Ordering::Relaxed); + Metrics::get().proxy.http_pool_opened_connections.dec_by(1); - if conn.is_some() { - *conns -= 1; - removed += 1; - } - global_connections_count.fetch_sub(removed, atomic::Ordering::Relaxed); - Metrics::get() - .proxy - .http_pool_opened_connections - .get_metric() - .dec_by(removed as i64); - conn + Some(conn) } } @@ -361,19 +338,11 @@ impl GlobalConnPool { .http_pool_reclaimation_lag_seconds .start_timer(); let current_len = shard.len(); - let mut clients_removed = 0; shard.retain(|endpoint, x| { // if the current endpoint pool is unique (no other strong or weak references) // then it is currently not in use by any connections. if let Some(pool) = Arc::get_mut(x.get_mut()) { - let EndpointConnPool { - pools, total_conns, .. - } = pool.get_mut(); - - // ensure that closed clients are removed - pools.iter_mut().for_each(|(_, db_pool)| { - clients_removed += db_pool.clear_closed_clients(total_conns); - }); + let EndpointConnPool { total_conns, .. } = pool.get_mut(); // we only remove this pool if it has no active connections if *total_conns == 0 { @@ -389,19 +358,6 @@ impl GlobalConnPool { drop(shard); timer.observe(); - // Do logging outside of the lock. - if clients_removed > 0 { - let size = self - .global_connections_count - .fetch_sub(clients_removed, atomic::Ordering::Relaxed) - - clients_removed; - Metrics::get() - .proxy - .http_pool_opened_connections - .get_metric() - .dec_by(clients_removed as i64); - info!("pool: performed global pool gc. removed {clients_removed} clients, total number of clients in pool is {size}"); - } let removed = current_len - new_len; if removed > 0 { @@ -576,6 +532,7 @@ pub fn poll_client + Send + 'static>( session_span.in_scope(|| { info!(cold_start_info = cold_start_info.as_str(), %conn_info, "new connection"); }); + let pool = match conn_info.endpoint_cache_key() { Some(endpoint) => Arc::downgrade(&global_pool.get_or_create_endpoint_pool(&endpoint)), None => Weak::new(), @@ -642,6 +599,17 @@ pin_project! { #[pin] connection: Inner, } + + impl PinnedDrop for DbConnection { + fn drop(this: Pin<&mut Self>) { + let mut this = this.project(); + let Some(pool) = this.pool.upgrade() else { return }; + let Some(init) = this.node.as_mut().initialized_mut() else { return }; + if pool.write().remove_client(this.db_user.clone(), init) { + info!("closed connection removed"); + } + } + } } impl> Future for DbConnection { @@ -707,16 +675,7 @@ impl> Future for DbConnection { } } - ready!(this.connection.poll(cx)); - - // remove from connection pool - if let Some(init) = this.node.as_mut().initialized_mut() { - if pool.write().remove_client(this.db_user.clone(), init) { - info!("closed connection removed"); - } - } - - Poll::Ready(()) + this.connection.poll(cx) } }