diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs
index 943524398d..b9fcd2c0b2 100644
--- a/safekeeper/src/safekeeper.rs
+++ b/safekeeper/src/safekeeper.rs
@@ -34,13 +34,24 @@ pub const UNKNOWN_SERVER_VERSION: u32 = 0;
 
 /// Consensus logical timestamp.
 pub type Term = u64;
-const INVALID_TERM: Term = 0;
+pub const INVALID_TERM: Term = 0;
 
 #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
 pub struct TermLsn {
     pub term: Term,
     pub lsn: Lsn,
 }
+
+// Creation from a tuple provides less typing (e.g. for unit tests).
+impl From<(Term, Lsn)> for TermLsn {
+    fn from(pair: (Term, Lsn)) -> TermLsn {
+        TermLsn {
+            term: pair.0,
+            lsn: pair.1,
+        }
+    }
+}
+
 #[derive(Clone, Serialize, Deserialize)]
 pub struct TermHistory(pub Vec<TermLsn>);
@@ -557,6 +568,11 @@ where
             .up_to(self.flush_lsn())
     }
 
+    /// Get current term.
+    pub fn get_term(&self) -> Term {
+        self.state.acceptor_state.term
+    }
+
     pub fn get_epoch(&self) -> Term {
         self.state.acceptor_state.get_epoch(self.flush_lsn())
     }
diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs
index 808177d2c4..b684083446 100644
--- a/safekeeper/src/send_wal.rs
+++ b/safekeeper/src/send_wal.rs
@@ -2,12 +2,12 @@
 //! with the "START_REPLICATION" message, and registry of walsenders.
 
 use crate::handler::SafekeeperPostgresHandler;
-use crate::safekeeper::Term;
+use crate::safekeeper::{Term, TermLsn};
 use crate::timeline::Timeline;
 use crate::wal_service::ConnectionId;
 use crate::wal_storage::WalReader;
 use crate::GlobalTimelines;
-use anyhow::Context as AnyhowContext;
+use anyhow::{bail, Context as AnyhowContext};
 use bytes::Bytes;
 use parking_lot::Mutex;
 use postgres_backend::PostgresBackend;
@@ -390,26 +390,25 @@ impl SafekeeperPostgresHandler {
             self.appname.clone(),
         ));
 
-        let commit_lsn_watch_rx = tli.get_commit_lsn_watch_rx();
-
-        // Walproposer gets special handling: safekeeper must give proposer all
-        // local WAL till the end, whether committed or not (walproposer will
-        // hang otherwise). That's because walproposer runs the consensus and
-        // synchronizes safekeepers on the most advanced one.
+        // Walsender can operate in one of two modes, selected by
+        // application_name: give only committed WAL (used by pageserver) or all
+        // existing WAL (up to flush_lsn, used by walproposer or peer recovery).
+        // The second case is always driven by a consensus leader, whose term
+        // generally must also be supplied. However, we are sloppy about this in
+        // walproposer recovery, which will be removed soon; TODO: make the term
+        // non-Option'al then.
         //
-        // There is a small risk of this WAL getting concurrently garbaged if
-        // another compute rises which collects majority and starts fixing log
-        // on this safekeeper itself. That's ok as (old) proposer will never be
-        // able to commit such WAL.
-        let stop_pos: Option<Lsn> = if self.is_walproposer_recovery() {
-            let wal_end = tli.get_flush_lsn().await;
-            Some(wal_end)
+        // Fetching WAL without a term in recovery creates a small risk of this
+        // WAL getting concurrently garbaged if another compute rises which
+        // collects majority and starts fixing log on this safekeeper itself.
+        // That's ok as the (old) proposer will never be able to commit such WAL.
+        let end_watch = if self.is_walproposer_recovery() {
+            EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
         } else {
-            None
+            EndWatch::Commit(tli.get_commit_lsn_watch_rx())
         };
-
-        // take the latest commit_lsn if don't have stop_pos
-        let end_pos = stop_pos.unwrap_or(*commit_lsn_watch_rx.borrow());
+        // We don't check the term here; it will be checked on the first wait/WAL read anyway.
+        let end_pos = end_watch.get();
 
         if end_pos < start_pos {
             warn!(
@@ -419,8 +418,10 @@
         }
 
         info!(
-            "starting streaming from {:?} till {:?}, available WAL ends at {}",
-            start_pos, stop_pos, end_pos
+            "starting streaming from {:?}, available WAL ends at {}, recovery={}",
+            start_pos,
+            end_pos,
+            matches!(end_watch, EndWatch::Flush(_))
         );
 
         // switch to copy
@@ -445,9 +446,8 @@
                 appname,
                 start_pos,
                 end_pos,
-                stop_pos,
                 term,
-                commit_lsn_watch_rx,
+                end_watch,
                 ws_guard: ws_guard.clone(),
                 wal_reader,
                 send_buf: [0; MAX_SEND_SIZE],
@@ -466,6 +466,32 @@
     }
 }
 
+/// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
+/// given term (recovery by walproposer or a peer safekeeper).
+enum EndWatch {
+    Commit(Receiver<Lsn>),
+    Flush(Receiver<TermLsn>),
+}
+
+impl EndWatch {
+    /// Get the current end of WAL.
+    fn get(&self) -> Lsn {
+        match self {
+            EndWatch::Commit(r) => *r.borrow(),
+            EndWatch::Flush(r) => r.borrow().lsn,
+        }
+    }
+
+    /// Wait for the update.
+    async fn changed(&mut self) -> anyhow::Result<()> {
+        match self {
+            EndWatch::Commit(r) => r.changed().await?,
+            EndWatch::Flush(r) => r.changed().await?,
+        }
+        Ok(())
+    }
+}
+
 /// A half driving sending WAL.
 struct WalSender<'a, IO> {
     pgb: &'a mut PostgresBackend<IO>,
@@ -480,14 +506,12 @@ struct WalSender<'a, IO> {
     // We send this LSN to the receiver as wal_end, so that it knows how much
     // WAL this safekeeper has. This LSN should be as fresh as possible.
     end_pos: Lsn,
-    // If present, terminate after reaching this position; used by walproposer
-    // in recovery.
-    stop_pos: Option<Lsn>,
    /// When streaming uncommitted part, the term the client acts as the leader
    /// in. Streaming is stopped if local term changes to a different (higher)
    /// value.
     term: Option<Term>,
-    commit_lsn_watch_rx: Receiver<Lsn>,
+    /// Watch channel receiver to learn the end of available WAL (and wait for its advancement).
+    end_watch: EndWatch,
     ws_guard: Arc<WalSenderGuard>,
     wal_reader: WalReader,
     // buffer for readling WAL into to send it
@@ -497,29 +521,20 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
     /// Send WAL until
     /// - an error occurs
-    /// - if we are streaming to walproposer, we've streamed until stop_pos
-    /// (recovery finished)
-    /// - receiver is caughtup and there is no computes
+    /// - receiver is caught up and there are no computes (if streaming up to commit_lsn)
     ///
     /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
     /// convenience.
     async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
         loop {
-            // If we are streaming to walproposer, check it is time to stop.
-            if let Some(stop_pos) = self.stop_pos {
-                if self.start_pos >= stop_pos {
-                    // recovery finished
-                    return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
-                        "ending streaming to walproposer at {}, recovery finished",
-                        self.start_pos
-                    )));
-                }
-            } else {
-                // Wait for the next portion if it is not there yet, or just
-                // update our end of WAL available for sending value, we
-                // communicate it to the receiver.
-                self.wait_wal().await?;
-            }
+            // Wait for the next portion if it is not there yet, or just
+            // update our end of WAL available for sending, which we
+            // communicate to the receiver.
+            self.wait_wal().await?;
+            assert!(
+                self.end_pos > self.start_pos,
+                "nothing to send after waiting for WAL"
+            );
 
             // try to send as much as available, capped by MAX_SEND_SIZE
             let mut send_size = self
@@ -567,7 +582,7 @@
     /// exit in the meanwhile
     async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
         loop {
-            self.end_pos = *self.commit_lsn_watch_rx.borrow();
+            self.end_pos = self.end_watch.get();
             if self.end_pos > self.start_pos {
                 // We have something to send.
                 trace!("got end_pos {:?}, streaming", self.end_pos);
@@ -575,27 +590,31 @@
             }
 
             // Wait for WAL to appear, now self.end_pos == self.start_pos.
-            if let Some(lsn) = wait_for_lsn(&mut self.commit_lsn_watch_rx, self.start_pos).await? {
+            if let Some(lsn) = wait_for_lsn(&mut self.end_watch, self.term, self.start_pos).await? {
                 self.end_pos = lsn;
                 trace!("got end_pos {:?}, streaming", self.end_pos);
                 return Ok(());
             }
 
-            // Timed out waiting for WAL, check for termination and send KA
-            if let Some(remote_consistent_lsn) = self
-                .ws_guard
-                .walsenders
-                .get_ws_remote_consistent_lsn(self.ws_guard.id)
-            {
-                if self.tli.should_walsender_stop(remote_consistent_lsn).await {
-                    // Terminate if there is nothing more to send.
-                    // Note that "ending streaming" part of the string is used by
-                    // pageserver to identify WalReceiverError::SuccessfulCompletion,
-                    // do not change this string without updating pageserver.
-                    return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
+            // Timed out waiting for WAL, check for termination and send KA.
+            // Check for termination only if we are streaming up to commit_lsn
+            // (to pageserver).
+            if let EndWatch::Commit(_) = self.end_watch {
+                if let Some(remote_consistent_lsn) = self
+                    .ws_guard
+                    .walsenders
+                    .get_ws_remote_consistent_lsn(self.ws_guard.id)
+                {
+                    if self.tli.should_walsender_stop(remote_consistent_lsn).await {
+                        // Terminate if there is nothing more to send.
+                        // Note that "ending streaming" part of the string is used by
+                        // pageserver to identify WalReceiverError::SuccessfulCompletion,
+                        // do not change this string without updating pageserver.
+                        return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
                         "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
                         self.appname, self.start_pos,
                     )));
+                    }
                 }
             }
 
@@ -663,22 +682,32 @@ impl ReplyReader {
 
 const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
 
-/// Wait until we have commit_lsn > lsn or timeout expires. Returns
-/// - Ok(Some(commit_lsn)) if needed lsn is successfully observed;
+/// Wait until we have available WAL > start_pos or timeout expires. Returns
+/// - Ok(Some(end_pos)) if the needed lsn is successfully observed;
 /// - Ok(None) if timeout expired;
-/// - Err in case of error (if watch channel is in trouble, shouldn't happen).
-async fn wait_for_lsn(rx: &mut Receiver<Lsn>, lsn: Lsn) -> anyhow::Result<Option<Lsn>> {
+/// - Err in case of error -- only if 1) the term changed while fetching in recovery
+///   mode, or 2) the watch channel closed, which must never happen.
+async fn wait_for_lsn(
+    rx: &mut EndWatch,
+    client_term: Option<Term>,
+    start_pos: Lsn,
+) -> anyhow::Result<Option<Lsn>> {
     let res = timeout(POLL_STATE_TIMEOUT, async move {
-        let mut commit_lsn;
         loop {
-            rx.changed().await?;
-            commit_lsn = *rx.borrow();
-            if commit_lsn > lsn {
-                break;
+            let end_pos = rx.get();
+            if end_pos > start_pos {
+                return Ok(end_pos);
             }
+            if let EndWatch::Flush(rx) = rx {
+                let curr_term = rx.borrow().term;
+                if let Some(client_term) = client_term {
+                    if curr_term != client_term {
+                        bail!("term changed: requested {}, now {}", client_term, curr_term);
+                    }
+                }
+            }
+            rx.changed().await?;
         }
-
-        Ok(commit_lsn)
     })
     .await;
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index 78d027c747..3e066de34f 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -30,7 +30,7 @@ use crate::receive_wal::WalReceivers;
 use crate::recovery::recovery_main;
 use crate::safekeeper::{
     AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
-    SafekeeperMemState, ServerInfo, Term,
+    SafekeeperMemState, ServerInfo, Term, TermLsn, INVALID_TERM,
 };
 use crate::send_wal::WalSenders;
 use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
@@ -309,6 +309,13 @@ pub struct Timeline {
     commit_lsn_watch_tx: watch::Sender<Lsn>,
     commit_lsn_watch_rx: watch::Receiver<Lsn>,
 
+    /// Broadcasts (current term, flush_lsn) updates; walsender is interested in
+    /// them when sending in recovery mode (to walproposer or peers). Note: this
+    /// is just a notification; WAL reading should always be done with the lock
+    /// held, as the term can change otherwise.
+    term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
+    term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,
+
     /// Safekeeper and other state, that should remain consistent and
     /// synchronized with the disk. This is tokio mutex as we write WAL to disk
     /// while holding it, ensuring that consensus checks are in order.
@@ -340,6 +347,10 @@ impl Timeline {
         let rcl = shared_state.sk.state.remote_consistent_lsn;
         let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
             watch::channel(shared_state.sk.state.commit_lsn);
+        let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
+            shared_state.sk.get_term(),
+            shared_state.sk.flush_lsn(),
+        )));
         let (cancellation_tx, cancellation_rx) = watch::channel(false);
 
         Ok(Timeline {
@@ -347,6 +358,8 @@ impl Timeline {
             wal_backup_launcher_tx,
             commit_lsn_watch_tx,
             commit_lsn_watch_rx,
+            term_flush_lsn_watch_tx,
+            term_flush_lsn_watch_rx,
             mutex: Mutex::new(shared_state),
             walsenders: WalSenders::new(rcl),
             walreceivers: WalReceivers::new(),
@@ -366,6 +379,8 @@ impl Timeline {
         local_start_lsn: Lsn,
     ) -> Result<Timeline> {
         let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
+        let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
+            watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
         let (cancellation_tx, cancellation_rx) = watch::channel(false);
         let state =
             SafeKeeperState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
@@ -374,6 +389,8 @@ impl Timeline {
             wal_backup_launcher_tx,
             commit_lsn_watch_tx,
             commit_lsn_watch_rx,
+            term_flush_lsn_watch_tx,
+            term_flush_lsn_watch_rx,
             mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
             walsenders: WalSenders::new(Lsn(0)),
             walreceivers: WalReceivers::new(),
@@ -551,6 +568,11 @@ impl Timeline {
         self.commit_lsn_watch_rx.clone()
     }
 
+    /// Returns term_flush_lsn watch channel.
+    pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
+        self.term_flush_lsn_watch_rx.clone()
+    }
+
     /// Pass arrived message to the safekeeper.
     pub async fn process_msg(
         &self,
@@ -562,6 +584,7 @@
 
         let mut rmsg: Option<AcceptorProposerMessage>;
         let commit_lsn: Lsn;
+        let term_flush_lsn: TermLsn;
         {
             let mut shared_state = self.write_shared_state().await;
             rmsg = shared_state.sk.process_msg(msg).await?;
@@ -575,8 +598,11 @@
             }
 
             commit_lsn = shared_state.sk.inmem.commit_lsn;
+            term_flush_lsn =
+                TermLsn::from((shared_state.sk.get_term(), shared_state.sk.flush_lsn()));
         }
         self.commit_lsn_watch_tx.send(commit_lsn)?;
+        self.term_flush_lsn_watch_tx.send(term_flush_lsn)?;
 
         Ok(rmsg)
     }
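
For illustration only (not part of the patch): below is a minimal standalone sketch of the watch-channel pattern the new `EndWatch::Flush` path relies on -- a sender publishes (term, flush_lsn) pairs over a `tokio::sync::watch` channel, and a waiter returns once the LSN advances past `start_pos`, bailing out if the term changes underneath it, similar in spirit to the new `wait_for_lsn`. `Term`/`Lsn` are simplified to plain `u64`s here, and the names (`wait_past`) are invented for the example; this is not the safekeeper API.

```rust
// Standalone sketch, not safekeeper code: Term/Lsn are plain u64s and
// wait_past is a hypothetical stand-in for the patch's wait_for_lsn.
use anyhow::bail;
use tokio::sync::watch;

type Term = u64;
type Lsn = u64;

async fn wait_past(
    rx: &mut watch::Receiver<(Term, Lsn)>,
    client_term: Term,
    start_pos: Lsn,
) -> anyhow::Result<Lsn> {
    loop {
        // Read the latest published (term, flush_lsn) pair.
        let (term, lsn) = *rx.borrow();
        if lsn > start_pos {
            return Ok(lsn);
        }
        // Stop streaming if the local term moved away from the client's term.
        if term != client_term {
            bail!("term changed: requested {}, now {}", client_term, term);
        }
        // Wait until the sender publishes a new value.
        rx.changed().await?;
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (tx, mut rx) = watch::channel((1, 0u64));
    tokio::spawn(async move {
        // Simulate WAL flushes advancing flush_lsn while staying in term 1.
        for lsn in [8, 16, 24] {
            tx.send((1, lsn)).ok();
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
    });
    let end = wait_past(&mut rx, 1, 15).await?;
    println!("WAL available up to {end}");
    Ok(())
}
```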