Send AppendResponse keepalive once per second (#4036)

Walproposer sends AppendRequest at least once per second. This patch
adds a response to these requests once per second.

Fixes https://github.com/neondatabase/neon/issues/4017
This commit is contained in:
Arthur Petukhovsky
2023-04-17 11:24:57 +03:00
committed by GitHub
parent c2496c7ef2
commit 73f34eaa5e

View File

@@ -27,6 +27,8 @@ use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::task::spawn_blocking;
use tokio::time::Duration;
use tokio::time::Instant;
use tracing::*;
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
@@ -206,6 +208,10 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
}
}
// Send keepalive messages to walproposer, to make sure it receives updates
// even when it writes a steady stream of messages.
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
/// Takes messages from msg_rx, processes and pushes replies to reply_tx.
struct WalAcceptor {
tli: Arc<Timeline>,
@@ -253,18 +259,25 @@ impl WalAcceptor {
timeline: Arc::clone(&self.tli),
};
let mut next_msg: ProposerAcceptorMessage;
// After this timestamp we will stop processing AppendRequests and send a response
// to the walproposer. walproposer sends at least one AppendRequest per second,
// we will send keepalives by replying to these requests once per second.
let mut next_keepalive = Instant::now();
loop {
let opt_msg = self.msg_rx.recv().await;
if opt_msg.is_none() {
return Ok(()); // chan closed, streaming terminated
}
next_msg = opt_msg.unwrap();
let mut next_msg = opt_msg.unwrap();
if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
// loop through AppendRequest's while it's readily available to
// write as many WAL as possible without fsyncing
//
// Note: this will need to be rewritten if we want to read non-AppendRequest messages here.
// Otherwise, we might end up in a situation where we read a message, but don't
// process it.
while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
@@ -274,6 +287,11 @@ impl WalAcceptor {
}
}
// get out of this loop if keepalive time is reached
if Instant::now() >= next_keepalive {
break;
}
match self.msg_rx.try_recv() {
Ok(msg) => next_msg = msg,
Err(TryRecvError::Empty) => break,
@@ -282,18 +300,18 @@ impl WalAcceptor {
}
// flush all written WAL to the disk
if let Some(reply) = self.tli.process_msg(&ProposerAcceptorMessage::FlushWAL)? {
if self.reply_tx.send(reply).await.is_err() {
return Ok(()); // chan closed, streaming terminated
}
}
self.tli.process_msg(&ProposerAcceptorMessage::FlushWAL)?
} else {
// process message other than AppendRequest
if let Some(reply) = self.tli.process_msg(&next_msg)? {
if self.reply_tx.send(reply).await.is_err() {
return Ok(()); // chan closed, streaming terminated
}
self.tli.process_msg(&next_msg)?
};
if let Some(reply) = reply_msg {
if self.reply_tx.send(reply).await.is_err() {
return Ok(()); // chan closed, streaming terminated
}
// reset keepalive time
next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
}
}
}