diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 6d68b6b59b..b3155a2c85 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -38,6 +38,7 @@ pub mod timeline_manager; pub mod timelines_set; pub mod wal_backup; pub mod wal_backup_partial; +pub mod wal_reader_stream; pub mod wal_service; pub mod wal_storage; diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 6d94ff98b1..998c6ff906 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -226,7 +226,7 @@ impl WalSenders { /// Get remote_consistent_lsn reported by the pageserver. Returns None if /// client is not pageserver. - fn get_ws_remote_consistent_lsn(self: &Arc, id: WalSenderId) -> Option { + pub fn get_ws_remote_consistent_lsn(self: &Arc, id: WalSenderId) -> Option { let shared = self.mutex.lock(); let slot = shared.get_slot(id); match slot.feedback { @@ -370,6 +370,16 @@ pub struct WalSenderGuard { walsenders: Arc, } +impl WalSenderGuard { + pub fn id(&self) -> WalSenderId { + self.id + } + + pub fn walsenders(&self) -> &Arc { + &self.walsenders + } +} + impl Drop for WalSenderGuard { fn drop(&mut self) { self.walsenders.unregister(self.id); @@ -499,16 +509,17 @@ impl SafekeeperPostgresHandler { } } +/// TODO(vlad): maybe lift this instead /// Walsender streams either up to commit_lsn (normally) or flush_lsn in the /// given term (recovery by walproposer or peer safekeeper). -enum EndWatch { +pub(crate) enum EndWatch { Commit(Receiver), Flush(Receiver), } impl EndWatch { /// Get current end of WAL. - fn get(&self) -> Lsn { + pub(crate) fn get(&self) -> Lsn { match self { EndWatch::Commit(r) => *r.borrow(), EndWatch::Flush(r) => r.borrow().lsn, @@ -516,13 +527,35 @@ impl EndWatch { } /// Wait for the update. - async fn changed(&mut self) -> anyhow::Result<()> { + pub(crate) async fn changed(&mut self) -> anyhow::Result<()> { match self { EndWatch::Commit(r) => r.changed().await?, EndWatch::Flush(r) => r.changed().await?, } Ok(()) } + + pub(crate) async fn wait_for_lsn( + &mut self, + lsn: Lsn, + client_term: Option, + ) -> anyhow::Result { + loop { + let end_pos = self.get(); + if end_pos > lsn { + return Ok(end_pos); + } + if let EndWatch::Flush(rx) = &self { + let curr_term = rx.borrow().term; + if let Some(client_term) = client_term { + if curr_term != client_term { + bail!("term changed: requested {}, now {}", client_term, curr_term); + } + } + } + self.changed().await?; + } + } } /// A half driving sending WAL. diff --git a/safekeeper/src/wal_reader_stream.rs b/safekeeper/src/wal_reader_stream.rs new file mode 100644 index 0000000000..404ebba870 --- /dev/null +++ b/safekeeper/src/wal_reader_stream.rs @@ -0,0 +1,139 @@ +use std::sync::Arc; + +use async_stream::try_stream; +use bytes::Bytes; +use futures::Stream; +use postgres_backend::CopyStreamHandlerEnd; +use postgres_ffi::MAX_SEND_SIZE; +use std::time::Duration; +use tokio::time::timeout; +use utils::lsn::Lsn; + +use crate::{ + safekeeper::Term, + send_wal::{EndWatch, WalSenderGuard}, + timeline::WalResidentTimeline, +}; + +pub(crate) struct WalReaderStreamBuilder { + pub(crate) tli: WalResidentTimeline, + pub(crate) start_pos: Lsn, + pub(crate) end_pos: Lsn, + pub(crate) term: Option, + pub(crate) end_watch: EndWatch, + pub(crate) wal_sender_guard: Arc, +} + +pub(crate) struct WalBytes { + /// Raw PG WAL + pub(crate) wal: Bytes, + /// Start LSN of [`Self::wal`] + pub(crate) wal_start_lsn: Lsn, + /// End LSN of [`Self::wal`] + pub(crate) wal_end_lsn: Lsn, + /// End LSN of WAL available on the safekeeper + pub(crate) available_wal_end_lsn: Lsn, +} + +impl WalReaderStreamBuilder { + /// Builds a stream of Postgres WAL starting from [`Self::start_pos`]. + /// The stream terminates when the receiver (pageserver) is fully caught up + /// and there's no active computes. + pub(crate) async fn build( + self, + ) -> anyhow::Result>> { + // TODO(vlad): The code below duplicates functionality from [`crate::send_wal`]. + // We can make the raw WAL sender use this stream too and remove the duplication. + let Self { + tli, + mut start_pos, + mut end_pos, + term, + mut end_watch, + wal_sender_guard, + } = self; + let mut wal_reader = tli.get_walreader(start_pos).await?; + let mut buffer = vec![0; MAX_SEND_SIZE]; + + const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1); + + Ok(try_stream! { + loop { + let have_something_to_send = end_pos > start_pos; + + if !have_something_to_send { + // wait for lsn + let res = timeout(POLL_STATE_TIMEOUT, end_watch.wait_for_lsn(start_pos, term)).await; + match res { + Ok(ok) => { + end_pos = ok?; + }, + Err(_) => { + if let EndWatch::Commit(_) = end_watch { + if let Some(remote_consistent_lsn) = wal_sender_guard + .walsenders() + .get_ws_remote_consistent_lsn(wal_sender_guard.id()) + { + if tli.should_walsender_stop(remote_consistent_lsn).await { + // Terminate if there is nothing more to send. + // Note that "ending streaming" part of the string is used by + // pageserver to identify WalReceiverError::SuccessfulCompletion, + // do not change this string without updating pageserver. + return; + } + } + } + + continue; + } + } + } + + + assert!( + end_pos > start_pos, + "nothing to send after waiting for WAL" + ); + + // try to send as much as available, capped by MAX_SEND_SIZE + let mut chunk_end_pos = start_pos + MAX_SEND_SIZE as u64; + // if we went behind available WAL, back off + if chunk_end_pos >= end_pos { + chunk_end_pos = end_pos; + } else { + // If sending not up to end pos, round down to page boundary to + // avoid breaking WAL record not at page boundary, as protocol + // demands. See walsender.c (XLogSendPhysical). + chunk_end_pos = chunk_end_pos + .checked_sub(chunk_end_pos.block_offset()) + .unwrap(); + } + let send_size = (chunk_end_pos.0 - start_pos.0) as usize; + let buffer = &mut buffer[..send_size]; + let send_size: usize; + { + // If uncommitted part is being pulled, check that the term is + // still the expected one. + let _term_guard = if let Some(t) = term { + Some(tli.acquire_term(t).await?) + } else { + None + }; + // Read WAL into buffer. send_size can be additionally capped to + // segment boundary here. + send_size = wal_reader.read(buffer).await? + }; + let wal = Bytes::copy_from_slice(&buffer[..send_size]); + + yield WalBytes { + wal, + wal_start_lsn: start_pos, + wal_end_lsn: start_pos + send_size as u64, + available_wal_end_lsn: end_pos + }; + + start_pos += send_size as u64; + } + }) + } +}