Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-16 18:02:56 +00:00)
safekeeper: pipeline WAL sending
With this patch we can serialize one batch while waiting for the read of the next one.
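The change sets up a two-stage pipeline: the read arm of the select loop decodes and interprets WAL and queues each batch on a bounded channel, while a ReceiverStream turns that queue into a stream whose `then` adapter performs the `to_wire` serialization; the same loop then picks serialized batches off the stream and writes them to the socket. Below is a minimal, self-contained sketch of the pattern, assuming only tokio and tokio-stream; the `Batch` tuple struct, the `serialize` stand-in for `to_wire`, and the numeric input stream are hypothetical placeholders, not the safekeeper's real types.

// Sketch only: hypothetical stand-ins for the safekeeper's
// interpreted-WAL batches and their wire encoding.
use tokio_stream::StreamExt;
use tokio_stream::wrappers::ReceiverStream;

struct Batch(u64); // stand-in for an interpreted WAL batch

// Stand-in for the potentially expensive `to_wire` serialization.
async fn serialize(batch: Batch) -> Vec<u8> {
    batch.0.to_le_bytes().to_vec()
}

#[tokio::main]
async fn main() {
    // Bounded queue between the stages: at most two interpreted
    // batches are in flight, which bounds memory use.
    let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(2);

    // Stage 2: serialize queued batches as they become available.
    let serialized = ReceiverStream::new(rx).then(serialize);
    let mut serialized = std::pin::pin!(serialized);

    // Stage 1 input: stand-in for the incoming WAL stream.
    let mut wal = tokio_stream::iter(0u64..5).map(Batch);

    loop {
        tokio::select! {
            // Read/interpret the next batch, but only while the queue
            // has room: the `if` guard is the backpressure valve.
            batch = wal.next(), if tx.capacity() > 0 => {
                match batch {
                    Some(b) => tx.send(b).await.unwrap(),
                    None => break, // input exhausted
                }
            }
            // Push a previously serialized batch down the wire.
            buf = serialized.next() => {
                if let Some(buf) = buf {
                    println!("sending {} bytes", buf.len());
                }
            }
        }
    }
}

In the patch itself the serialization stage yields a Result so that encoding failures propagate through the stream to the send loop, and the loop breaks when the WAL stream ends, just as the sketch does.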
@@ -1,6 +1,7 @@
 use std::time::Duration;
 
 use anyhow::Context;
+use bytes::Bytes;
 use futures::StreamExt;
 use pageserver_api::shard::ShardIdentity;
 use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
@@ -9,6 +10,7 @@ use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder};
 use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
 use tokio::io::{AsyncRead, AsyncWrite};
 use tokio::time::MissedTickBehavior;
+use utils::lsn::Lsn;
 use utils::postgres_client::Compression;
 use utils::postgres_client::InterpretedFormat;
 use wal_decoder::models::{InterpretedWalRecord, InterpretedWalRecords};
@@ -31,6 +33,18 @@ pub(crate) struct InterpretedWalSender<'a, IO> {
     pub(crate) appname: Option<String>,
 }
 
+struct Batch {
+    wal_end_lsn: Lsn,
+    available_wal_end_lsn: Lsn,
+    records: InterpretedWalRecords,
+}
+
+struct SerializedBatch {
+    wal_end_lsn: Lsn,
+    available_wal_end_lsn: Lsn,
+    buf: Bytes,
+}
+
 impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
     /// Send interpreted WAL to a receiver.
     /// Stops when an error occurs or the receiver is caught up and there's no active compute.
@@ -49,10 +63,29 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
         keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
         keepalive_ticker.reset();
 
+        let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(2);
+        let batches_stream =
+            tokio_stream::wrappers::ReceiverStream::new(rx).then(|batch| async move {
+                let buf: Result<Bytes, CopyStreamHandlerEnd> = batch
+                    .records
+                    .to_wire(self.format, self.compression)
+                    .await
+                    .with_context(|| "Failed to serialize interpreted WAL")
+                    .map_err(CopyStreamHandlerEnd::from);
+
+                Result::<_, CopyStreamHandlerEnd>::Ok(SerializedBatch {
+                    wal_end_lsn: batch.wal_end_lsn,
+                    available_wal_end_lsn: batch.available_wal_end_lsn,
+                    buf: buf?,
+                })
+            });
+        let mut batches_stream = std::pin::pin!(batches_stream);
+
         loop {
             tokio::select! {
-                // Get some WAL from the stream and then: decode, interpret and send it
-                wal = stream.next() => {
+                // Get some WAL from the stream and then: decode, interpret and push it down the
+                // pipeline.
+                wal = stream.next(), if tx.capacity() > 0 => {
                     let WalBytes { wal, wal_start_lsn: _, wal_end_lsn, available_wal_end_lsn } = match wal {
                         Some(some) => some?,
                         None => { break; }
@@ -88,8 +121,15 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
                         records,
                         next_record_lsn: max_next_record_lsn
                     };
-                    let buf = batch.to_wire(self.format, self.compression).await
-                        .with_context(|| "Failed to serialize interpreted WAL")?;
 
+                    tx.send(Batch {wal_end_lsn, available_wal_end_lsn, records: batch}).await.unwrap();
+                },
+                // For a previously interpreted batch, serialize it and push it down the wire.
+                encoded_batch = batches_stream.next() => {
+                    let SerializedBatch {wal_end_lsn, available_wal_end_lsn, buf } = match encoded_batch {
+                        Some(ser_batch) => ser_batch?,
+                        None => { break; }
+                    };
+
                     // Reset the keep alive ticker since we are sending something
                     // over the wire now.
@@ -102,7 +142,6 @@
                         data: &buf,
                     })).await?;
                 }
-
                 // Send a periodic keep alive when the connection has been idle for a while.
                 _ = keepalive_ticker.tick() => {
                     self.pgb
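One detail worth calling out is the `, if tx.capacity() > 0` guard on the read arm. In `tokio::select!`, a branch whose precondition is false is disabled: its future is not polled on that iteration, so a full queue simply pauses WAL reading until a serialized batch drains. A tiny illustration of the guard semantics (hypothetical example, not from the patch):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(1);
    tx.send(1).await.unwrap(); // queue is now full: capacity() == 0

    tokio::select! {
        // Disabled branch: the precondition is false, so this send
        // future is never polled and cannot win the select.
        _ = tx.send(2), if tx.capacity() > 0 => unreachable!(),
        // This branch runs instead, draining the queue.
        v = rx.recv() => println!("drained {v:?}"),
    }
}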