@@ -1,9 +1,12 @@
use dashmap::DashMap;
use futures::{future::poll_fn, Future};
use futures::Future;
use parking_lot::RwLock;
use pin_list::{InitializedNode, Node};
use pin_project_lite::pin_project;
use rand::Rng;
use smallvec::SmallVec;
use std::{collections::HashMap, pin::pin, sync::Arc, sync::Weak, time::Duration};
use std::pin::Pin;
use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{
    fmt,
    task::{ready, Poll},
@@ -12,19 +15,19 @@ use std::{
    ops::Deref,
    sync::atomic::{self, AtomicUsize},
};
use tokio::time::Instant;
use tokio::sync::mpsc::error::TrySendError;
use tokio::time::Sleep;
use tokio_postgres::tls::NoTlsStream;
use tokio_postgres::{AsyncMessage, ReadyForQueryStatus, Socket};
use tokio_util::sync::CancellationToken;

use crate::console::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
use crate::metrics::{HttpEndpointPoolsGuard, Metrics, NumDbConnectionsGuard};
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
use crate::{
    auth::backend::ComputeUserInfo, context::RequestMonitoring, DbName, EndpointCacheKey, RoleName,
};

use tracing::{debug, error, warn, Span};
use tracing::{debug, error, warn};
use tracing::{info, info_span, Instrument};

use super::backend::HttpConnError;
@@ -83,7 +86,11 @@ pub struct EndpointConnPool<C: ClientInnerExt> {
}

impl<C: ClientInnerExt> EndpointConnPool<C> {
    fn get_conn_entry(&mut self, db_user: (DbName, RoleName)) -> Option<ConnPoolEntry<C>> {
    fn get_conn_entry(
        &mut self,
        db_user: (DbName, RoleName),
        session_id: uuid::Uuid,
    ) -> Option<ConnPoolEntry<C>> {
        let Self {
            pools,
            total_conns,
@@ -91,11 +98,15 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
            ..
        } = self;
        pools.get_mut(&db_user).and_then(|pool_entries| {
            pool_entries.get_conn_entry(total_conns, global_connections_count.clone())
            pool_entries.get_conn_entry(total_conns, global_connections_count, session_id)
        })
    }

    fn remove_client(&mut self, db_user: (DbName, RoleName), conn_id: uuid::Uuid) -> bool {
    fn remove_client<'a>(
        &mut self,
        db_user: (DbName, RoleName),
        node: Pin<&'a mut InitializedNode<'a, ConnTypes<C>>>,
    ) -> bool {
        let Self {
            pools,
            total_conns,
@@ -103,41 +114,39 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
            ..
        } = self;
        if let Some(pool) = pools.get_mut(&db_user) {
            let old_len = pool.conns.len();
            pool.conns.retain(|conn| conn.conn.conn_id != conn_id);
            let new_len = pool.conns.len();
            let removed = old_len - new_len;
            if removed > 0 {
                global_connections_count.fetch_sub(removed, atomic::Ordering::Relaxed);
            if node.unlink(&mut pool.conns).is_ok() {
                global_connections_count.fetch_sub(1, atomic::Ordering::Relaxed);
                Metrics::get()
                    .proxy
                    .http_pool_opened_connections
                    .get_metric()
                    .dec_by(removed as i64);
                    .dec_by(1);
                *total_conns -= 1;
                true
            } else {
                false
            }
            *total_conns -= removed;
            removed > 0
        } else {
            false
        }
    }
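    // Note: with the intrusive-list layout above, `remove_client` takes the connection's own
    // pinned node rather than a conn_id, so removal appears to be O(1): `node.unlink` only
    // succeeds while the node is still linked into this pool's list (i.e. the connection is
    // idle and not checked out), and in that case the counters and the opened-connections
    // metric are decremented by exactly one.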

    fn put(pool: &RwLock<Self>, conn_info: &ConnInfo, client: ClientInner<C>) {
        let conn_id = client.conn_id;

        if client.is_closed() {
            info!(%conn_id, "pool: throwing away connection '{conn_info}' because connection is closed");
            return;
        }
        let global_max_conn = pool.read().global_pool_size_max_conns;
        if pool
            .read()
            .global_connections_count
            .load(atomic::Ordering::Relaxed)
            >= global_max_conn
    fn put(
        pool: &RwLock<Self>,
        node: Pin<&mut Node<ConnTypes<C>>>,
        db_user: &(DbName, RoleName),
        client: ClientInner<C>,
    ) -> bool {
        {
            info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full");
            return;
            let pool = pool.read();
            if pool
                .global_connections_count
                .load(atomic::Ordering::Relaxed)
                >= pool.global_pool_size_max_conns
            {
                info!("pool: throwing away connection because pool is full");
                return false;
            }
        }

        // return connection to the pool
@@ -147,14 +156,19 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
        let mut pool = pool.write();

        if pool.total_conns < pool.max_conns {
            let pool_entries = pool.pools.entry(conn_info.db_and_user()).or_default();
            pool_entries.conns.push(ConnPoolEntry {
                conn: client,
                _last_access: std::time::Instant::now(),
            });
            let pool_entries = pool.pools.entry(db_user.clone()).or_default();

            pool_entries.conns.cursor_front_mut().insert_after(
                node,
                ConnPoolEntry {
                    conn: client,
                    _last_access: std::time::Instant::now(),
                },
                (),
            );

            returned = true;
            per_db_size = pool_entries.conns.len();
            per_db_size = pool_entries.len;

            pool.total_conns += 1;
            pool.global_connections_count
@@ -171,10 +185,12 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {

        // do logging outside of the mutex
        if returned {
            info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
            info!("pool: returning connection back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
        } else {
            info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
            info!("pool: throwing away connection because pool is full, total_conns={total_conns}");
        }

        returned
    }
}
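// Note: `put` now links the connection task's pinned `Node` into the per-(db, user) list
// (via `cursor_front_mut().insert_after`, which appears to splice it in at the front) instead
// of pushing an owned entry into a Vec. Returning `false` tells the caller the pool was full
// and the client was not retained, so the connection task can shut itself down.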

@@ -193,45 +209,37 @@ impl<C: ClientInnerExt> Drop for EndpointConnPool<C> {
}

pub struct DbUserConnPool<C: ClientInnerExt> {
    conns: Vec<ConnPoolEntry<C>>,
    conns: pin_list::PinList<ConnTypes<C>>,
    len: usize,
}

impl<C: ClientInnerExt> Default for DbUserConnPool<C> {
    fn default() -> Self {
        Self { conns: Vec::new() }
        Self {
            conns: pin_list::PinList::new(pin_list::id::Checked::new()),
            len: 0,
        }
    }
}

impl<C: ClientInnerExt> DbUserConnPool<C> {
    fn clear_closed_clients(&mut self, conns: &mut usize) -> usize {
        let old_len = self.conns.len();

        self.conns.retain(|conn| !conn.conn.is_closed());

        let new_len = self.conns.len();
        let removed = old_len - new_len;
        *conns -= removed;
        removed
    }

    fn get_conn_entry(
        &mut self,
        conns: &mut usize,
        global_connections_count: Arc<AtomicUsize>,
        global_connections_count: &AtomicUsize,
        session_id: uuid::Uuid,
    ) -> Option<ConnPoolEntry<C>> {
        let mut removed = self.clear_closed_clients(conns);
        let conn = self.conns.pop();
        if conn.is_some() {
            *conns -= 1;
            removed += 1;
        }
        global_connections_count.fetch_sub(removed, atomic::Ordering::Relaxed);
        Metrics::get()
            .proxy
            .http_pool_opened_connections
            .get_metric()
            .dec_by(removed as i64);
        conn
        let conn = self
            .conns
            .cursor_front_mut()
            .remove_current(session_id)
            .ok()?;

        *conns -= 1;
        global_connections_count.fetch_sub(1, atomic::Ordering::Relaxed);
        Metrics::get().proxy.http_pool_opened_connections.dec_by(1);

        Some(conn)
    }
}
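// Note: checkout now walks the pin_list instead of popping from a Vec.
// `cursor_front_mut().remove_current(session_id)` appears to unlink the front idle entry and
// leave the checking-out session's uuid behind as the node's `Removed` value, which the owning
// connection task later collects with `take_removed` to re-attach its logging span.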

@@ -323,19 +331,11 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
            .http_pool_reclaimation_lag_seconds
            .start_timer();
        let current_len = shard.len();
        let mut clients_removed = 0;
        shard.retain(|endpoint, x| {
            // if the current endpoint pool is unique (no other strong or weak references)
            // then it is currently not in use by any connections.
            if let Some(pool) = Arc::get_mut(x.get_mut()) {
                let EndpointConnPool {
                    pools, total_conns, ..
                } = pool.get_mut();

                // ensure that closed clients are removed
                pools.iter_mut().for_each(|(_, db_pool)| {
                    clients_removed += db_pool.clear_closed_clients(total_conns);
                });
                let EndpointConnPool { total_conns, .. } = pool.get_mut();

                // we only remove this pool if it has no active connections
                if *total_conns == 0 {
@@ -351,19 +351,6 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
        drop(shard);
        timer.observe();

        // Do logging outside of the lock.
        if clients_removed > 0 {
            let size = self
                .global_connections_count
                .fetch_sub(clients_removed, atomic::Ordering::Relaxed)
                - clients_removed;
            Metrics::get()
                .proxy
                .http_pool_opened_connections
                .get_metric()
                .dec_by(clients_removed as i64);
            info!("pool: performed global pool gc. removed {clients_removed} clients, total number of clients in pool is {size}");
        }
        let removed = current_len - new_len;

        if removed > 0 {
@@ -388,32 +375,25 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
        let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
        if let Some(entry) = endpoint_pool
            .write()
            .get_conn_entry(conn_info.db_and_user())
            .get_conn_entry(conn_info.db_and_user(), ctx.session_id)
        {
            client = Some(entry.conn)
        }
        let endpoint_pool = Arc::downgrade(&endpoint_pool);

        // ok return cached connection if found and establish a new one otherwise
        if let Some(client) = client {
            if client.is_closed() {
                info!("pool: cached connection '{conn_info}' is closed, opening a new one");
                return Ok(None);
            } else {
                tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id));
                tracing::Span::current().record(
                    "pid",
                    &tracing::field::display(client.inner.get_process_id()),
                );
                info!(
                    cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
                    "pool: reusing connection '{conn_info}'"
                );
                client.session.send(ctx.session_id)?;
                ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
                ctx.latency_timer.success();
                return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
            }
            tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id));
            tracing::Span::current().record(
                "pid",
                &tracing::field::display(client.inner.get_process_id()),
            );
            info!(
                cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
                "pool: reusing connection '{conn_info}'"
            );
            ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
            ctx.latency_timer.success();
            return Ok(Some(Client::new(client)));
        }
        Ok(None)
    }
@@ -463,154 +443,252 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
    }
}

pub fn poll_client<C: ClientInnerExt>(
type ConnTypes<C> = dyn pin_list::Types<
    Id = pin_list::id::Checked,
    Protected = ConnPoolEntry<C>,
    // session ID
    Removed = uuid::Uuid,
    Unprotected = (),
>;
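// Note: this type alias wires the pool into the `pin_list` crate. Roughly (a sketch based on
// how the list is used in this file, not part of the change itself): while a connection is
// idle its node is linked into the per-(db, user) `PinList` and carries the `ConnPoolEntry`
// as the `Protected` value; a checkout does
//     list.cursor_front_mut().remove_current(session_id)   // unlink, leave uuid as `Removed`
// and the owning task later calls
//     node.take_removed(&list)                              // -> Ok((session_id, ()))
// to learn which session took the client. `Unprotected = ()` since nothing extra appears to be
// stored on unlinked nodes.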

pub fn poll_tokio_client(
    global_pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
    ctx: &mut RequestMonitoring,
    conn_info: &ConnInfo,
    client: tokio_postgres::Client,
    mut connection: tokio_postgres::Connection<Socket, NoTlsStream>,
    conn_id: uuid::Uuid,
    aux: MetricsAuxInfo,
) -> Client<tokio_postgres::Client> {
    let connection = std::future::poll_fn(move |cx| {
        loop {
            let message = ready!(connection.poll_message(cx));
            match message {
                Some(Ok(AsyncMessage::Notice(notice))) => {
                    info!("notice: {}", notice);
                }
                Some(Ok(AsyncMessage::Notification(notif))) => {
                    warn!(
                        pid = notif.process_id(),
                        channel = notif.channel(),
                        "notification received"
                    );
                }
                Some(Ok(_)) => {
                    warn!("unknown message");
                }
                Some(Err(e)) => {
                    error!("connection error: {}", e);
                    break;
                }
                None => {
                    info!("connection closed");
                    break;
                }
            }
        }
        Poll::Ready(())
    });
    poll_client(
        global_pool,
        ctx,
        conn_info,
        client,
        connection,
        conn_id,
        aux,
    )
}

pub fn poll_client<C: ClientInnerExt, I: Future<Output = ()> + Send + 'static>(
    global_pool: Arc<GlobalConnPool<C>>,
    ctx: &mut RequestMonitoring,
    conn_info: ConnInfo,
    conn_info: &ConnInfo,
    client: C,
    mut connection: tokio_postgres::Connection<Socket, NoTlsStream>,
    connection: I,
    conn_id: uuid::Uuid,
    aux: MetricsAuxInfo,
) -> Client<C> {
    let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol);
    let mut session_id = ctx.session_id;
    let (tx, mut rx) = tokio::sync::watch::channel(session_id);
    let session_id = ctx.session_id;

    let span = info_span!(parent: None, "connection", %conn_id);
    let cold_start_info = ctx.cold_start_info;
    span.in_scope(|| {
        info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
    let session_span = info_span!(parent: span.clone(), "", %session_id);
    session_span.in_scope(|| {
        info!(cold_start_info = cold_start_info.as_str(), %conn_info, "new connection");
    });
    let pool = match conn_info.endpoint_cache_key() {
        Some(endpoint) => Arc::downgrade(&global_pool.get_or_create_endpoint_pool(&endpoint)),
        None => Weak::new(),
    };
    let pool_clone = pool.clone();

    let db_user = conn_info.db_and_user();
    let pool = conn_info
        .endpoint_cache_key()
        .map(|endpoint| global_pool.get_or_create_endpoint_pool(&endpoint));

    let idle = global_pool.get_idle_timeout();
    let cancel = CancellationToken::new();
    let cancelled = cancel.clone().cancelled_owned();

    tokio::spawn(
        async move {
            let _conn_gauge = conn_gauge;
            let mut idle_timeout = pin!(tokio::time::sleep(idle));
            let mut cancelled = pin!(cancelled);
    let (send_client, recv_client) = tokio::sync::mpsc::channel(1);
    let db_conn = DbConnection {
        idle_timeout: None,
        idle,

            poll_fn(move |cx| {
                if cancelled.as_mut().poll(cx).is_ready() {
                    info!("connection dropped");
                    return Poll::Ready(())
                }
        node: Node::<ConnTypes<C>>::new(),
        recv_client,
        db_user: conn_info.db_and_user(),
        pool,

                match rx.has_changed() {
                    Ok(true) => {
                        session_id = *rx.borrow_and_update();
                        info!(%session_id, "changed session");
                        idle_timeout.as_mut().reset(Instant::now() + idle);
                    }
                    Err(_) => {
                        info!("connection dropped");
                        return Poll::Ready(())
                    }
                    _ => {}
                }
        session_span,

                // 5 minute idle connection timeout
                if idle_timeout.as_mut().poll(cx).is_ready() {
                    idle_timeout.as_mut().reset(Instant::now() + idle);
                    info!("connection idle");
                    if let Some(pool) = pool.clone().upgrade() {
                        // remove client from pool - should close the connection if it's idle.
                        // does nothing if the client is currently checked-out and in-use
                        if pool.write().remove_client(db_user.clone(), conn_id) {
                            info!("idle connection removed");
                        }
                    }
                }
        conn_gauge,
        connection,
    };

                loop {
                    let message = ready!(connection.poll_message(cx));
    tokio::spawn(db_conn.instrument(span));

                    match message {
                        Some(Ok(AsyncMessage::Notice(notice))) => {
                            info!(%session_id, "notice: {}", notice);
                        }
                        Some(Ok(AsyncMessage::Notification(notif))) => {
                            warn!(%session_id, pid = notif.process_id(), channel = notif.channel(), "notification received");
                        }
                        Some(Ok(_)) => {
                            warn!(%session_id, "unknown message");
                        }
                        Some(Err(e)) => {
                            error!(%session_id, "connection error: {}", e);
                            break
                        }
                        None => {
                            info!("connection closed");
                            break
                        }
                    }
                }

                // remove from connection pool
                if let Some(pool) = pool.clone().upgrade() {
                    if pool.write().remove_client(db_user.clone(), conn_id) {
                        info!("closed connection removed");
                    }
                }

                Poll::Ready(())
            }).await;

        }
        .instrument(span));
    let inner = ClientInner {
        inner: client,
        session: tx,
        cancel,
        pool: send_client,
        aux,
        conn_id,
    };
    Client::new(inner, conn_info, pool_clone)
    Client::new(inner)
}

pin_project! {
    struct DbConnection<C: ClientInnerExt, Inner> {
        // Used to close the current conn if it's idle
        #[pin]
        idle_timeout: Option<Sleep>,
        idle: tokio::time::Duration,

        // Used to add/remove conn from the conn pool
        #[pin]
        node: Node<ConnTypes<C>>,
        recv_client: tokio::sync::mpsc::Receiver<ClientInner<C>>,
        db_user: (DbName, RoleName),
        pool: Option<Arc<RwLock<EndpointConnPool<C>>>>,

        // Used for reporting the current session the conn is attached to
        session_span: tracing::Span,

        // Static connection state
        conn_gauge: NumDbConnectionsGuard<'static>,
        #[pin]
        connection: Inner,
    }

    impl<C: ClientInnerExt, I> PinnedDrop for DbConnection<C, I> {
        fn drop(this: Pin<&mut Self>) {
            let mut this = this.project();
            let Some(init) = this.node.as_mut().initialized_mut() else { return };
            let pool = this.pool.as_ref().expect("pool must be set if the node is initialised in the pool");
            if pool.write().remove_client(this.db_user.clone(), init) {
                info!("closed connection removed");
            }
        }
    }
}
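// Note: `DbConnection` is the state owned by the per-connection background task: it drives the
// underlying `connection` future, owns the pin_list `Node` used to park the client in the pool
// while idle, and receives the client back over `recv_client` when the `Client` handle is
// dropped. The `PinnedDrop` impl above unlinks the node on task exit so the pool is never left
// holding an entry for a dead connection.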

impl<C: ClientInnerExt, I: Future<Output = ()>> Future for DbConnection<C, I> {
    type Output = ();

    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        // Update the session span.
        // If the node is initialised, then it is either
        // 1. Waiting in the idle pool
        // 2. Just removed from the idle pool and this is our first wake up.
        //
        // In the event of 1, nothing happens. (should not have many wakeups while idle)
        // In the event of 2, we remove the session_id that was left in its place.
        if let Some(init) = this.node.as_mut().initialized_mut() {
            // node is initiated via EndpointConnPool::put.
            // this is only called in the if statement below.
            // this can only occur if pool is set (and pool is never removed).
            // when this occurs, it guarantees that the DbUserConnPool is created (it is never removed).
            let pool = this
                .pool
                .as_ref()
                .expect("node cannot be init without pool");

            let mut pool_lock = pool.write();
            let db_pool = pool_lock
                .pools
                .get(this.db_user)
                .expect("node cannot be init without pool");

            match init.take_removed(&db_pool.conns) {
                Ok((session_id, _)) => {
                    *this.session_span = info_span!("", %session_id);
                    let _span = this.session_span.enter();
                    info!("changed session");

                    // this connection is no longer idle
                    this.idle_timeout.set(None);
                }
                Err(init) => {
                    let idle = this
                        .idle_timeout
                        .as_mut()
                        .as_pin_mut()
                        .expect("timer must be set if node is init");

                    if idle.poll(cx).is_ready() {
                        info!("connection idle");

                        // remove client from pool - should close the connection if it's idle.
                        // does nothing if the client is currently checked-out and in-use
                        if pool_lock.remove_client(this.db_user.clone(), init) {
                            info!("closed connection removed");
                        }
                    }
                }
            }
        }

        let _span = this.session_span.enter();

        // The client has been returned. We will insert it into the linked list for this database.
        if let Poll::Ready(client) = this.recv_client.poll_recv(cx) {
            // if the send_client is dropped, then the client is dropped
            let Some(client) = client else {
                info!("connection dropped");
                return Poll::Ready(());
            };
            // if there's no pool, then this client will be closed.
            let Some(pool) = &this.pool else {
                info!("connection dropped");
                return Poll::Ready(());
            };

            if !EndpointConnPool::put(pool, this.node.as_mut(), this.db_user, client) {
                return Poll::Ready(());
            }

            // this connection is now idle
            this.idle_timeout.set(Some(tokio::time::sleep(*this.idle)));
        }

        this.connection.poll(cx)
    }
}
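// Note: a rough summary of the poll above: first reconcile the node state (collect the session
// uuid left behind by a checkout, or run the idle timer while parked), then accept a returned
// client from `recv_client` and link it back into the pool, and finally drive the underlying
// postgres connection future.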

struct ClientInner<C: ClientInnerExt> {
    inner: C,
    session: tokio::sync::watch::Sender<uuid::Uuid>,
    cancel: CancellationToken,
    pool: tokio::sync::mpsc::Sender<ClientInner<C>>,
    aux: MetricsAuxInfo,
    conn_id: uuid::Uuid,
}

impl<C: ClientInnerExt> Drop for ClientInner<C> {
    fn drop(&mut self) {
        // on client drop, tell the conn to shut down
        self.cancel.cancel();
    }
}

pub trait ClientInnerExt: Sync + Send + 'static {
    fn is_closed(&self) -> bool;
    fn get_process_id(&self) -> i32;
}

impl ClientInnerExt for tokio_postgres::Client {
    fn is_closed(&self) -> bool {
        self.is_closed()
    }
    fn get_process_id(&self) -> i32 {
        self.get_process_id()
    }
}

impl<C: ClientInnerExt> ClientInner<C> {
    pub fn is_closed(&self) -> bool {
        self.inner.is_closed()
    }
}

impl<C: ClientInnerExt> Client<C> {
    pub fn metrics(&self) -> Arc<MetricCounter> {
        let aux = &self.inner.as_ref().unwrap().aux;
@@ -622,54 +700,42 @@ impl<C: ClientInnerExt> Client<C> {
    }

pub struct Client<C: ClientInnerExt> {
    span: Span,
    inner: Option<ClientInner<C>>,
    conn_info: ConnInfo,
    pool: Weak<RwLock<EndpointConnPool<C>>>,
    discarded: bool,
}

pub struct Discard<'a, C: ClientInnerExt> {
    conn_info: &'a ConnInfo,
    pool: &'a mut Weak<RwLock<EndpointConnPool<C>>>,
pub struct Discard<'a> {
    conn_id: uuid::Uuid,
    discarded: &'a mut bool,
}

impl<C: ClientInnerExt> Client<C> {
    pub(self) fn new(
        inner: ClientInner<C>,
        conn_info: ConnInfo,
        pool: Weak<RwLock<EndpointConnPool<C>>>,
    ) -> Self {
    pub(self) fn new(inner: ClientInner<C>) -> Self {
        Self {
            inner: Some(inner),
            span: Span::current(),
            conn_info,
            pool,
            discarded: false,
        }
    }
    pub fn inner(&mut self) -> (&mut C, Discard<'_, C>) {
        let Self {
            inner,
            pool,
            conn_info,
            span: _,
        } = self;
    pub fn inner(&mut self) -> (&mut C, Discard<'_>) {
        let Self { inner, discarded } = self;
        let inner = inner.as_mut().expect("client inner should not be removed");
        (&mut inner.inner, Discard { pool, conn_info })
        let conn_id = inner.conn_id;
        (&mut inner.inner, Discard { discarded, conn_id })
    }
}

impl<C: ClientInnerExt> Discard<'_, C> {
impl Discard<'_> {
    pub fn check_idle(&mut self, status: ReadyForQueryStatus) {
        let conn_info = &self.conn_info;
        if status != ReadyForQueryStatus::Idle && std::mem::take(self.pool).strong_count() > 0 {
            info!("pool: throwing away connection '{conn_info}' because connection is not idle")
        let conn_id = &self.conn_id;
        if status != ReadyForQueryStatus::Idle && !*self.discarded {
            *self.discarded = true;
            info!(%conn_id, "pool: throwing away connection because connection is not idle")
        }
    }
    pub fn discard(&mut self) {
        let conn_info = &self.conn_info;
        if std::mem::take(self.pool).strong_count() > 0 {
            info!("pool: throwing away connection '{conn_info}' because connection is potentially in a broken state")
        }
        let conn_id = &self.conn_id;
        *self.discarded = true;
        info!(%conn_id, "pool: throwing away connection because connection is potentially in a broken state")
    }
}

@@ -685,73 +751,68 @@ impl<C: ClientInnerExt> Deref for Client<C> {
    }
}

impl<C: ClientInnerExt> Client<C> {
    fn do_drop(&mut self) -> Option<impl FnOnce()> {
        let conn_info = self.conn_info.clone();
impl<C: ClientInnerExt> Drop for Client<C> {
    fn drop(&mut self) {
        let client = self
            .inner
            .take()
            .expect("client inner should not be removed");
        if let Some(conn_pool) = std::mem::take(&mut self.pool).upgrade() {
            let current_span = self.span.clone();
            // return connection to the pool
            return Some(move || {
                let _span = current_span.enter();
                EndpointConnPool::put(&conn_pool, &conn_info, client);
            });
        }
        None
    }
}

impl<C: ClientInnerExt> Drop for Client<C> {
    fn drop(&mut self) {
        if let Some(drop) = self.do_drop() {
            tokio::task::spawn_blocking(drop);
        if self.discarded {
            return;
        }

        let conn_id = client.conn_id;

        let tx = client.pool.clone();
        match tx.try_send(client) {
            Ok(_) => {}
            Err(TrySendError::Closed(_)) => {
                info!(%conn_id, "pool: throwing away connection because connection is closed");
            }
            Err(TrySendError::Full(_)) => {
                error!("client channel should not be full")
            }
        }
    }
}
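// Note: dropping a `Client` no longer touches the pool directly; it hands the `ClientInner`
// back to its connection task over the bounded(1) `pool` channel. If the task is gone the send
// fails with `Closed` and the inner client is simply dropped, which cancels the connection via
// `ClientInner::drop` above.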

#[cfg(test)]
mod tests {
    use std::{mem, sync::atomic::AtomicBool};
    use tokio::task::yield_now;
    use tokio_util::sync::CancellationToken;

    use crate::{BranchId, EndpointId, ProjectId};

    use super::*;

    struct MockClient(Arc<AtomicBool>);
    impl MockClient {
        fn new(is_closed: bool) -> Self {
            MockClient(Arc::new(is_closed.into()))
        }
    }
    struct MockClient;
    impl ClientInnerExt for MockClient {
        fn is_closed(&self) -> bool {
            self.0.load(atomic::Ordering::Relaxed)
        }
        fn get_process_id(&self) -> i32 {
            0
        }
    }

    fn create_inner() -> ClientInner<MockClient> {
        create_inner_with(MockClient::new(false))
    }

    fn create_inner_with(client: MockClient) -> ClientInner<MockClient> {
        ClientInner {
            inner: client,
            session: tokio::sync::watch::Sender::new(uuid::Uuid::new_v4()),
            cancel: CancellationToken::new(),
            aux: MetricsAuxInfo {
    fn create_inner(
        global_pool: Arc<GlobalConnPool<MockClient>>,
        conn_info: &ConnInfo,
    ) -> (Client<MockClient>, CancellationToken) {
        let cancelled = CancellationToken::new();
        let client = poll_client(
            global_pool,
            &mut RequestMonitoring::test(),
            conn_info,
            MockClient,
            cancelled.clone().cancelled_owned(),
            uuid::Uuid::new_v4(),
            MetricsAuxInfo {
                endpoint_id: (&EndpointId::from("endpoint")).into(),
                project_id: (&ProjectId::from("project")).into(),
                branch_id: (&BranchId::from("branch")).into(),
                cold_start_info: crate::console::messages::ColdStartInfo::Warm,
            },
            conn_id: uuid::Uuid::new_v4(),
        }
        );
        (client, cancelled)
    }
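    // Note: the test helper above builds clients through `poll_client` with a `MockClient` and a
    // connection future that only completes when the returned `CancellationToken` fires; tests
    // below therefore `yield_now()` after dropping a client so the spawned connection task gets a
    // chance to move it back into (or out of) the pool before the assertions run.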

    #[tokio::test]
@@ -778,51 +839,41 @@ mod tests {
            dbname: "dbname".into(),
            password: "password".as_bytes().into(),
        };
        let ep_pool = Arc::downgrade(
            &pool.get_or_create_endpoint_pool(&conn_info.endpoint_cache_key().unwrap()),
        );
        {
            let mut client = Client::new(create_inner(), conn_info.clone(), ep_pool.clone());
            let (mut client, _) = create_inner(pool.clone(), &conn_info);
            assert_eq!(0, pool.get_global_connections_count());
            client.inner().1.discard();
            drop(client);
            yield_now().await;
            // Discard should not add the connection to the pool.
            assert_eq!(0, pool.get_global_connections_count());
        }
        {
            let mut client = Client::new(create_inner(), conn_info.clone(), ep_pool.clone());
            client.do_drop().unwrap()();
            mem::forget(client); // drop the client
            let (client, _) = create_inner(pool.clone(), &conn_info);
            drop(client);
            yield_now().await;
            assert_eq!(1, pool.get_global_connections_count());
        }
        {
            let mut closed_client = Client::new(
                create_inner_with(MockClient::new(true)),
                conn_info.clone(),
                ep_pool.clone(),
            );
            closed_client.do_drop().unwrap()();
            mem::forget(closed_client); // drop the client
            // The closed client shouldn't be added to the pool.
            let (client, cancel) = create_inner(pool.clone(), &conn_info);
            cancel.cancel();
            drop(client);
            yield_now().await;
            // The closed client shouldn't be added to the pool.
            assert_eq!(1, pool.get_global_connections_count());
        }
        let is_closed: Arc<AtomicBool> = Arc::new(false.into());
        {
            let mut client = Client::new(
                create_inner_with(MockClient(is_closed.clone())),
                conn_info.clone(),
                ep_pool.clone(),
            );
            client.do_drop().unwrap()();
            mem::forget(client); // drop the client

        let cancel = {
            let (client, cancel) = create_inner(pool.clone(), &conn_info);
            drop(client);
            yield_now().await;
            // The client should be added to the pool.
            assert_eq!(2, pool.get_global_connections_count());
        }
            cancel
        };
        {
            let mut client = Client::new(create_inner(), conn_info, ep_pool);
            client.do_drop().unwrap()();
            mem::forget(client); // drop the client

            let client = create_inner(pool.clone(), &conn_info);
            drop(client);
            yield_now().await;
            // The client shouldn't be added to the pool. Because the ep-pool is full.
            assert_eq!(2, pool.get_global_connections_count());
        }
@@ -836,25 +887,22 @@ mod tests {
            dbname: "dbname".into(),
            password: "password".as_bytes().into(),
        };
        let ep_pool = Arc::downgrade(
            &pool.get_or_create_endpoint_pool(&conn_info.endpoint_cache_key().unwrap()),
        );
        {
            let mut client = Client::new(create_inner(), conn_info.clone(), ep_pool.clone());
            client.do_drop().unwrap()();
            mem::forget(client); // drop the client
            let client = create_inner(pool.clone(), &conn_info);
            drop(client);
            yield_now().await;
            assert_eq!(3, pool.get_global_connections_count());
        }
        {
            let mut client = Client::new(create_inner(), conn_info.clone(), ep_pool.clone());
            client.do_drop().unwrap()();
            mem::forget(client); // drop the client

            let client = create_inner(pool.clone(), &conn_info);
            drop(client);
            yield_now().await;
            // The client shouldn't be added to the pool. Because the global pool is full.
            assert_eq!(3, pool.get_global_connections_count());
        }

        is_closed.store(true, atomic::Ordering::Relaxed);
        cancel.cancel();
        yield_now().await;
        // Do gc for all shards.
        pool.gc(0);
        pool.gc(1);