review: pass buffer size as argument to wal reader

This commit is contained in:
Vlad Lazar
2024-11-18 12:09:11 +01:00
parent c3701890d6
commit 8c4deeed1d
2 changed files with 6 additions and 5 deletions

View File

@@ -4,6 +4,7 @@ use anyhow::Context;
use futures::StreamExt;
use pageserver_api::shard::ShardIdentity;
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
use postgres_ffi::MAX_SEND_SIZE;
use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder};
use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
use tokio::io::{AsyncRead, AsyncWrite};
@@ -38,7 +39,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
let mut wal_decoder =
WalStreamDecoder::new(self.wal_stream_builder.start_pos(), self.pg_version);
let stream = self.wal_stream_builder.build().await?;
let stream = self.wal_stream_builder.build(MAX_SEND_SIZE).await?;
let mut stream = std::pin::pin!(stream);
let mut keepalive_ticker = tokio::time::interval(Duration::from_secs(1));

View File

@@ -4,7 +4,6 @@ use async_stream::try_stream;
use bytes::Bytes;
use futures::Stream;
use postgres_backend::CopyStreamHandlerEnd;
use postgres_ffi::MAX_SEND_SIZE;
use std::time::Duration;
use tokio::time::timeout;
use utils::lsn::Lsn;
@@ -47,6 +46,7 @@ impl WalReaderStreamBuilder {
/// and there's no active computes.
pub(crate) async fn build(
self,
buffer_size: usize,
) -> anyhow::Result<impl Stream<Item = Result<WalBytes, CopyStreamHandlerEnd>>> {
// TODO(vlad): The code below duplicates functionality from [`crate::send_wal`].
// We can make the raw WAL sender use this stream too and remove the duplication.
@@ -59,7 +59,7 @@ impl WalReaderStreamBuilder {
wal_sender_guard,
} = self;
let mut wal_reader = tli.get_walreader(start_pos).await?;
let mut buffer = vec![0; MAX_SEND_SIZE];
let mut buffer = vec![0; buffer_size];
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
@@ -101,8 +101,8 @@ impl WalReaderStreamBuilder {
"nothing to send after waiting for WAL"
);
// try to send as much as available, capped by MAX_SEND_SIZE
let mut chunk_end_pos = start_pos + MAX_SEND_SIZE as u64;
// try to send as much as available, capped by the buffer size
let mut chunk_end_pos = start_pos + buffer_size as u64;
// if we went behind available WAL, back off
if chunk_end_pos >= end_pos {
chunk_end_pos = end_pos;