diff --git a/libs/postgres_backend/src/lib.rs b/libs/postgres_backend/src/lib.rs index 8ce7f85481..ad3af4e794 100644 --- a/libs/postgres_backend/src/lib.rs +++ b/libs/postgres_backend/src/lib.rs @@ -242,6 +242,7 @@ impl MaybeWriteOnly { } } + /// Cancellation safe as long as the underlying IO is cancellation safe. async fn shutdown(&mut self) -> io::Result<()> { match self { MaybeWriteOnly::Full(framed) => framed.shutdown().await, @@ -393,13 +394,23 @@ impl PostgresBackend { shutdown_watcher: F, ) -> Result<(), QueryError> where - F: Fn() -> S, + F: Fn() -> S + Clone, S: Future, { - let ret = self.run_message_loop(handler, shutdown_watcher).await; - // socket might be already closed, e.g. if previously received error, - // so ignore result. - self.framed.shutdown().await.ok(); + let ret = self + .run_message_loop(handler, shutdown_watcher.clone()) + .await; + + tokio::select! { + _ = shutdown_watcher() => { + // do nothing; we most likely got already stopped by shutdown and will log it next. + } + _ = self.framed.shutdown() => { + // socket might be already closed, e.g. if previously received error, + // so ignore result. + }, + } + match ret { Ok(()) => Ok(()), Err(QueryError::Shutdown) => { diff --git a/libs/pq_proto/src/framed.rs b/libs/pq_proto/src/framed.rs index c12898a05c..6e97b8c2a0 100644 --- a/libs/pq_proto/src/framed.rs +++ b/libs/pq_proto/src/framed.rs @@ -214,27 +214,24 @@ where } } +/// Cancellation safe as long as the AsyncWrite is cancellation safe. async fn flush( stream: &mut S, write_buf: &mut BytesMut, ) -> Result<(), io::Error> { while write_buf.has_remaining() { - let bytes_written = stream.write(write_buf.chunk()).await?; + let bytes_written = stream.write_buf(write_buf).await?; if bytes_written == 0 { return Err(io::Error::new( ErrorKind::WriteZero, "failed to write message", )); } - // The advanced part will be garbage collected, likely during shifting - // data left on next attempt to write to buffer when free space is not - // enough. - write_buf.advance(bytes_written); } - write_buf.clear(); stream.flush().await } +/// Cancellation safe as long as the AsyncWrite is cancellation safe. async fn shutdown( stream: &mut S, write_buf: &mut BytesMut,