proxy: fix overloaded db connection closure (#7364)

## Problem

possible for the database connections to not close in time.

## Summary of changes

force the closing of connections if the client has hung up
This commit is contained in:
Conrad Ludgate
2024-04-11 21:55:05 +01:00
committed by GitHub
parent 40f15c3123
commit e92fb94149

View File

@@ -15,6 +15,7 @@ use std::{
use tokio::time::Instant;
use tokio_postgres::tls::NoTlsStream;
use tokio_postgres::{AsyncMessage, ReadyForQueryStatus, Socket};
use tokio_util::sync::CancellationToken;
use crate::console::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
@@ -488,15 +489,32 @@ pub fn poll_client<C: ClientInnerExt>(
let db_user = conn_info.db_and_user();
let idle = global_pool.get_idle_timeout();
let cancel = CancellationToken::new();
let cancelled = cancel.clone().cancelled_owned();
tokio::spawn(
async move {
let _conn_gauge = conn_gauge;
let mut idle_timeout = pin!(tokio::time::sleep(idle));
let mut cancelled = pin!(cancelled);
poll_fn(move |cx| {
if matches!(rx.has_changed(), Ok(true)) {
session_id = *rx.borrow_and_update();
info!(%session_id, "changed session");
idle_timeout.as_mut().reset(Instant::now() + idle);
if cancelled.as_mut().poll(cx).is_ready() {
info!("connection dropped");
return Poll::Ready(())
}
match rx.has_changed() {
Ok(true) => {
session_id = *rx.borrow_and_update();
info!(%session_id, "changed session");
idle_timeout.as_mut().reset(Instant::now() + idle);
}
Err(_) => {
info!("connection dropped");
return Poll::Ready(())
}
_ => {}
}
// 5 minute idle connection timeout
@@ -551,6 +569,7 @@ pub fn poll_client<C: ClientInnerExt>(
let inner = ClientInner {
inner: client,
session: tx,
cancel,
aux,
conn_id,
};
@@ -560,10 +579,18 @@ pub fn poll_client<C: ClientInnerExt>(
struct ClientInner<C: ClientInnerExt> {
inner: C,
session: tokio::sync::watch::Sender<uuid::Uuid>,
cancel: CancellationToken,
aux: MetricsAuxInfo,
conn_id: uuid::Uuid,
}
impl<C: ClientInnerExt> Drop for ClientInner<C> {
fn drop(&mut self) {
// on client drop, tell the conn to shut down
self.cancel.cancel();
}
}
pub trait ClientInnerExt: Sync + Send + 'static {
fn is_closed(&self) -> bool;
fn get_process_id(&self) -> i32;
@@ -716,6 +743,7 @@ mod tests {
ClientInner {
inner: client,
session: tokio::sync::watch::Sender::new(uuid::Uuid::new_v4()),
cancel: CancellationToken::new(),
aux: MetricsAuxInfo {
endpoint_id: (&EndpointId::from("endpoint")).into(),
project_id: (&ProjectId::from("project")).into(),