From 38df46b3814a0265908f2b2582af985009acf519 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Tue, 22 Jul 2025 14:20:39 +0100 Subject: [PATCH] fix session state by resetting it --- libs/proxy/tokio-postgres2/src/client.rs | 27 ++++++++++++++++++++++++ proxy/src/serverless/conn_pool.rs | 3 +++ proxy/src/serverless/conn_pool_lib.rs | 18 ++++++++++++---- proxy/src/serverless/http_conn_pool.rs | 4 ++++ 4 files changed, 48 insertions(+), 4 deletions(-) diff --git a/libs/proxy/tokio-postgres2/src/client.rs b/libs/proxy/tokio-postgres2/src/client.rs index 068566e955..1b137c382e 100644 --- a/libs/proxy/tokio-postgres2/src/client.rs +++ b/libs/proxy/tokio-postgres2/src/client.rs @@ -296,6 +296,33 @@ impl Client { self.batch_execute("discard all").await } + /// Similar to `discard_all`, but it does not clear any query plans + /// + /// This runs in the background, so it can be executed without `await`ing. + pub fn reset_session_background(&mut self) -> Result<(), Error> { + // "CLOSE ALL": closes any cursors + // "SET SESSION AUTHORIZATION DEFAULT": resets the current_user back to the session_user + // "RESET ALL": resets any GUCs back to their session defaults. + // "DEALLOCATE ALL": deallocates any prepared statements + // "UNLISTEN *": stops listening on all channels + // "SELECT pg_advisory_unlock_all();": unlocks all advisory locks + // "DISCARD TEMP;": drops all temporary tables + // "DISCARD SEQUENCES;": deallocates all cached sequence state + + let _responses = self.inner_mut().send_simple_query( + "CLOSE ALL; + SET SESSION AUTHORIZATION DEFAULT; + RESET ALL; + DEALLOCATE ALL; + UNLISTEN *; + SELECT pg_advisory_unlock_all(); + DISCARD TEMP; + DISCARD SEQUENCES;", + )?; + + Ok(()) + } + /// Begins a new database transaction. /// /// The transaction will roll back by default - use the `commit` method to commit it. diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 015c46f787..17305e30f1 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -190,6 +190,9 @@ mod tests { fn get_process_id(&self) -> i32 { 0 } + fn reset(&mut self) -> Result<(), postgres_client::Error> { + Ok(()) + } } fn create_inner() -> ClientInnerCommon { diff --git a/proxy/src/serverless/conn_pool_lib.rs b/proxy/src/serverless/conn_pool_lib.rs index ed5cc0ea03..68634c0b57 100644 --- a/proxy/src/serverless/conn_pool_lib.rs +++ b/proxy/src/serverless/conn_pool_lib.rs @@ -10,7 +10,7 @@ use parking_lot::RwLock; use postgres_client::ReadyForQueryStatus; use rand::Rng; use smol_str::ToSmolStr; -use tracing::{Span, debug, info}; +use tracing::{Span, debug, info, warn}; use super::backend::HttpConnError; use super::conn_pool::ClientDataRemote; @@ -188,7 +188,7 @@ impl EndpointConnPool { self.pools.get_mut(&db_user) } - pub(crate) fn put(pool: &RwLock, conn_info: &ConnInfo, client: ClientInnerCommon) { + pub(crate) fn put(pool: &RwLock, conn_info: &ConnInfo, mut client: ClientInnerCommon) { let conn_id = client.get_conn_id(); let (max_conn, conn_count, pool_name) = { let pool = pool.read(); @@ -201,12 +201,17 @@ impl EndpointConnPool { }; if client.inner.is_closed() { - info!(%conn_id, "{}: throwing away connection '{conn_info}' because connection is closed", pool_name); + info!(%conn_id, "{pool_name}: throwing away connection '{conn_info}' because connection is closed"); + return; + } + + if let Err(error) = client.inner.reset() { + warn!(?error, %conn_id, "{pool_name}: throwing away connection '{conn_info}' because connection could not be reset"); return; } if conn_count >= max_conn { - info!(%conn_id, "{}: throwing away connection '{conn_info}' because pool is full", pool_name); + info!(%conn_id, "{pool_name}: throwing away connection '{conn_info}' because pool is full"); return; } @@ -691,6 +696,7 @@ impl Deref for Client { pub(crate) trait ClientInnerExt: Sync + Send + 'static { fn is_closed(&self) -> bool; fn get_process_id(&self) -> i32; + fn reset(&mut self) -> Result<(), postgres_client::Error>; } impl ClientInnerExt for postgres_client::Client { @@ -701,6 +707,10 @@ impl ClientInnerExt for postgres_client::Client { fn get_process_id(&self) -> i32 { self.get_process_id() } + + fn reset(&mut self) -> Result<(), postgres_client::Error> { + self.reset_session_background() + } } impl Discard<'_, C> { diff --git a/proxy/src/serverless/http_conn_pool.rs b/proxy/src/serverless/http_conn_pool.rs index 7acd816026..8a4ab18ce5 100644 --- a/proxy/src/serverless/http_conn_pool.rs +++ b/proxy/src/serverless/http_conn_pool.rs @@ -294,4 +294,8 @@ impl ClientInnerExt for Send { // ideally throw something meaningful -1 } + + fn reset(&mut self) -> Result<(), postgres_client::Error> { + Ok(()) + } }