mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 20:12:54 +00:00
[tokio-postgres] fix regression in buffer reuse (#12739)
Follow up to #12701, which introduced a new regression. When profiling locally I noticed that writes have the tendency to always reallocate. On investigation I found that even if the `Connection`'s write buffer is empty, if it still shares the same data pointer as the `Client`'s write buffer then the client cannot reclaim it. The best way I found to fix this is to just drop the `Connection`'s write buffer each time we fully flush it. Additionally, I remembered that `BytesMut` has an `unsplit` method which is allows even better sharing over the previous optimisation I had when 'encoding'.
This commit is contained in:
@@ -185,6 +185,7 @@ impl Client {
|
||||
ssl_mode: SslMode,
|
||||
process_id: i32,
|
||||
secret_key: i32,
|
||||
write_buf: BytesMut,
|
||||
) -> Client {
|
||||
Client {
|
||||
inner: InnerClient {
|
||||
@@ -195,7 +196,7 @@ impl Client {
|
||||
waiting: 0,
|
||||
received: 0,
|
||||
},
|
||||
buffer: Default::default(),
|
||||
buffer: write_buf,
|
||||
},
|
||||
cached_typeinfo: Default::default(),
|
||||
|
||||
|
||||
@@ -47,14 +47,7 @@ impl Encoder<BytesMut> for PostgresCodec {
|
||||
type Error = io::Error;
|
||||
|
||||
fn encode(&mut self, item: BytesMut, dst: &mut BytesMut) -> io::Result<()> {
|
||||
// When it comes to request/response workflows, we usually flush the entire write
|
||||
// buffer in order to wait for the response before we send a new request.
|
||||
// Therefore we can avoid the copy and just replace the buffer.
|
||||
if dst.is_empty() {
|
||||
*dst = item;
|
||||
} else {
|
||||
dst.extend_from_slice(&item);
|
||||
}
|
||||
dst.unsplit(item);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -77,6 +77,9 @@ where
|
||||
connect_timeout,
|
||||
};
|
||||
|
||||
let mut stream = stream.into_framed();
|
||||
let write_buf = std::mem::take(stream.write_buffer_mut());
|
||||
|
||||
let (client_tx, conn_rx) = mpsc::unbounded_channel();
|
||||
let (conn_tx, client_rx) = mpsc::channel(4);
|
||||
let client = Client::new(
|
||||
@@ -86,9 +89,9 @@ where
|
||||
ssl_mode,
|
||||
process_id,
|
||||
secret_key,
|
||||
write_buf,
|
||||
);
|
||||
|
||||
let stream = stream.into_framed();
|
||||
let connection = Connection::new(stream, conn_tx, conn_rx);
|
||||
|
||||
Ok((client, connection))
|
||||
|
||||
@@ -229,8 +229,11 @@ where
|
||||
Poll::Ready(()) => {
|
||||
trace!("poll_flush: flushed");
|
||||
|
||||
// GC the write buffer if we managed to flush
|
||||
gc_bytesmut(self.stream.write_buffer_mut());
|
||||
// Since our codec prefers to share the buffer with the `Client`,
|
||||
// if we don't release our share, then the `Client` would have to re-alloc
|
||||
// the buffer when they next use it.
|
||||
debug_assert!(self.stream.write_buffer().is_empty());
|
||||
*self.stream.write_buffer_mut() = BytesMut::new();
|
||||
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user