From 78f37e6f11e3918a7b5d7c2beb72c5b52713cb6e Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 19 May 2025 23:26:17 +0100 Subject: [PATCH] over-optimise copy_buffer --- proxy/src/proxy/copy_bidirectional.rs | 54 ++++++++++++++------------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/proxy/src/proxy/copy_bidirectional.rs b/proxy/src/proxy/copy_bidirectional.rs index e040e37b9d..f1fbf27e4f 100644 --- a/proxy/src/proxy/copy_bidirectional.rs +++ b/proxy/src/proxy/copy_bidirectional.rs @@ -164,53 +164,55 @@ impl CopyBuffer { where R: AsyncRead + ?Sized, { - let me = &mut *self; - let mut buf = ReadBuf::new(&mut me.buf); - buf.set_filled(me.cap); + let mut buf = ReadBuf::new(&mut self.buf); + buf.set_filled(self.cap); - let res = reader.poll_read(cx, &mut buf); - f(me.dir, &buf.filled()[me.cap..]); + match reader.poll_read(cx, &mut buf) { + Poll::Ready(Ok(())) => { + f(self.dir, &buf.filled()[self.cap..]); + let filled_len = buf.filled().len(); + self.read_done = self.cap == filled_len; + self.cap = filled_len; - if let Poll::Ready(Ok(())) = res { - let filled_len = buf.filled().len(); - me.read_done = me.cap == filled_len; - me.cap = filled_len; + Poll::Ready(Ok(())) + } + Poll::Ready(Err(e)) => Poll::Ready(Err(ErrorSource::read(self.dir, cold(e)))), + Poll::Pending => Poll::Pending, } - res.map_err(|e| ErrorSource::read(me.dir, e)) } fn poll_write_buf( &mut self, cx: &mut Context<'_>, f: &mut impl for<'a> FnMut(Direction, &'a [u8]), - mut reader: Pin<&mut R>, - mut writer: Pin<&mut W>, + reader: Pin<&mut R>, + writer: Pin<&mut W>, ) -> Poll> where R: AsyncRead + ?Sized, W: AsyncWrite + ?Sized, { let me = &mut *self; - match writer.as_mut().poll_write(cx, &me.buf[me.pos..me.cap]) { - Poll::Pending => { - // Top up the buffer towards full if we can read a bit more - // data - this should improve the chances of a large write - if !me.read_done && me.cap < me.buf.len() { - ready!(me.poll_fill_buf(cx, f, reader.as_mut()))?; - } - Poll::Pending - } + match writer.poll_write(cx, &me.buf[me.pos..me.cap]) { + Poll::Pending if !me.read_done || me.cap < me.buf.len() => {} + Poll::Pending => return Poll::Pending, Poll::Ready(Ok(0)) => { let err = io::Error::new(io::ErrorKind::WriteZero, "write zero byte into writer"); - Poll::Ready(Err(ErrorSource::write(self.dir, err))) + return Poll::Ready(Err(ErrorSource::write(self.dir, cold(err)))); } Poll::Ready(Ok(i)) => { self.pos += i; self.need_flush = true; - Poll::Ready(Ok(())) + return Poll::Ready(Ok(())); + } + Poll::Ready(Err(err)) => { + return Poll::Ready(Err(ErrorSource::write(me.dir, cold(err)))); } - Poll::Ready(Err(e)) => Poll::Ready(Err(ErrorSource::write(me.dir, e))), } + + // Top up the buffer towards full if we can read a bit more + // data - this should improve the chances of a large write + me.poll_fill_buf(cx, f, reader) } pub(super) fn poll_copy( @@ -230,7 +232,7 @@ impl CopyBuffer { if self.cap < self.buf.len() && !self.read_done { match self.poll_fill_buf(cx, f, reader.as_mut()) { Poll::Ready(Ok(())) => (), - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Ready(Err(err)) => return Poll::Ready(Err(cold(err))), Poll::Pending => { // Ignore pending reads when our buffer is not empty, because // we can try to write data immediately. @@ -239,7 +241,7 @@ impl CopyBuffer { // when the reader depends on buffered writer. if self.need_flush { ready!(writer.as_mut().poll_flush(cx)) - .map_err(|e| ErrorSource::write(self.dir, e))?; + .map_err(|e| ErrorSource::write(self.dir, cold(e)))?; self.need_flush = false; }