safekeeper: add an interpreted wal sender

This commit is contained in:
Vlad Lazar
2024-11-13 12:41:09 +01:00
parent 1f043f480a
commit 3cef53c5c0
6 changed files with 136 additions and 0 deletions

1
Cargo.lock generated
View File

@@ -5192,6 +5192,7 @@ dependencies = [
"tracing-subscriber",
"url",
"utils",
"wal_decoder",
"walproposer",
"workspace_hack",
]

View File

@@ -58,6 +58,7 @@ sd-notify.workspace = true
storage_broker.workspace = true
tokio-stream.workspace = true
utils.workspace = true
wal_decoder.workspace = true
workspace_hack.workspace = true

View File

@@ -29,6 +29,7 @@ pub mod receive_wal;
pub mod recovery;
pub mod remove_wal;
pub mod safekeeper;
pub mod send_interpreted_wal;
pub mod send_wal;
pub mod state;
pub mod timeline;

View File

@@ -0,0 +1,120 @@
use std::time::Duration;
use anyhow::Context;
use futures::StreamExt;
use pageserver_api::shard::ShardIdentity;
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder};
use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::time::MissedTickBehavior;
use utils::bin_ser::BeSer;
use utils::lsn::Lsn;
use wal_decoder::models::InterpretedWalRecord;
use crate::send_wal::EndWatchView;
use crate::wal_reader_stream::{WalBytes, WalReaderStreamBuilder};
/// Shard-aware interpreted record sender.
/// This is used for sending WAL to the pageserver. Said WAL
/// is pre-interpreted and filtered for the shard.
pub(crate) struct InterpretedWalSender<'a, IO> {
pub(crate) pgb: &'a mut PostgresBackend<IO>,
pub(crate) wal_stream_builder: WalReaderStreamBuilder,
pub(crate) end_watch_view: EndWatchView,
pub(crate) shard: ShardIdentity,
pub(crate) pg_version: u32,
pub(crate) appname: Option<String>,
}
impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
/// Send interpreted WAL to a receiver.
/// Stops when an error occurs or the receiver is caught up and there's no active compute.
///
/// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
/// convenience.
pub(crate) async fn run(self) -> Result<(), CopyStreamHandlerEnd> {
let mut wal_position = self.wal_stream_builder.start_pos();
let mut wal_decoder =
WalStreamDecoder::new(self.wal_stream_builder.start_pos(), self.pg_version);
let stream = self.wal_stream_builder.build().await?;
let mut stream = std::pin::pin!(stream);
let mut keepalive_ticker = tokio::time::interval(Duration::from_secs(1));
keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
keepalive_ticker.reset();
loop {
tokio::select! {
// Get some WAL from the stream and then: decode, interpret and send it
wal = stream.next() => {
let WalBytes { wal, wal_start_lsn, wal_end_lsn, available_wal_end_lsn } = match wal {
Some(some) => some?,
None => { break; }
};
wal_position = wal_start_lsn;
wal_decoder.feed_bytes(&wal);
let mut records = Vec::new();
let mut max_next_record_lsn = None;
while let Some((next_record_lsn, recdata)) = wal_decoder
.poll_decode()
.with_context(|| "Failed to decode WAL")?
{
assert!(next_record_lsn.is_aligned());
max_next_record_lsn = Some(next_record_lsn);
// Deserialize and interpret WAL record
let interpreted = InterpretedWalRecord::from_bytes_filtered(
recdata,
&self.shard,
next_record_lsn,
self.pg_version,
)
.with_context(|| "Failed to interpret WAL")?;
if !interpreted.is_empty() {
records.push(interpreted);
}
}
let mut buf = Vec::new();
records
.ser_into(&mut buf)
.with_context(|| "Failed to serialize interpreted WAL")?;
// Reset the keep alive ticker since we are sending something
// over the wire now.
keepalive_ticker.reset();
self.pgb
.write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody {
streaming_lsn: wal_end_lsn.0,
safekeeper_wal_end_lsn: available_wal_end_lsn.0,
next_record_lsn: max_next_record_lsn.unwrap_or(Lsn::INVALID).0,
data: buf.as_slice(),
})).await?;
}
// Send a periodic keep alive when the connection has been idle for a while.
_ = keepalive_ticker.tick() => {
self.pgb
.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
wal_end: self.end_watch_view.get().0,
timestamp: get_current_timestamp(),
request_reply: true,
}))
.await?;
}
}
}
// The loop above ends when the receiver is caught up and there's no more WAL to send.
Err(CopyStreamHandlerEnd::ServerInitiated(format!(
"ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
self.appname, wal_position,
)))
}
}

View File

@@ -558,6 +558,13 @@ impl EndWatch {
}
}
pub(crate) struct EndWatchView(EndWatch);
impl EndWatchView {
pub(crate) fn get(&self) -> Lsn {
self.0.get()
}
}
/// A half driving sending WAL.
struct WalSender<'a, IO> {
pgb: &'a mut PostgresBackend<IO>,

View File

@@ -24,6 +24,12 @@ pub(crate) struct WalReaderStreamBuilder {
pub(crate) wal_sender_guard: Arc<WalSenderGuard>,
}
impl WalReaderStreamBuilder {
pub(crate) fn start_pos(&self) -> Lsn {
self.start_pos
}
}
pub(crate) struct WalBytes {
/// Raw PG WAL
pub(crate) wal: Bytes,