diff --git a/safekeeper/src/send_interpreted_wal.rs b/safekeeper/src/send_interpreted_wal.rs index 03874b63f5..a0bd71057d 100644 --- a/safekeeper/src/send_interpreted_wal.rs +++ b/safekeeper/src/send_interpreted_wal.rs @@ -1,6 +1,7 @@ use std::time::Duration; use anyhow::Context; +use bytes::Bytes; use futures::StreamExt; use pageserver_api::shard::ShardIdentity; use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend}; @@ -9,6 +10,7 @@ use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder}; use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::time::MissedTickBehavior; +use utils::lsn::Lsn; use utils::postgres_client::Compression; use utils::postgres_client::InterpretedFormat; use wal_decoder::models::{InterpretedWalRecord, InterpretedWalRecords}; @@ -31,6 +33,18 @@ pub(crate) struct InterpretedWalSender<'a, IO> { pub(crate) appname: Option, } +struct Batch { + wal_end_lsn: Lsn, + available_wal_end_lsn: Lsn, + records: InterpretedWalRecords, +} + +struct SerializedBatch { + wal_end_lsn: Lsn, + available_wal_end_lsn: Lsn, + buf: Bytes, +} + impl InterpretedWalSender<'_, IO> { /// Send interpreted WAL to a receiver. /// Stops when an error occurs or the receiver is caught up and there's no active compute. @@ -49,10 +63,29 @@ impl InterpretedWalSender<'_, IO> { keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); keepalive_ticker.reset(); + let (tx, rx) = tokio::sync::mpsc::channel::(2); + let batches_stream = + tokio_stream::wrappers::ReceiverStream::new(rx).then(|batch| async move { + let buf: Result = batch + .records + .to_wire(self.format, self.compression) + .await + .with_context(|| "Failed to serialize interpreted WAL") + .map_err(CopyStreamHandlerEnd::from); + + Result::<_, CopyStreamHandlerEnd>::Ok(SerializedBatch { + wal_end_lsn: batch.wal_end_lsn, + available_wal_end_lsn: batch.available_wal_end_lsn, + buf: buf?, + }) + }); + let mut batches_stream = std::pin::pin!(batches_stream); + loop { tokio::select! { - // Get some WAL from the stream and then: decode, interpret and send it - wal = stream.next() => { + // Get some WAL from the stream and then: decode, interpret and push it down the + // pipeline. + wal = stream.next(), if tx.capacity() > 0 => { let WalBytes { wal, wal_start_lsn: _, wal_end_lsn, available_wal_end_lsn } = match wal { Some(some) => some?, None => { break; } @@ -88,8 +121,15 @@ impl InterpretedWalSender<'_, IO> { records, next_record_lsn: max_next_record_lsn }; - let buf = batch.to_wire(self.format, self.compression).await - .with_context(|| "Failed to serialize interpreted WAL")?; + + tx.send(Batch {wal_end_lsn, available_wal_end_lsn, records: batch}).await.unwrap(); + }, + // For a previously interpreted batch, serialize it and push it down the wire. + encoded_batch = batches_stream.next() => { + let SerializedBatch {wal_end_lsn, available_wal_end_lsn, buf } = match encoded_batch { + Some(ser_batch) => ser_batch?, + None => { break; } + }; // Reset the keep alive ticker since we are sending something // over the wire now. @@ -102,7 +142,6 @@ impl InterpretedWalSender<'_, IO> { data: &buf, })).await?; } - // Send a periodic keep alive when the connection has been idle for a while. _ = keepalive_ticker.tick() => { self.pgb