From dafdf9b9524a034f25bb67d5d6f62a375a892862 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas
Date: Thu, 21 Apr 2022 16:37:36 +0300
Subject: [PATCH] Handle EINTR

---
Review notes (free-form text; everything between the "---" line above and
the first "diff --git" line below is ignored when the patch is applied):

* pq_proto.rs: adds the `retry_read!` macro, which re-runs the wrapped
  expression for as long as it yields an `Err` whose kind is
  `io::ErrorKind::Interrupted`, and otherwise breaks out of the loop with
  the result. The `read_u8`/`read_u32` calls touched in `impl FeMessage` and
  `impl FeStartupPacket` are wrapped in it; their existing `UnexpectedEof`
  handling and `?` propagation are unchanged.
* walredo.rs: the `nix::poll::poll` call is wrapped in a loop that retries
  when the error equals `nix::errno::Errno::EINTR`; any other result is
  handed to `?` exactly as before.
* NOTE(review): every poll retry passes the full `wal_redo_timeout` again,
  so a poll that keeps getting interrupted can wait longer than the
  configured timeout before "WAL redo timed out" is reported. Consider
  polling against a deadline instead.
* NOTE(review): `retry_read!` starts a fresh read on each retry. Whether an
  interrupted multi-byte `read_u32` can already have consumed bytes depends
  on the stream implementation, which is not visible here - TODO confirm.
* NOTE(review): line breaks and indentation in this copy were reconstructed
  from whitespace-collapsed text; context-line indentation is a best guess,
  so regenerate the patch or apply it with whitespace tolerance.

 libs/utils/src/pq_proto.rs | 23 +++++++++++++++++++----
 pageserver/src/walredo.rs  |  7 ++++++-
 2 files changed, 25 insertions(+), 5 deletions(-)

diff --git a/libs/utils/src/pq_proto.rs b/libs/utils/src/pq_proto.rs
index 0e4c4907e7..e1677f4311 100644
--- a/libs/utils/src/pq_proto.rs
+++ b/libs/utils/src/pq_proto.rs
@@ -100,6 +100,21 @@ pub struct FeExecuteMessage {
 #[derive(Debug)]
 pub struct FeCloseMessage {}
 
+/// Retry a read on EINTR
+///
+/// This runs the enclosed expression, and if it returns
+/// Err(io::ErrorKind::Interrupted), retries it.
+macro_rules! retry_read {
+    ( $x:expr ) => {
+        loop {
+            match $x {
+                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
+                res => break res,
+            }
+        }
+    };
+}
+
 impl FeMessage {
     /// Read one message from the stream.
     /// This function returns `Ok(None)` in case of EOF.
@@ -141,12 +156,12 @@ impl FeMessage {
             // Each libpq message begins with a message type byte, followed by message length
             // If the client closes the connection, return None. But if the client closes the
             // connection in the middle of a message, we will return an error.
-            let tag = match stream.read_u8().await {
+            let tag = match retry_read!(stream.read_u8().await) {
                 Ok(b) => b,
                 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
                 Err(e) => return Err(e.into()),
             };
-            let len = stream.read_u32().await?;
+            let len = retry_read!(stream.read_u32().await)?;
 
             // The message length includes itself, so it better be at least 4
             let bodylen = len
@@ -207,7 +222,7 @@ impl FeStartupPacket {
             // reading 4 bytes, to be precise), return None to indicate that the connection
             // was closed. This matches the PostgreSQL server's behavior, which avoids noise
             // in the log if the client opens connection but closes it immediately.
-            let len = match stream.read_u32().await {
+            let len = match retry_read!(stream.read_u32().await) {
                 Ok(len) => len as usize,
                 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
                 Err(e) => return Err(e.into()),
@@ -217,7 +232,7 @@ impl FeStartupPacket {
                 bail!("invalid message length");
             }
 
-            let request_code = stream.read_u32().await?;
+            let request_code = retry_read!(stream.read_u32().await)?;
 
             // the rest of startup packet are params
             let params_len = len - 8;
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index dcffcda6bb..6338b839ae 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -700,7 +700,12 @@ impl PostgresRedoProcess {
             // If we have more data to write, wake up if 'stdin' becomes writeable or
             // we have data to read. Otherwise only wake up if there's data to read.
             let nfds = if nwrite < writebuf.len() { 3 } else { 2 };
-            let n = nix::poll::poll(&mut pollfds[0..nfds], wal_redo_timeout.as_millis() as i32)?;
+            let n = loop {
+                match nix::poll::poll(&mut pollfds[0..nfds], wal_redo_timeout.as_millis() as i32) {
+                    Err(e) if e == nix::errno::Errno::EINTR => continue,
+                    res => break res,
+                }
+            }?;
 
             if n == 0 {
                 return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));