mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 12:30:38 +00:00
improve remove
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
use dashmap::DashMap;
|
||||
use futures::Future;
|
||||
use parking_lot::RwLock;
|
||||
use pin_list::Node;
|
||||
use pin_list::{InitializedNode, Node};
|
||||
use pin_project_lite::pin_project;
|
||||
use rand::Rng;
|
||||
use smallvec::SmallVec;
|
||||
@@ -104,7 +104,11 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
|
||||
})
|
||||
}
|
||||
|
||||
fn remove_client(&mut self, db_user: (DbName, RoleName), conn_id: uuid::Uuid) -> bool {
|
||||
fn remove_client<'a>(
|
||||
&mut self,
|
||||
db_user: (DbName, RoleName),
|
||||
node: Pin<&'a mut InitializedNode<'a, ConnTypes<C>>>,
|
||||
) -> bool {
|
||||
let Self {
|
||||
pools,
|
||||
total_conns,
|
||||
@@ -112,27 +116,18 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
|
||||
..
|
||||
} = self;
|
||||
if let Some(pool) = pools.get_mut(&db_user) {
|
||||
let mut removed = 0;
|
||||
let mut cursor = pool.conns.cursor_front_mut();
|
||||
while let Some(client) = cursor.protected() {
|
||||
if client.conn.conn_id != conn_id {
|
||||
let _ = cursor.remove_current(uuid::Uuid::nil());
|
||||
removed += 1;
|
||||
} else {
|
||||
cursor.move_next()
|
||||
}
|
||||
}
|
||||
|
||||
if removed > 0 {
|
||||
global_connections_count.fetch_sub(removed, atomic::Ordering::Relaxed);
|
||||
if node.unlink(&mut pool.conns).is_ok() {
|
||||
global_connections_count.fetch_sub(1, atomic::Ordering::Relaxed);
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.http_pool_opened_connections
|
||||
.get_metric()
|
||||
.dec_by(removed as i64);
|
||||
.dec_by(1);
|
||||
*total_conns -= 1;
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
*total_conns -= removed;
|
||||
removed > 0
|
||||
} else {
|
||||
false
|
||||
}
|
||||
@@ -704,11 +699,10 @@ impl<C: ClientInnerExt, I: Future<Output = ()>> Future for DbConnection<C, I> {
|
||||
if let Some(pool) = this.pool.upgrade() {
|
||||
// remove client from pool - should close the connection if it's idle.
|
||||
// does nothing if the client is currently checked-out and in-use
|
||||
if pool
|
||||
.write()
|
||||
.remove_client(this.db_user.clone(), *this.conn_id)
|
||||
{
|
||||
info!("idle connection removed");
|
||||
if let Some(init) = this.node.as_mut().initialized_mut() {
|
||||
if pool.write().remove_client(this.db_user.clone(), init) {
|
||||
info!("closed connection removed");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -716,11 +710,10 @@ impl<C: ClientInnerExt, I: Future<Output = ()>> Future for DbConnection<C, I> {
|
||||
ready!(this.connection.poll(cx));
|
||||
|
||||
// remove from connection pool
|
||||
if pool
|
||||
.write()
|
||||
.remove_client(this.db_user.clone(), *this.conn_id)
|
||||
{
|
||||
info!("closed connection removed");
|
||||
if let Some(init) = this.node.as_mut().initialized_mut() {
|
||||
if pool.write().remove_client(this.db_user.clone(), init) {
|
||||
info!("closed connection removed");
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Ready(())
|
||||
|
||||
Reference in New Issue
Block a user