diff --git a/safekeeper/src/send_interpreted_wal.rs b/safekeeper/src/send_interpreted_wal.rs index 308f7c92ad..820ad5a608 100644 --- a/safekeeper/src/send_interpreted_wal.rs +++ b/safekeeper/src/send_interpreted_wal.rs @@ -4,6 +4,7 @@ use anyhow::Context; use futures::StreamExt; use pageserver_api::shard::ShardIdentity; use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend}; +use postgres_ffi::MAX_SEND_SIZE; use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder}; use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive}; use tokio::io::{AsyncRead, AsyncWrite}; @@ -38,7 +39,7 @@ impl InterpretedWalSender<'_, IO> { let mut wal_decoder = WalStreamDecoder::new(self.wal_stream_builder.start_pos(), self.pg_version); - let stream = self.wal_stream_builder.build().await?; + let stream = self.wal_stream_builder.build(MAX_SEND_SIZE).await?; let mut stream = std::pin::pin!(stream); let mut keepalive_ticker = tokio::time::interval(Duration::from_secs(1)); diff --git a/safekeeper/src/wal_reader_stream.rs b/safekeeper/src/wal_reader_stream.rs index 9a5c0b292a..be5b29b8e1 100644 --- a/safekeeper/src/wal_reader_stream.rs +++ b/safekeeper/src/wal_reader_stream.rs @@ -4,7 +4,6 @@ use async_stream::try_stream; use bytes::Bytes; use futures::Stream; use postgres_backend::CopyStreamHandlerEnd; -use postgres_ffi::MAX_SEND_SIZE; use std::time::Duration; use tokio::time::timeout; use utils::lsn::Lsn; @@ -47,6 +46,7 @@ impl WalReaderStreamBuilder { /// and there's no active computes. pub(crate) async fn build( self, + buffer_size: usize, ) -> anyhow::Result>> { // TODO(vlad): The code below duplicates functionality from [`crate::send_wal`]. // We can make the raw WAL sender use this stream too and remove the duplication. @@ -59,7 +59,7 @@ impl WalReaderStreamBuilder { wal_sender_guard, } = self; let mut wal_reader = tli.get_walreader(start_pos).await?; - let mut buffer = vec![0; MAX_SEND_SIZE]; + let mut buffer = vec![0; buffer_size]; const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1); @@ -101,8 +101,8 @@ impl WalReaderStreamBuilder { "nothing to send after waiting for WAL" ); - // try to send as much as available, capped by MAX_SEND_SIZE - let mut chunk_end_pos = start_pos + MAX_SEND_SIZE as u64; + // try to send as much as available, capped by the buffer size + let mut chunk_end_pos = start_pos + buffer_size as u64; // if we went behind available WAL, back off if chunk_end_pos >= end_pos { chunk_end_pos = end_pos;