proxy: remove connections on shutdown (#5051)

## Problem

On shutdown, proxy connections are staying open.

## Summary of changes

Remove the connections on shutdown
This commit is contained in:
Conrad Ludgate
2023-08-21 19:20:58 +01:00
committed by GitHub
parent 4a8bd866f6
commit 0b001a0001
3 changed files with 54 additions and 12 deletions

View File

@@ -64,13 +64,13 @@ pub struct EndpointConnPool {
total_conns: usize,
}
/// This is cheap and not hugely secure.
/// But probably good enough for in memory only hashes.
/// 4096 is the number of rounds that SCRAM-SHA-256 recommends.
/// It's not the 600,000 that OWASP recommends... but our passwords are high entropy anyway.
///
/// Still takes 3.5ms to hash on my hardware.
/// Still takes 1.4ms to hash on my hardware.
/// We don't want to ruin the latency improvements of using the pool by making password verification take too long
const PARAMS: Params = Params {
rounds: 10_000,
rounds: 4096,
output_length: 32,
};
@@ -99,6 +99,10 @@ pub struct GlobalConnPool {
max_conns_per_endpoint: usize,
proxy_config: &'static crate::config::ProxyConfig,
// Using a lock to remove any race conditions.
// Eg cleaning up connections while a new connection is returned
closed: RwLock<bool>,
}
impl GlobalConnPool {
@@ -108,9 +112,24 @@ impl GlobalConnPool {
global_pool_size: AtomicUsize::new(0),
max_conns_per_endpoint: MAX_CONNS_PER_ENDPOINT,
proxy_config: config,
closed: RwLock::new(false),
})
}
pub fn shutdown(&self) {
*self.closed.write() = true;
self.global_pool.retain(|_, endpoint_pool| {
let mut pool = endpoint_pool.write();
// by clearing this hashmap, we remove the slots that a connection can be returned to.
// when returning, it drops the connection if the slot doesn't exist
pool.pools.clear();
pool.total_conns = 0;
false
});
}
pub async fn get(
&self,
conn_info: &ConnInfo,
@@ -208,7 +227,20 @@ impl GlobalConnPool {
new_client
}
pub async fn put(&self, conn_info: &ConnInfo, client: Client) -> anyhow::Result<()> {
pub fn put(&self, conn_info: &ConnInfo, client: Client) -> anyhow::Result<()> {
// We want to hold this open while we return. This ensures that the pool can't close
// while we are in the middle of returning the connection.
let closed = self.closed.read();
if *closed {
info!("pool: throwing away connection '{conn_info}' because pool is closed");
return Ok(());
}
if client.inner.is_closed() {
info!("pool: throwing away connection '{conn_info}' because connection is closed");
return Ok(());
}
let pool = self.get_or_create_endpoint_pool(&conn_info.hostname);
// return connection to the pool

View File

@@ -16,7 +16,6 @@ use tokio_postgres::types::Type;
use tokio_postgres::GenericClient;
use tokio_postgres::IsolationLevel;
use tokio_postgres::Row;
use tracing::Instrument;
use url::Url;
use super::conn_pool::ConnInfo;
@@ -286,13 +285,12 @@ pub async fn handle(
};
if allow_pool {
let current_span = tracing::Span::current();
// return connection to the pool
tokio::task::spawn(
async move {
let _ = conn_pool.put(&conn_info, client).await;
}
.in_current_span(),
);
tokio::task::spawn_blocking(move || {
let _span = current_span.enter();
let _ = conn_pool.put(&conn_info, client);
});
}
result

View File

@@ -269,6 +269,18 @@ pub async fn task_main(
let conn_pool: Arc<GlobalConnPool> = GlobalConnPool::new(config);
// shutdown the connection pool
tokio::spawn({
let cancellation_token = cancellation_token.clone();
let conn_pool = conn_pool.clone();
async move {
cancellation_token.cancelled().await;
tokio::task::spawn_blocking(move || conn_pool.shutdown())
.await
.unwrap();
}
});
let tls_config = config.tls_config.as_ref().map(|cfg| cfg.to_server_config());
let tls_acceptor: tokio_rustls::TlsAcceptor = match tls_config {
Some(config) => config.into(),