mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
ensure client is removed from pool if conn is dropped
This commit is contained in:
@@ -228,48 +228,25 @@ impl<C: ClientInnerExt> Default for DbUserConnPool<C> {
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt> DbUserConnPool<C> {
|
||||
fn clear_closed_clients(&mut self, conns: &mut usize) -> usize {
|
||||
let mut removed = 0;
|
||||
|
||||
let mut cursor = self.conns.cursor_front_mut();
|
||||
while let Some(client) = cursor.protected() {
|
||||
if client.conn.is_closed() {
|
||||
let _ = cursor.remove_current(uuid::Uuid::nil());
|
||||
removed += 1;
|
||||
} else {
|
||||
cursor.move_next()
|
||||
}
|
||||
}
|
||||
|
||||
*conns -= removed;
|
||||
removed
|
||||
}
|
||||
|
||||
fn get_conn_entry(
|
||||
&mut self,
|
||||
conns: &mut usize,
|
||||
global_connections_count: &AtomicUsize,
|
||||
session_id: uuid::Uuid,
|
||||
) -> Option<ConnPoolEntry<C>> {
|
||||
let mut removed = self.clear_closed_clients(conns);
|
||||
let mut cursor = self.conns.cursor_front_mut();
|
||||
let conn = loop {
|
||||
let client = cursor.remove_current(session_id).ok()?;
|
||||
if !client.conn.is_closed() {
|
||||
break client;
|
||||
}
|
||||
};
|
||||
|
||||
let conn = self
|
||||
.conns
|
||||
.cursor_front_mut()
|
||||
.remove_current(session_id)
|
||||
.ok();
|
||||
*conns -= 1;
|
||||
global_connections_count.fetch_sub(1, atomic::Ordering::Relaxed);
|
||||
Metrics::get().proxy.http_pool_opened_connections.dec_by(1);
|
||||
|
||||
if conn.is_some() {
|
||||
*conns -= 1;
|
||||
removed += 1;
|
||||
}
|
||||
global_connections_count.fetch_sub(removed, atomic::Ordering::Relaxed);
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.http_pool_opened_connections
|
||||
.get_metric()
|
||||
.dec_by(removed as i64);
|
||||
conn
|
||||
Some(conn)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -361,19 +338,11 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
|
||||
.http_pool_reclaimation_lag_seconds
|
||||
.start_timer();
|
||||
let current_len = shard.len();
|
||||
let mut clients_removed = 0;
|
||||
shard.retain(|endpoint, x| {
|
||||
// if the current endpoint pool is unique (no other strong or weak references)
|
||||
// then it is currently not in use by any connections.
|
||||
if let Some(pool) = Arc::get_mut(x.get_mut()) {
|
||||
let EndpointConnPool {
|
||||
pools, total_conns, ..
|
||||
} = pool.get_mut();
|
||||
|
||||
// ensure that closed clients are removed
|
||||
pools.iter_mut().for_each(|(_, db_pool)| {
|
||||
clients_removed += db_pool.clear_closed_clients(total_conns);
|
||||
});
|
||||
let EndpointConnPool { total_conns, .. } = pool.get_mut();
|
||||
|
||||
// we only remove this pool if it has no active connections
|
||||
if *total_conns == 0 {
|
||||
@@ -389,19 +358,6 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
|
||||
drop(shard);
|
||||
timer.observe();
|
||||
|
||||
// Do logging outside of the lock.
|
||||
if clients_removed > 0 {
|
||||
let size = self
|
||||
.global_connections_count
|
||||
.fetch_sub(clients_removed, atomic::Ordering::Relaxed)
|
||||
- clients_removed;
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.http_pool_opened_connections
|
||||
.get_metric()
|
||||
.dec_by(clients_removed as i64);
|
||||
info!("pool: performed global pool gc. removed {clients_removed} clients, total number of clients in pool is {size}");
|
||||
}
|
||||
let removed = current_len - new_len;
|
||||
|
||||
if removed > 0 {
|
||||
@@ -576,6 +532,7 @@ pub fn poll_client<C: ClientInnerExt, I: Future<Output = ()> + Send + 'static>(
|
||||
session_span.in_scope(|| {
|
||||
info!(cold_start_info = cold_start_info.as_str(), %conn_info, "new connection");
|
||||
});
|
||||
|
||||
let pool = match conn_info.endpoint_cache_key() {
|
||||
Some(endpoint) => Arc::downgrade(&global_pool.get_or_create_endpoint_pool(&endpoint)),
|
||||
None => Weak::new(),
|
||||
@@ -642,6 +599,17 @@ pin_project! {
|
||||
#[pin]
|
||||
connection: Inner,
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt, I> PinnedDrop for DbConnection<C, I> {
|
||||
fn drop(this: Pin<&mut Self>) {
|
||||
let mut this = this.project();
|
||||
let Some(pool) = this.pool.upgrade() else { return };
|
||||
let Some(init) = this.node.as_mut().initialized_mut() else { return };
|
||||
if pool.write().remove_client(this.db_user.clone(), init) {
|
||||
info!("closed connection removed");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt, I: Future<Output = ()>> Future for DbConnection<C, I> {
|
||||
@@ -707,16 +675,7 @@ impl<C: ClientInnerExt, I: Future<Output = ()>> Future for DbConnection<C, I> {
|
||||
}
|
||||
}
|
||||
|
||||
ready!(this.connection.poll(cx));
|
||||
|
||||
// remove from connection pool
|
||||
if let Some(init) = this.node.as_mut().initialized_mut() {
|
||||
if pool.write().remove_client(this.db_user.clone(), init) {
|
||||
info!("closed connection removed");
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Ready(())
|
||||
this.connection.poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user