mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-25 09:00:37 +00:00
over-optimise copy_buffer
This commit is contained in:
@@ -164,53 +164,55 @@ impl CopyBuffer {
|
||||
where
|
||||
R: AsyncRead + ?Sized,
|
||||
{
|
||||
let me = &mut *self;
|
||||
let mut buf = ReadBuf::new(&mut me.buf);
|
||||
buf.set_filled(me.cap);
|
||||
let mut buf = ReadBuf::new(&mut self.buf);
|
||||
buf.set_filled(self.cap);
|
||||
|
||||
let res = reader.poll_read(cx, &mut buf);
|
||||
f(me.dir, &buf.filled()[me.cap..]);
|
||||
match reader.poll_read(cx, &mut buf) {
|
||||
Poll::Ready(Ok(())) => {
|
||||
f(self.dir, &buf.filled()[self.cap..]);
|
||||
let filled_len = buf.filled().len();
|
||||
self.read_done = self.cap == filled_len;
|
||||
self.cap = filled_len;
|
||||
|
||||
if let Poll::Ready(Ok(())) = res {
|
||||
let filled_len = buf.filled().len();
|
||||
me.read_done = me.cap == filled_len;
|
||||
me.cap = filled_len;
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
Poll::Ready(Err(e)) => Poll::Ready(Err(ErrorSource::read(self.dir, cold(e)))),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
res.map_err(|e| ErrorSource::read(me.dir, e))
|
||||
}
|
||||
|
||||
fn poll_write_buf<R, W>(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
f: &mut impl for<'a> FnMut(Direction, &'a [u8]),
|
||||
mut reader: Pin<&mut R>,
|
||||
mut writer: Pin<&mut W>,
|
||||
reader: Pin<&mut R>,
|
||||
writer: Pin<&mut W>,
|
||||
) -> Poll<Result<(), ErrorSource>>
|
||||
where
|
||||
R: AsyncRead + ?Sized,
|
||||
W: AsyncWrite + ?Sized,
|
||||
{
|
||||
let me = &mut *self;
|
||||
match writer.as_mut().poll_write(cx, &me.buf[me.pos..me.cap]) {
|
||||
Poll::Pending => {
|
||||
// Top up the buffer towards full if we can read a bit more
|
||||
// data - this should improve the chances of a large write
|
||||
if !me.read_done && me.cap < me.buf.len() {
|
||||
ready!(me.poll_fill_buf(cx, f, reader.as_mut()))?;
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
match writer.poll_write(cx, &me.buf[me.pos..me.cap]) {
|
||||
Poll::Pending if !me.read_done || me.cap < me.buf.len() => {}
|
||||
Poll::Pending => return Poll::Pending,
|
||||
Poll::Ready(Ok(0)) => {
|
||||
let err = io::Error::new(io::ErrorKind::WriteZero, "write zero byte into writer");
|
||||
Poll::Ready(Err(ErrorSource::write(self.dir, err)))
|
||||
return Poll::Ready(Err(ErrorSource::write(self.dir, cold(err))));
|
||||
}
|
||||
Poll::Ready(Ok(i)) => {
|
||||
self.pos += i;
|
||||
self.need_flush = true;
|
||||
Poll::Ready(Ok(()))
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
Poll::Ready(Err(err)) => {
|
||||
return Poll::Ready(Err(ErrorSource::write(me.dir, cold(err))));
|
||||
}
|
||||
Poll::Ready(Err(e)) => Poll::Ready(Err(ErrorSource::write(me.dir, e))),
|
||||
}
|
||||
|
||||
// Top up the buffer towards full if we can read a bit more
|
||||
// data - this should improve the chances of a large write
|
||||
me.poll_fill_buf(cx, f, reader)
|
||||
}
|
||||
|
||||
pub(super) fn poll_copy<R, W>(
|
||||
@@ -230,7 +232,7 @@ impl CopyBuffer {
|
||||
if self.cap < self.buf.len() && !self.read_done {
|
||||
match self.poll_fill_buf(cx, f, reader.as_mut()) {
|
||||
Poll::Ready(Ok(())) => (),
|
||||
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
|
||||
Poll::Ready(Err(err)) => return Poll::Ready(Err(cold(err))),
|
||||
Poll::Pending => {
|
||||
// Ignore pending reads when our buffer is not empty, because
|
||||
// we can try to write data immediately.
|
||||
@@ -239,7 +241,7 @@ impl CopyBuffer {
|
||||
// when the reader depends on buffered writer.
|
||||
if self.need_flush {
|
||||
ready!(writer.as_mut().poll_flush(cx))
|
||||
.map_err(|e| ErrorSource::write(self.dir, e))?;
|
||||
.map_err(|e| ErrorSource::write(self.dir, cold(e)))?;
|
||||
self.need_flush = false;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user