refactor timeout

This commit is contained in:
Conrad Ludgate
2024-05-01 13:01:34 +01:00
parent 15a49636e0
commit 5c15f00d86

View File

@@ -16,7 +16,7 @@ use std::{
sync::atomic::{self, AtomicUsize},
};
use tokio::sync::mpsc::error::TrySendError;
use tokio::time::{Instant, Sleep};
use tokio::time::Sleep;
use tokio_postgres::tls::NoTlsStream;
use tokio_postgres::{AsyncMessage, ReadyForQueryStatus, Socket};
@@ -448,7 +448,6 @@ type ConnTypes<C> = dyn pin_list::Types<
Protected = ConnPoolEntry<C>,
// session ID
Removed = uuid::Uuid,
// conn ID
Unprotected = (),
>;
@@ -528,7 +527,7 @@ pub fn poll_client<C: ClientInnerExt, I: Future<Output = ()> + Send + 'static>(
let (send_client, recv_client) = tokio::sync::mpsc::channel(1);
let db_conn = DbConnection {
idle_timeout: tokio::time::sleep(idle),
idle_timeout: None,
idle,
node: Node::<ConnTypes<C>>::new(),
@@ -539,7 +538,6 @@ pub fn poll_client<C: ClientInnerExt, I: Future<Output = ()> + Send + 'static>(
session_span,
conn_gauge,
conn_id,
connection,
};
@@ -558,7 +556,7 @@ pin_project! {
struct DbConnection<C: ClientInnerExt, Inner> {
// Used to close the current conn if it's idle
#[pin]
idle_timeout: Sleep,
idle_timeout: Option<Sleep>,
idle: tokio::time::Duration,
// Used to add/remove conn from the conn pool
@@ -573,7 +571,6 @@ pin_project! {
// Static connection state
conn_gauge: NumDbConnectionsGuard<'static>,
conn_id: uuid::Uuid,
#[pin]
connection: Inner,
}
@@ -612,27 +609,48 @@ impl<C: ClientInnerExt, I: Future<Output = ()>> Future for DbConnection<C, I> {
.pool
.as_ref()
.expect("node cannot be init without pool");
let pool = pool.read();
let pool = pool
let mut pool_lock = pool.write();
let db_pool = pool_lock
.pools
.get(this.db_user)
.expect("node cannot be init without pool");
if let Ok((session_id, _)) = init.take_removed(&pool.conns) {
*this.session_span = info_span!("", %session_id);
let _span = this.session_span.enter();
info!("changed session");
this.idle_timeout
.as_mut()
.reset(Instant::now() + *this.idle);
};
match init.take_removed(&db_pool.conns) {
Ok((session_id, _)) => {
*this.session_span = info_span!("", %session_id);
let _span = this.session_span.enter();
info!("changed session");
// this connection is no longer idle
this.idle_timeout.set(None);
}
Err(init) => {
let idle = this
.idle_timeout
.as_mut()
.as_pin_mut()
.expect("timer must be set if node is init");
if idle.poll(cx).is_ready() {
info!("connection idle");
// remove client from pool - should close the connection if it's idle.
// does nothing if the client is currently checked-out and in-use
if pool_lock.remove_client(this.db_user.clone(), init) {
info!("closed connection removed");
}
}
}
}
}
let _span = this.session_span.enter();
// The client has been returned. We will insert it into the linked list for this database.
if let Poll::Ready(client) = this.recv_client.poll_recv(cx) {
// if the send_client is dropped, then the client is dropped
let Some(client) = client else {
let _span = this.session_span.enter();
info!("connection dropped");
return Poll::Ready(());
};
@@ -642,29 +660,12 @@ impl<C: ClientInnerExt, I: Future<Output = ()>> Future for DbConnection<C, I> {
return Poll::Ready(());
};
let _span = this.session_span.enter();
if !EndpointConnPool::put(pool, this.node.as_mut(), this.db_user, client) {
return Poll::Ready(());
}
}
let _span = this.session_span.enter();
// 5 minute idle connection timeout
if this.idle_timeout.as_mut().poll(cx).is_ready() {
this.idle_timeout
.as_mut()
.reset(Instant::now() + *this.idle);
info!("connection idle");
if let Some(pool) = &this.pool {
// remove client from pool - should close the connection if it's idle.
// does nothing if the client is currently checked-out and in-use
if let Some(init) = this.node.as_mut().initialized_mut() {
if pool.write().remove_client(this.db_user.clone(), init) {
info!("closed connection removed");
}
}
}
// this connection is now idle
this.idle_timeout.set(Some(tokio::time::sleep(*this.idle)));
}
this.connection.poll(cx)