From d9cedb4a950db777deb61f945a1d270b5e0792ec Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Fri, 25 Jul 2025 10:03:21 +0100 Subject: [PATCH] [tokio-postgres] fix regression in buffer reuse (#12739) Follow up to #12701, which introduced a new regression. When profiling locally I noticed that writes have the tendency to always reallocate. On investigation I found that even if the `Connection`'s write buffer is empty, if it still shares the same data pointer as the `Client`'s write buffer then the client cannot reclaim it. The best way I found to fix this is to just drop the `Connection`'s write buffer each time we fully flush it. Additionally, I remembered that `BytesMut` has an `unsplit` method which is allows even better sharing over the previous optimisation I had when 'encoding'. --- libs/proxy/tokio-postgres2/src/client.rs | 3 ++- libs/proxy/tokio-postgres2/src/codec.rs | 9 +-------- libs/proxy/tokio-postgres2/src/connect.rs | 5 ++++- libs/proxy/tokio-postgres2/src/connection.rs | 7 +++++-- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/libs/proxy/tokio-postgres2/src/client.rs b/libs/proxy/tokio-postgres2/src/client.rs index f5aed010ef..90ff39aff1 100644 --- a/libs/proxy/tokio-postgres2/src/client.rs +++ b/libs/proxy/tokio-postgres2/src/client.rs @@ -185,6 +185,7 @@ impl Client { ssl_mode: SslMode, process_id: i32, secret_key: i32, + write_buf: BytesMut, ) -> Client { Client { inner: InnerClient { @@ -195,7 +196,7 @@ impl Client { waiting: 0, received: 0, }, - buffer: Default::default(), + buffer: write_buf, }, cached_typeinfo: Default::default(), diff --git a/libs/proxy/tokio-postgres2/src/codec.rs b/libs/proxy/tokio-postgres2/src/codec.rs index 71fe062fca..35f616d229 100644 --- a/libs/proxy/tokio-postgres2/src/codec.rs +++ b/libs/proxy/tokio-postgres2/src/codec.rs @@ -47,14 +47,7 @@ impl Encoder for PostgresCodec { type Error = io::Error; fn encode(&mut self, item: BytesMut, dst: &mut BytesMut) -> io::Result<()> { - // When it comes to request/response workflows, we usually flush the entire write - // buffer in order to wait for the response before we send a new request. - // Therefore we can avoid the copy and just replace the buffer. - if dst.is_empty() { - *dst = item; - } else { - dst.extend_from_slice(&item); - } + dst.unsplit(item); Ok(()) } } diff --git a/libs/proxy/tokio-postgres2/src/connect.rs b/libs/proxy/tokio-postgres2/src/connect.rs index ca6f69f049..b1df87811e 100644 --- a/libs/proxy/tokio-postgres2/src/connect.rs +++ b/libs/proxy/tokio-postgres2/src/connect.rs @@ -77,6 +77,9 @@ where connect_timeout, }; + let mut stream = stream.into_framed(); + let write_buf = std::mem::take(stream.write_buffer_mut()); + let (client_tx, conn_rx) = mpsc::unbounded_channel(); let (conn_tx, client_rx) = mpsc::channel(4); let client = Client::new( @@ -86,9 +89,9 @@ where ssl_mode, process_id, secret_key, + write_buf, ); - let stream = stream.into_framed(); let connection = Connection::new(stream, conn_tx, conn_rx); Ok((client, connection)) diff --git a/libs/proxy/tokio-postgres2/src/connection.rs b/libs/proxy/tokio-postgres2/src/connection.rs index bee4b3372d..303de71cfa 100644 --- a/libs/proxy/tokio-postgres2/src/connection.rs +++ b/libs/proxy/tokio-postgres2/src/connection.rs @@ -229,8 +229,11 @@ where Poll::Ready(()) => { trace!("poll_flush: flushed"); - // GC the write buffer if we managed to flush - gc_bytesmut(self.stream.write_buffer_mut()); + // Since our codec prefers to share the buffer with the `Client`, + // if we don't release our share, then the `Client` would have to re-alloc + // the buffer when they next use it. + debug_assert!(self.stream.write_buffer().is_empty()); + *self.stream.write_buffer_mut() = BytesMut::new(); Poll::Ready(Ok(())) }