This commit is contained in:
Conrad Ludgate
2024-04-18 17:54:27 +01:00
parent 81efb554f7
commit ecec709349
2 changed files with 47 additions and 55 deletions

View File

@@ -187,7 +187,7 @@ impl ConnectMechanism for TokioMechanism {
Ok(poll_tokio_client(
self.pool.clone(),
ctx,
self.conn_info.clone(),
&self.conn_info,
client,
connection,
self.conn_id,

View File

@@ -27,7 +27,7 @@ use crate::{
auth::backend::ComputeUserInfo, context::RequestMonitoring, DbName, EndpointCacheKey, RoleName,
};
use tracing::{debug, error, warn, Span};
use tracing::{debug, error, warn};
use tracing::{info, info_span, Instrument};
use super::backend::HttpConnError;
@@ -136,10 +136,7 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
node: Pin<&mut Node<ConnTypes<C>>>,
db_user: &(DbName, RoleName),
client: ClientInner<C>,
conn_info: ConnInfo,
) -> bool {
let conn_id = client.conn_id;
{
let pool = pool.read();
if pool
@@ -147,7 +144,7 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
.load(atomic::Ordering::Relaxed)
>= pool.global_pool_size_max_conns
{
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full");
info!("pool: throwing away connection because pool is full");
return false;
}
}
@@ -188,9 +185,9 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
// do logging outside of the mutex
if returned {
info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
info!("pool: returning connection back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
} else {
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
info!("pool: throwing away connection because pool is full, total_conns={total_conns}");
}
returned
@@ -402,7 +399,7 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
);
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
ctx.latency_timer.success();
return Ok(Some(Client::new(client, conn_info.clone())));
return Ok(Some(Client::new(client)));
}
}
Ok(None)
@@ -465,7 +462,7 @@ type ConnTypes<C> = dyn pin_list::Types<
pub fn poll_tokio_client(
global_pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
ctx: &mut RequestMonitoring,
conn_info: ConnInfo,
conn_info: &ConnInfo,
client: tokio_postgres::Client,
mut connection: tokio_postgres::Connection<Socket, NoTlsStream>,
conn_id: uuid::Uuid,
@@ -514,7 +511,7 @@ pub fn poll_tokio_client(
pub fn poll_client<C: ClientInnerExt, I: Future<Output = ()> + Send + 'static>(
global_pool: Arc<GlobalConnPool<C>>,
ctx: &mut RequestMonitoring,
conn_info: ConnInfo,
conn_info: &ConnInfo,
client: C,
connection: I,
conn_id: uuid::Uuid,
@@ -561,7 +558,7 @@ pub fn poll_client<C: ClientInnerExt, I: Future<Output = ()> + Send + 'static>(
aux,
conn_id,
};
Client::new(inner, conn_info)
Client::new(inner)
}
pin_project! {
@@ -574,7 +571,7 @@ pin_project! {
// Used to add/remove conn from the conn pool
#[pin]
node: Node<ConnTypes<C>>,
recv_client: tokio::sync::mpsc::Receiver<(tracing::Span, ClientInner<C>, ConnInfo)>,
recv_client: tokio::sync::mpsc::Receiver<ClientInner<C>>,
db_user: (DbName, RoleName),
pool: Option<Arc<RwLock<EndpointConnPool<C>>>>,
@@ -605,11 +602,19 @@ impl<C: ClientInnerExt, I: Future<Output = ()>> Future for DbConnection<C, I> {
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
// node is initiated via EndpointConnPool::put.
// this is only called in the if statement below.
// this can only occur if pool is set (and pool is never removed).
// when this occurs, it guarantees that the DbUserConnPool is created (it is never removed).
// Update the session span.
// If the node is initialised, then it is either
// 1. Waiting in the idle pool
// 2. Just removed from the idle pool and this is our first wake up.
//
// In the event of 1, nothing happens. (should not have many wakeups while idle)
// In the event of 2, we remove the session_id that was left in it's place.
if let Some(init) = this.node.as_mut().initialized_mut() {
// node is initiated via EndpointConnPool::put.
// this is only called in the if statement below.
// this can only occur if pool is set (and pool is never removed).
// when this occurs, it guarantees that the DbUserConnPool is created (it is never removed).
let pool = this
.pool
.as_ref()
@@ -619,6 +624,7 @@ impl<C: ClientInnerExt, I: Future<Output = ()>> Future for DbConnection<C, I> {
.pools
.get(this.db_user)
.expect("node cannot be init without pool");
if let Ok((session_id, _)) = init.take_removed(&pool.conns) {
*this.session_span = info_span!("", %session_id);
let _span = this.session_span.enter();
@@ -629,9 +635,10 @@ impl<C: ClientInnerExt, I: Future<Output = ()>> Future for DbConnection<C, I> {
};
}
// The client has been returned. We will insert it into the linked list for this database.
if let Poll::Ready(client) = this.recv_client.poll_recv(cx) {
// if the send_client is dropped, then the client is dropped
let Some((span, client, conn_info)) = client else {
let Some(client) = client else {
let _span = this.session_span.enter();
info!("connection dropped");
return Poll::Ready(());
@@ -642,8 +649,8 @@ impl<C: ClientInnerExt, I: Future<Output = ()>> Future for DbConnection<C, I> {
return Poll::Ready(());
};
let _span = span.enter();
if !EndpointConnPool::put(pool, this.node.as_mut(), this.db_user, client, conn_info) {
let _span = this.session_span.enter();
if !EndpointConnPool::put(pool, this.node.as_mut(), this.db_user, client) {
return Poll::Ready(());
}
}
@@ -673,7 +680,7 @@ impl<C: ClientInnerExt, I: Future<Output = ()>> Future for DbConnection<C, I> {
struct ClientInner<C: ClientInnerExt> {
inner: C,
pool: tokio::sync::mpsc::Sender<(tracing::Span, ClientInner<C>, ConnInfo)>,
pool: tokio::sync::mpsc::Sender<ClientInner<C>>,
aux: MetricsAuxInfo,
conn_id: uuid::Uuid,
}
@@ -709,55 +716,41 @@ impl<C: ClientInnerExt> Client<C> {
}
pub struct Client<C: ClientInnerExt> {
span: Span,
inner: Option<ClientInner<C>>,
conn_info: ConnInfo,
discarded: bool,
}
pub struct Discard<'a> {
conn_info: &'a ConnInfo,
conn_id: uuid::Uuid,
discarded: &'a mut bool,
}
impl<C: ClientInnerExt> Client<C> {
pub(self) fn new(inner: ClientInner<C>, conn_info: ConnInfo) -> Self {
pub(self) fn new(inner: ClientInner<C>) -> Self {
Self {
inner: Some(inner),
span: Span::current(),
conn_info,
discarded: false,
}
}
pub fn inner(&mut self) -> (&mut C, Discard<'_>) {
let Self {
inner,
conn_info,
span: _,
discarded,
} = self;
let Self { inner, discarded } = self;
let inner = inner.as_mut().expect("client inner should not be removed");
(
&mut inner.inner,
Discard {
discarded,
conn_info,
},
)
let conn_id = inner.conn_id;
(&mut inner.inner, Discard { discarded, conn_id })
}
}
impl Discard<'_> {
pub fn check_idle(&mut self, status: ReadyForQueryStatus) {
let conn_info = &self.conn_info;
let conn_id = &self.conn_id;
if status != ReadyForQueryStatus::Idle && !*self.discarded {
info!("pool: throwing away connection '{conn_info}' because connection is not idle")
info!(%conn_id, "pool: throwing away connection because connection is not idle")
}
}
pub fn discard(&mut self) {
let conn_info = &self.conn_info;
let conn_id = &self.conn_id;
*self.discarded = true;
info!("pool: throwing away connection '{conn_info}' because connection is potentially in a broken state")
info!(%conn_id, "pool: throwing away connection because connection is potentially in a broken state")
}
}
@@ -779,7 +772,6 @@ impl<C: ClientInnerExt> Drop for Client<C> {
return;
}
let conn_info = self.conn_info.clone();
let client = self
.inner
.take()
@@ -788,12 +780,12 @@ impl<C: ClientInnerExt> Drop for Client<C> {
let conn_id = client.conn_id;
if client.is_closed() {
info!(%conn_id, "pool: throwing away connection '{conn_info}' because connection is closed");
info!(%conn_id, "pool: throwing away connection because connection is closed");
return;
}
let tx = client.pool.clone();
match tx.try_send((self.span.clone(), client, conn_info)) {
match tx.try_send(client) {
Ok(_) => {}
Err(TrySendError::Closed(_)) => {}
Err(TrySendError::Full(_)) => {
@@ -824,7 +816,7 @@ mod tests {
fn create_inner(
global_pool: Arc<GlobalConnPool<MockClient>>,
conn_info: ConnInfo,
conn_info: &ConnInfo,
) -> (Client<MockClient>, CancellationToken) {
let cancelled = CancellationToken::new();
let client = poll_client(
@@ -869,7 +861,7 @@ mod tests {
password: "password".as_bytes().into(),
};
{
let (mut client, _) = create_inner(pool.clone(), conn_info.clone());
let (mut client, _) = create_inner(pool.clone(), &conn_info);
assert_eq!(0, pool.get_global_connections_count());
client.inner().1.discard();
drop(client);
@@ -878,13 +870,13 @@ mod tests {
assert_eq!(0, pool.get_global_connections_count());
}
{
let (client, _) = create_inner(pool.clone(), conn_info.clone());
let (client, _) = create_inner(pool.clone(), &conn_info);
drop(client);
yield_now().await;
assert_eq!(1, pool.get_global_connections_count());
}
{
let (client, cancel) = create_inner(pool.clone(), conn_info.clone());
let (client, cancel) = create_inner(pool.clone(), &conn_info);
cancel.cancel();
drop(client);
yield_now().await;
@@ -892,7 +884,7 @@ mod tests {
assert_eq!(1, pool.get_global_connections_count());
}
let cancel = {
let (client, cancel) = create_inner(pool.clone(), conn_info.clone());
let (client, cancel) = create_inner(pool.clone(), &conn_info);
drop(client);
yield_now().await;
// The client should be added to the pool.
@@ -900,7 +892,7 @@ mod tests {
cancel
};
{
let client = create_inner(pool.clone(), conn_info.clone());
let client = create_inner(pool.clone(), &conn_info);
drop(client);
yield_now().await;
// The client shouldn't be added to the pool. Because the ep-pool is full.
@@ -917,13 +909,13 @@ mod tests {
password: "password".as_bytes().into(),
};
{
let client = create_inner(pool.clone(), conn_info.clone());
let client = create_inner(pool.clone(), &conn_info);
drop(client);
yield_now().await;
assert_eq!(3, pool.get_global_connections_count());
}
{
let client = create_inner(pool.clone(), conn_info.clone());
let client = create_inner(pool.clone(), &conn_info);
drop(client);
yield_now().await;
// The client shouldn't be added to the pool. Because the global pool is full.