mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 05:22:56 +00:00
Always send the latest commit_lsn in send_wal (#4150)
When a new connection is established to the safekeeper, the 'end_pos' field is initially set to Lsn::INVALID (i.e 0/0). If there is no WAL to send to the client, we send KeepAlive messages with Lsn::INVALID. That confuses the pageserver: it thinks that safekeeper is lagging very much behind the tip of the branch, and will reconnect to a different safekeeper. Then the same thing happens with the new safekeeper, until some WAL is streamed which sets 'end_pos' to a valid value. This fix always sets `end_pos` to the most recent `commit_lsn` value. This is useful to send the latest `commit_lsn` to the receiver, so it will know how advanced this safekeeper compared to the others. Fixes https://github.com/neondatabase/neon/issues/3972 Supersedes https://github.com/neondatabase/neon/pull/4144
This commit is contained in:
committed by
GitHub
parent
b114ef26c2
commit
ce1bbc9fa7
@@ -384,6 +384,8 @@ impl SafekeeperPostgresHandler {
|
||||
self.appname.clone(),
|
||||
));
|
||||
|
||||
let commit_lsn_watch_rx = tli.get_commit_lsn_watch_rx();
|
||||
|
||||
// Walproposer gets special handling: safekeeper must give proposer all
|
||||
// local WAL till the end, whether committed or not (walproposer will
|
||||
// hang otherwise). That's because walproposer runs the consensus and
|
||||
@@ -399,7 +401,14 @@ impl SafekeeperPostgresHandler {
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let end_pos = stop_pos.unwrap_or(Lsn::INVALID);
|
||||
|
||||
// take the latest commit_lsn if don't have stop_pos
|
||||
let mut end_pos = stop_pos.unwrap_or(*commit_lsn_watch_rx.borrow());
|
||||
|
||||
if end_pos < start_pos {
|
||||
warn!("start_pos {} is ahead of end_pos {}", start_pos, end_pos);
|
||||
end_pos = start_pos;
|
||||
}
|
||||
|
||||
info!(
|
||||
"starting streaming from {:?} till {:?}",
|
||||
@@ -429,7 +438,7 @@ impl SafekeeperPostgresHandler {
|
||||
start_pos,
|
||||
end_pos,
|
||||
stop_pos,
|
||||
commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
|
||||
commit_lsn_watch_rx,
|
||||
ws_guard: ws_guard.clone(),
|
||||
wal_reader,
|
||||
send_buf: [0; MAX_SEND_SIZE],
|
||||
@@ -456,6 +465,11 @@ struct WalSender<'a, IO> {
|
||||
// Position since which we are sending next chunk.
|
||||
start_pos: Lsn,
|
||||
// WAL up to this position is known to be locally available.
|
||||
// Usually this is the same as the latest commit_lsn, but in case of
|
||||
// walproposer recovery, this is flush_lsn.
|
||||
//
|
||||
// We send this LSN to the receiver as wal_end, so that it knows how much
|
||||
// WAL this safekeeper has. This LSN should be as fresh as possible.
|
||||
end_pos: Lsn,
|
||||
// If present, terminate after reaching this position; used by walproposer
|
||||
// in recovery.
|
||||
@@ -530,10 +544,18 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
||||
/// exit in the meanwhile
|
||||
async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
|
||||
loop {
|
||||
self.end_pos = *self.commit_lsn_watch_rx.borrow();
|
||||
if self.end_pos > self.start_pos {
|
||||
// We have something to send.
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Wait for WAL to appear, now self.end_pos == self.start_pos.
|
||||
if let Some(lsn) = wait_for_lsn(&mut self.commit_lsn_watch_rx, self.start_pos).await? {
|
||||
self.end_pos = lsn;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Timed out waiting for WAL, check for termination and send KA
|
||||
if let Some(remote_consistent_lsn) = self
|
||||
.ws_guard
|
||||
@@ -618,11 +640,6 @@ const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
/// - Ok(None) if timeout expired;
|
||||
/// - Err in case of error (if watch channel is in trouble, shouldn't happen).
|
||||
async fn wait_for_lsn(rx: &mut Receiver<Lsn>, lsn: Lsn) -> anyhow::Result<Option<Lsn>> {
|
||||
let commit_lsn: Lsn = *rx.borrow();
|
||||
if commit_lsn > lsn {
|
||||
return Ok(Some(commit_lsn));
|
||||
}
|
||||
|
||||
let res = timeout(POLL_STATE_TIMEOUT, async move {
|
||||
let mut commit_lsn;
|
||||
loop {
|
||||
|
||||
Reference in New Issue
Block a user