mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 01:42:55 +00:00
safekeeper: abstract WAL reading into a stream
This commit is contained in:
@@ -38,6 +38,7 @@ pub mod timeline_manager;
|
||||
pub mod timelines_set;
|
||||
pub mod wal_backup;
|
||||
pub mod wal_backup_partial;
|
||||
pub mod wal_reader_stream;
|
||||
pub mod wal_service;
|
||||
pub mod wal_storage;
|
||||
|
||||
|
||||
@@ -226,7 +226,7 @@ impl WalSenders {
|
||||
|
||||
/// Get remote_consistent_lsn reported by the pageserver. Returns None if
|
||||
/// client is not pageserver.
|
||||
fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
|
||||
pub fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
|
||||
let shared = self.mutex.lock();
|
||||
let slot = shared.get_slot(id);
|
||||
match slot.feedback {
|
||||
@@ -370,6 +370,16 @@ pub struct WalSenderGuard {
|
||||
walsenders: Arc<WalSenders>,
|
||||
}
|
||||
|
||||
impl WalSenderGuard {
|
||||
pub fn id(&self) -> WalSenderId {
|
||||
self.id
|
||||
}
|
||||
|
||||
pub fn walsenders(&self) -> &Arc<WalSenders> {
|
||||
&self.walsenders
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for WalSenderGuard {
|
||||
fn drop(&mut self) {
|
||||
self.walsenders.unregister(self.id);
|
||||
@@ -499,16 +509,17 @@ impl SafekeeperPostgresHandler {
|
||||
}
|
||||
}
|
||||
|
||||
/// TODO(vlad): maybe lift this instead
|
||||
/// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
|
||||
/// given term (recovery by walproposer or peer safekeeper).
|
||||
enum EndWatch {
|
||||
pub(crate) enum EndWatch {
|
||||
Commit(Receiver<Lsn>),
|
||||
Flush(Receiver<TermLsn>),
|
||||
}
|
||||
|
||||
impl EndWatch {
|
||||
/// Get current end of WAL.
|
||||
fn get(&self) -> Lsn {
|
||||
pub(crate) fn get(&self) -> Lsn {
|
||||
match self {
|
||||
EndWatch::Commit(r) => *r.borrow(),
|
||||
EndWatch::Flush(r) => r.borrow().lsn,
|
||||
@@ -516,13 +527,35 @@ impl EndWatch {
|
||||
}
|
||||
|
||||
/// Wait for the update.
|
||||
async fn changed(&mut self) -> anyhow::Result<()> {
|
||||
pub(crate) async fn changed(&mut self) -> anyhow::Result<()> {
|
||||
match self {
|
||||
EndWatch::Commit(r) => r.changed().await?,
|
||||
EndWatch::Flush(r) => r.changed().await?,
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn wait_for_lsn(
|
||||
&mut self,
|
||||
lsn: Lsn,
|
||||
client_term: Option<Term>,
|
||||
) -> anyhow::Result<Lsn> {
|
||||
loop {
|
||||
let end_pos = self.get();
|
||||
if end_pos > lsn {
|
||||
return Ok(end_pos);
|
||||
}
|
||||
if let EndWatch::Flush(rx) = &self {
|
||||
let curr_term = rx.borrow().term;
|
||||
if let Some(client_term) = client_term {
|
||||
if curr_term != client_term {
|
||||
bail!("term changed: requested {}, now {}", client_term, curr_term);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.changed().await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A half driving sending WAL.
|
||||
|
||||
139
safekeeper/src/wal_reader_stream.rs
Normal file
139
safekeeper/src/wal_reader_stream.rs
Normal file
@@ -0,0 +1,139 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_stream::try_stream;
|
||||
use bytes::Bytes;
|
||||
use futures::Stream;
|
||||
use postgres_backend::CopyStreamHandlerEnd;
|
||||
use postgres_ffi::MAX_SEND_SIZE;
|
||||
use std::time::Duration;
|
||||
use tokio::time::timeout;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::{
|
||||
safekeeper::Term,
|
||||
send_wal::{EndWatch, WalSenderGuard},
|
||||
timeline::WalResidentTimeline,
|
||||
};
|
||||
|
||||
pub(crate) struct WalReaderStreamBuilder {
|
||||
pub(crate) tli: WalResidentTimeline,
|
||||
pub(crate) start_pos: Lsn,
|
||||
pub(crate) end_pos: Lsn,
|
||||
pub(crate) term: Option<Term>,
|
||||
pub(crate) end_watch: EndWatch,
|
||||
pub(crate) wal_sender_guard: Arc<WalSenderGuard>,
|
||||
}
|
||||
|
||||
pub(crate) struct WalBytes {
|
||||
/// Raw PG WAL
|
||||
pub(crate) wal: Bytes,
|
||||
/// Start LSN of [`Self::wal`]
|
||||
pub(crate) wal_start_lsn: Lsn,
|
||||
/// End LSN of [`Self::wal`]
|
||||
pub(crate) wal_end_lsn: Lsn,
|
||||
/// End LSN of WAL available on the safekeeper
|
||||
pub(crate) available_wal_end_lsn: Lsn,
|
||||
}
|
||||
|
||||
impl WalReaderStreamBuilder {
|
||||
/// Builds a stream of Postgres WAL starting from [`Self::start_pos`].
|
||||
/// The stream terminates when the receiver (pageserver) is fully caught up
|
||||
/// and there's no active computes.
|
||||
pub(crate) async fn build(
|
||||
self,
|
||||
) -> anyhow::Result<impl Stream<Item = Result<WalBytes, CopyStreamHandlerEnd>>> {
|
||||
// TODO(vlad): The code below duplicates functionality from [`crate::send_wal`].
|
||||
// We can make the raw WAL sender use this stream too and remove the duplication.
|
||||
let Self {
|
||||
tli,
|
||||
mut start_pos,
|
||||
mut end_pos,
|
||||
term,
|
||||
mut end_watch,
|
||||
wal_sender_guard,
|
||||
} = self;
|
||||
let mut wal_reader = tli.get_walreader(start_pos).await?;
|
||||
let mut buffer = vec![0; MAX_SEND_SIZE];
|
||||
|
||||
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
|
||||
Ok(try_stream! {
|
||||
loop {
|
||||
let have_something_to_send = end_pos > start_pos;
|
||||
|
||||
if !have_something_to_send {
|
||||
// wait for lsn
|
||||
let res = timeout(POLL_STATE_TIMEOUT, end_watch.wait_for_lsn(start_pos, term)).await;
|
||||
match res {
|
||||
Ok(ok) => {
|
||||
end_pos = ok?;
|
||||
},
|
||||
Err(_) => {
|
||||
if let EndWatch::Commit(_) = end_watch {
|
||||
if let Some(remote_consistent_lsn) = wal_sender_guard
|
||||
.walsenders()
|
||||
.get_ws_remote_consistent_lsn(wal_sender_guard.id())
|
||||
{
|
||||
if tli.should_walsender_stop(remote_consistent_lsn).await {
|
||||
// Terminate if there is nothing more to send.
|
||||
// Note that "ending streaming" part of the string is used by
|
||||
// pageserver to identify WalReceiverError::SuccessfulCompletion,
|
||||
// do not change this string without updating pageserver.
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
assert!(
|
||||
end_pos > start_pos,
|
||||
"nothing to send after waiting for WAL"
|
||||
);
|
||||
|
||||
// try to send as much as available, capped by MAX_SEND_SIZE
|
||||
let mut chunk_end_pos = start_pos + MAX_SEND_SIZE as u64;
|
||||
// if we went behind available WAL, back off
|
||||
if chunk_end_pos >= end_pos {
|
||||
chunk_end_pos = end_pos;
|
||||
} else {
|
||||
// If sending not up to end pos, round down to page boundary to
|
||||
// avoid breaking WAL record not at page boundary, as protocol
|
||||
// demands. See walsender.c (XLogSendPhysical).
|
||||
chunk_end_pos = chunk_end_pos
|
||||
.checked_sub(chunk_end_pos.block_offset())
|
||||
.unwrap();
|
||||
}
|
||||
let send_size = (chunk_end_pos.0 - start_pos.0) as usize;
|
||||
let buffer = &mut buffer[..send_size];
|
||||
let send_size: usize;
|
||||
{
|
||||
// If uncommitted part is being pulled, check that the term is
|
||||
// still the expected one.
|
||||
let _term_guard = if let Some(t) = term {
|
||||
Some(tli.acquire_term(t).await?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
// Read WAL into buffer. send_size can be additionally capped to
|
||||
// segment boundary here.
|
||||
send_size = wal_reader.read(buffer).await?
|
||||
};
|
||||
let wal = Bytes::copy_from_slice(&buffer[..send_size]);
|
||||
|
||||
yield WalBytes {
|
||||
wal,
|
||||
wal_start_lsn: start_pos,
|
||||
wal_end_lsn: start_pos + send_size as u64,
|
||||
available_wal_end_lsn: end_pos
|
||||
};
|
||||
|
||||
start_pos += send_size as u64;
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user