diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index 61e4c5f0fa..195470e3ca 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -27,6 +27,8 @@ use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Sender; use tokio::task::spawn_blocking; +use tokio::time::Duration; +use tokio::time::Instant; use tracing::*; use utils::id::TenantTimelineId; use utils::lsn::Lsn; @@ -206,6 +208,10 @@ async fn network_write( } } +// Send keepalive messages to walproposer, to make sure it receives updates +// even when it writes a steady stream of messages. +const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1); + /// Takes messages from msg_rx, processes and pushes replies to reply_tx. struct WalAcceptor { tli: Arc, @@ -253,18 +259,25 @@ impl WalAcceptor { timeline: Arc::clone(&self.tli), }; - let mut next_msg: ProposerAcceptorMessage; + // After this timestamp we will stop processing AppendRequests and send a response + // to the walproposer. walproposer sends at least one AppendRequest per second, + // we will send keepalives by replying to these requests once per second. + let mut next_keepalive = Instant::now(); loop { let opt_msg = self.msg_rx.recv().await; if opt_msg.is_none() { return Ok(()); // chan closed, streaming terminated } - next_msg = opt_msg.unwrap(); + let mut next_msg = opt_msg.unwrap(); - if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) { + let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) { // loop through AppendRequest's while it's readily available to // write as many WAL as possible without fsyncing + // + // Note: this will need to be rewritten if we want to read non-AppendRequest messages here. + // Otherwise, we might end up in a situation where we read a message, but don't + // process it. while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg { let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request); @@ -274,6 +287,11 @@ impl WalAcceptor { } } + // get out of this loop if keepalive time is reached + if Instant::now() >= next_keepalive { + break; + } + match self.msg_rx.try_recv() { Ok(msg) => next_msg = msg, Err(TryRecvError::Empty) => break, @@ -282,18 +300,18 @@ impl WalAcceptor { } // flush all written WAL to the disk - if let Some(reply) = self.tli.process_msg(&ProposerAcceptorMessage::FlushWAL)? { - if self.reply_tx.send(reply).await.is_err() { - return Ok(()); // chan closed, streaming terminated - } - } + self.tli.process_msg(&ProposerAcceptorMessage::FlushWAL)? } else { // process message other than AppendRequest - if let Some(reply) = self.tli.process_msg(&next_msg)? { - if self.reply_tx.send(reply).await.is_err() { - return Ok(()); // chan closed, streaming terminated - } + self.tli.process_msg(&next_msg)? + }; + + if let Some(reply) = reply_msg { + if self.reply_tx.send(reply).await.is_err() { + return Ok(()); // chan closed, streaming terminated } + // reset keepalive time + next_keepalive = Instant::now() + KEEPALIVE_INTERVAL; } } }