mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
feat(proxy): optimizing the chances of large write in copy_bidirectional (#10608)
We forked copy_bidirectional to solve some issues like fast-shutdown (disallowing half-open connections) and to introduce better error tracking (which side of the conn closed down). A change recently made its way upstream offering performance improvements: https://github.com/tokio-rs/tokio/pull/6532. These seem applicable to our fork, thus it makes sense to apply them here as well.
This commit is contained in:
@@ -201,25 +201,26 @@ impl CopyBuffer {
|
||||
W: AsyncWrite + ?Sized,
|
||||
{
|
||||
loop {
|
||||
// If our buffer is empty, then we need to read some data to
|
||||
// continue.
|
||||
if self.pos == self.cap && !self.read_done {
|
||||
self.pos = 0;
|
||||
self.cap = 0;
|
||||
|
||||
// If there is some space left in our buffer, then we try to read some
|
||||
// data to continue, thus maximizing the chances of a large write.
|
||||
if self.cap < self.buf.len() && !self.read_done {
|
||||
match self.poll_fill_buf(cx, reader.as_mut()) {
|
||||
Poll::Ready(Ok(())) => (),
|
||||
Poll::Ready(Err(err)) => return Poll::Ready(Err(ErrorDirection::Read(err))),
|
||||
Poll::Pending => {
|
||||
// Try flushing when the reader has no progress to avoid deadlock
|
||||
// when the reader depends on buffered writer.
|
||||
if self.need_flush {
|
||||
ready!(writer.as_mut().poll_flush(cx))
|
||||
.map_err(ErrorDirection::Write)?;
|
||||
self.need_flush = false;
|
||||
}
|
||||
// Ignore pending reads when our buffer is not empty, because
|
||||
// we can try to write data immediately.
|
||||
if self.pos == self.cap {
|
||||
// Try flushing when the reader has no progress to avoid deadlock
|
||||
// when the reader depends on buffered writer.
|
||||
if self.need_flush {
|
||||
ready!(writer.as_mut().poll_flush(cx))
|
||||
.map_err(ErrorDirection::Write)?;
|
||||
self.need_flush = false;
|
||||
}
|
||||
|
||||
return Poll::Pending;
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -246,9 +247,13 @@ impl CopyBuffer {
|
||||
"writer returned length larger than input slice"
|
||||
);
|
||||
|
||||
// All data has been written, the buffer can be considered empty again
|
||||
self.pos = 0;
|
||||
self.cap = 0;
|
||||
|
||||
// If we've written all the data and we've seen EOF, flush out the
|
||||
// data and finish the transfer.
|
||||
if self.pos == self.cap && self.read_done {
|
||||
if self.read_done {
|
||||
ready!(writer.as_mut().poll_flush(cx)).map_err(ErrorDirection::Write)?;
|
||||
return Poll::Ready(Ok(self.amt));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user