mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 20:40:37 +00:00
safekeeper: flush WAL on compute disconnect (#9436)
## Problem In #9259, we found that the `check_safekeepers_synced` fast path could result in a lower basebackup LSN than the `flush_lsn` reported by Safekeepers in `VoteResponse`, causing the compute to panic once on startup. This would happen if the Safekeeper had unflushed WAL records due to a compute disconnect. The `TIMELINE_STATUS` query would report a `flush_lsn` below these unflushed records, while `VoteResponse` would flush the WAL and report the advanced `flush_lsn`. See https://github.com/neondatabase/neon/issues/9259#issuecomment-2410849032. ## Summary of changes Flush the WAL if the compute disconnects during WAL processing.
This commit is contained in:
@@ -498,21 +498,18 @@ impl WalAcceptor {
|
||||
// we will send keepalives by replying to these requests once per second.
|
||||
let mut next_keepalive = Instant::now();
|
||||
|
||||
loop {
|
||||
let opt_msg = self.msg_rx.recv().await;
|
||||
if opt_msg.is_none() {
|
||||
return Ok(()); // chan closed, streaming terminated
|
||||
}
|
||||
let mut next_msg = opt_msg.unwrap();
|
||||
|
||||
while let Some(mut next_msg) = self.msg_rx.recv().await {
|
||||
// Update walreceiver state in shmem for reporting.
|
||||
if let ProposerAcceptorMessage::Elected(_) = &next_msg {
|
||||
walreceiver_guard.get().status = WalReceiverStatus::Streaming;
|
||||
}
|
||||
|
||||
let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
|
||||
// loop through AppendRequest's while it's readily available to
|
||||
// write as many WAL as possible without fsyncing
|
||||
// Loop through AppendRequests while available to write as many WAL records as
|
||||
// possible without fsyncing.
|
||||
//
|
||||
// Make sure the WAL is flushed before returning, see:
|
||||
// https://github.com/neondatabase/neon/issues/9259
|
||||
//
|
||||
// Note: this will need to be rewritten if we want to read non-AppendRequest messages here.
|
||||
// Otherwise, we might end up in a situation where we read a message, but don't
|
||||
@@ -522,7 +519,7 @@ impl WalAcceptor {
|
||||
|
||||
if let Some(reply) = self.tli.process_msg(&noflush_msg).await? {
|
||||
if self.reply_tx.send(reply).await.is_err() {
|
||||
return Ok(()); // chan closed, streaming terminated
|
||||
break; // disconnected, flush WAL and return on next send/recv
|
||||
}
|
||||
}
|
||||
|
||||
@@ -531,11 +528,13 @@ impl WalAcceptor {
|
||||
break;
|
||||
}
|
||||
|
||||
// continue pulling AppendRequests if available
|
||||
match self.msg_rx.try_recv() {
|
||||
Ok(msg) => next_msg = msg,
|
||||
Err(TryRecvError::Empty) => break,
|
||||
Err(TryRecvError::Disconnected) => return Ok(()), // chan closed, streaming terminated
|
||||
}
|
||||
// on disconnect, flush WAL and return on next send/recv
|
||||
Err(TryRecvError::Disconnected) => break,
|
||||
};
|
||||
}
|
||||
|
||||
// flush all written WAL to the disk
|
||||
@@ -555,5 +554,6 @@ impl WalAcceptor {
|
||||
next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user