From ce1bbc9fa76145f4b0d7ced59ff60f9b8d25d924 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Thu, 4 May 2023 00:07:45 +0300 Subject: [PATCH] Always send the latest commit_lsn in send_wal (#4150) When a new connection is established to the safekeeper, the 'end_pos' field is initially set to Lsn::INVALID (i.e 0/0). If there is no WAL to send to the client, we send KeepAlive messages with Lsn::INVALID. That confuses the pageserver: it thinks that safekeeper is lagging very much behind the tip of the branch, and will reconnect to a different safekeeper. Then the same thing happens with the new safekeeper, until some WAL is streamed which sets 'end_pos' to a valid value. This fix always sets `end_pos` to the most recent `commit_lsn` value. This is useful to send the latest `commit_lsn` to the receiver, so it will know how advanced this safekeeper compared to the others. Fixes https://github.com/neondatabase/neon/issues/3972 Supersedes https://github.com/neondatabase/neon/pull/4144 --- safekeeper/src/send_wal.rs | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 6b303eb0fe..f502500e7d 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -384,6 +384,8 @@ impl SafekeeperPostgresHandler { self.appname.clone(), )); + let commit_lsn_watch_rx = tli.get_commit_lsn_watch_rx(); + // Walproposer gets special handling: safekeeper must give proposer all // local WAL till the end, whether committed or not (walproposer will // hang otherwise). That's because walproposer runs the consensus and @@ -399,7 +401,14 @@ impl SafekeeperPostgresHandler { } else { None }; - let end_pos = stop_pos.unwrap_or(Lsn::INVALID); + + // take the latest commit_lsn if don't have stop_pos + let mut end_pos = stop_pos.unwrap_or(*commit_lsn_watch_rx.borrow()); + + if end_pos < start_pos { + warn!("start_pos {} is ahead of end_pos {}", start_pos, end_pos); + end_pos = start_pos; + } info!( "starting streaming from {:?} till {:?}", @@ -429,7 +438,7 @@ impl SafekeeperPostgresHandler { start_pos, end_pos, stop_pos, - commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(), + commit_lsn_watch_rx, ws_guard: ws_guard.clone(), wal_reader, send_buf: [0; MAX_SEND_SIZE], @@ -456,6 +465,11 @@ struct WalSender<'a, IO> { // Position since which we are sending next chunk. start_pos: Lsn, // WAL up to this position is known to be locally available. + // Usually this is the same as the latest commit_lsn, but in case of + // walproposer recovery, this is flush_lsn. + // + // We send this LSN to the receiver as wal_end, so that it knows how much + // WAL this safekeeper has. This LSN should be as fresh as possible. end_pos: Lsn, // If present, terminate after reaching this position; used by walproposer // in recovery. @@ -530,10 +544,18 @@ impl WalSender<'_, IO> { /// exit in the meanwhile async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> { loop { + self.end_pos = *self.commit_lsn_watch_rx.borrow(); + if self.end_pos > self.start_pos { + // We have something to send. + return Ok(()); + } + + // Wait for WAL to appear, now self.end_pos == self.start_pos. if let Some(lsn) = wait_for_lsn(&mut self.commit_lsn_watch_rx, self.start_pos).await? { self.end_pos = lsn; return Ok(()); } + // Timed out waiting for WAL, check for termination and send KA if let Some(remote_consistent_lsn) = self .ws_guard @@ -618,11 +640,6 @@ const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1); /// - Ok(None) if timeout expired; /// - Err in case of error (if watch channel is in trouble, shouldn't happen). async fn wait_for_lsn(rx: &mut Receiver, lsn: Lsn) -> anyhow::Result> { - let commit_lsn: Lsn = *rx.borrow(); - if commit_lsn > lsn { - return Ok(Some(commit_lsn)); - } - let res = timeout(POLL_STATE_TIMEOUT, async move { let mut commit_lsn; loop {