mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 06:30:43 +00:00
146 lines
5.4 KiB
Rust
146 lines
5.4 KiB
Rust
use std::sync::Arc;
|
|
|
|
use async_stream::try_stream;
|
|
use bytes::Bytes;
|
|
use futures::Stream;
|
|
use postgres_backend::CopyStreamHandlerEnd;
|
|
use postgres_ffi::MAX_SEND_SIZE;
|
|
use std::time::Duration;
|
|
use tokio::time::timeout;
|
|
use utils::lsn::Lsn;
|
|
|
|
use crate::{
|
|
safekeeper::Term,
|
|
send_wal::{EndWatch, WalSenderGuard},
|
|
timeline::WalResidentTimeline,
|
|
};
|
|
|
|
/// Inputs for building a stream of raw Postgres WAL from one timeline.
///
/// Consumed by [`WalReaderStreamBuilder::build`].
pub(crate) struct WalReaderStreamBuilder {
    /// Timeline the WAL is read from; also consulted to decide when streaming should stop.
    pub(crate) tli: WalResidentTimeline,
    /// LSN at which reading starts.
    pub(crate) start_pos: Lsn,
    /// Initially known end of available WAL. WAL up to this LSN is streamed
    /// before the stream starts waiting on `end_watch` for more.
    pub(crate) end_pos: Lsn,
    /// When set, every WAL read is performed while holding the guard returned by
    /// `acquire_term` for this term.
    pub(crate) term: Option<Term>,
    /// Source of new end-of-WAL positions to wait on once caught up.
    pub(crate) end_watch: EndWatch,
    /// Registration of this WAL sender; used to look up its remote consistent LSN
    /// when deciding whether to stop.
    pub(crate) wal_sender_guard: Arc<WalSenderGuard>,
}
|
|
|
|
impl WalReaderStreamBuilder {
|
|
pub(crate) fn start_pos(&self) -> Lsn {
|
|
self.start_pos
|
|
}
|
|
}
|
|
|
|
pub(crate) struct WalBytes {
|
|
/// Raw PG WAL
|
|
pub(crate) wal: Bytes,
|
|
/// Start LSN of [`Self::wal`]
|
|
pub(crate) wal_start_lsn: Lsn,
|
|
/// End LSN of [`Self::wal`]
|
|
pub(crate) wal_end_lsn: Lsn,
|
|
/// End LSN of WAL available on the safekeeper
|
|
pub(crate) available_wal_end_lsn: Lsn,
|
|
}
|
|
|
|
impl WalReaderStreamBuilder {
|
|
/// Builds a stream of Postgres WAL starting from [`Self::start_pos`].
|
|
/// The stream terminates when the receiver (pageserver) is fully caught up
|
|
/// and there's no active computes.
|
|
pub(crate) async fn build(
|
|
self,
|
|
) -> anyhow::Result<impl Stream<Item = Result<WalBytes, CopyStreamHandlerEnd>>> {
|
|
// TODO(vlad): The code below duplicates functionality from [`crate::send_wal`].
|
|
// We can make the raw WAL sender use this stream too and remove the duplication.
|
|
let Self {
|
|
tli,
|
|
mut start_pos,
|
|
mut end_pos,
|
|
term,
|
|
mut end_watch,
|
|
wal_sender_guard,
|
|
} = self;
|
|
let mut wal_reader = tli.get_walreader(start_pos).await?;
|
|
let mut buffer = vec![0; MAX_SEND_SIZE];
|
|
|
|
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
|
|
|
|
Ok(try_stream! {
|
|
loop {
|
|
let have_something_to_send = end_pos > start_pos;
|
|
|
|
if !have_something_to_send {
|
|
// wait for lsn
|
|
let res = timeout(POLL_STATE_TIMEOUT, end_watch.wait_for_lsn(start_pos, term)).await;
|
|
match res {
|
|
Ok(ok) => {
|
|
end_pos = ok?;
|
|
},
|
|
Err(_) => {
|
|
if let EndWatch::Commit(_) = end_watch {
|
|
if let Some(remote_consistent_lsn) = wal_sender_guard
|
|
.walsenders()
|
|
.get_ws_remote_consistent_lsn(wal_sender_guard.id())
|
|
{
|
|
if tli.should_walsender_stop(remote_consistent_lsn).await {
|
|
// Terminate if there is nothing more to send.
|
|
// Note that "ending streaming" part of the string is used by
|
|
// pageserver to identify WalReceiverError::SuccessfulCompletion,
|
|
// do not change this string without updating pageserver.
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
assert!(
|
|
end_pos > start_pos,
|
|
"nothing to send after waiting for WAL"
|
|
);
|
|
|
|
// try to send as much as available, capped by MAX_SEND_SIZE
|
|
let mut chunk_end_pos = start_pos + MAX_SEND_SIZE as u64;
|
|
// if we went behind available WAL, back off
|
|
if chunk_end_pos >= end_pos {
|
|
chunk_end_pos = end_pos;
|
|
} else {
|
|
// If sending not up to end pos, round down to page boundary to
|
|
// avoid breaking WAL record not at page boundary, as protocol
|
|
// demands. See walsender.c (XLogSendPhysical).
|
|
chunk_end_pos = chunk_end_pos
|
|
.checked_sub(chunk_end_pos.block_offset())
|
|
.unwrap();
|
|
}
|
|
let send_size = (chunk_end_pos.0 - start_pos.0) as usize;
|
|
let buffer = &mut buffer[..send_size];
|
|
let send_size: usize;
|
|
{
|
|
// If uncommitted part is being pulled, check that the term is
|
|
// still the expected one.
|
|
let _term_guard = if let Some(t) = term {
|
|
Some(tli.acquire_term(t).await?)
|
|
} else {
|
|
None
|
|
};
|
|
// Read WAL into buffer. send_size can be additionally capped to
|
|
// segment boundary here.
|
|
send_size = wal_reader.read(buffer).await?
|
|
};
|
|
let wal = Bytes::copy_from_slice(&buffer[..send_size]);
|
|
|
|
yield WalBytes {
|
|
wal,
|
|
wal_start_lsn: start_pos,
|
|
wal_end_lsn: start_pos + send_size as u64,
|
|
available_wal_end_lsn: end_pos
|
|
};
|
|
|
|
start_pos += send_size as u64;
|
|
}
|
|
})
|
|
}
|
|
}
|