safekeeper: dispatch wal sending based on protocol

This commit is contained in:
Vlad Lazar
2024-11-13 13:01:34 +01:00
parent 3cef53c5c0
commit 4f3583a316

View File

@@ -5,12 +5,15 @@ use crate::handler::SafekeeperPostgresHandler;
use crate::metrics::RECEIVED_PS_FEEDBACKS;
use crate::receive_wal::WalReceivers;
use crate::safekeeper::{Term, TermLsn};
use crate::send_interpreted_wal::InterpretedWalSender;
use crate::timeline::WalResidentTimeline;
use crate::wal_reader_stream::WalReaderStreamBuilder;
use crate::wal_service::ConnectionId;
use crate::wal_storage::WalReader;
use crate::GlobalTimelines;
use anyhow::{bail, Context as AnyhowContext};
use bytes::Bytes;
use futures::future::Either;
use parking_lot::Mutex;
use postgres_backend::PostgresBackend;
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
@@ -22,6 +25,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use utils::failpoint_support;
use utils::id::TenantTimelineId;
use utils::pageserver_feedback::PageserverFeedback;
use utils::postgres_client::PostgresClientProtocol;
use std::cmp::{max, min};
use std::net::SocketAddr;
@@ -450,11 +454,12 @@ impl SafekeeperPostgresHandler {
}
info!(
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, protocol={}",
start_pos,
end_pos,
matches!(end_watch, EndWatch::Flush(_)),
appname
appname,
self.protocol(),
);
// switch to copy
@@ -466,19 +471,49 @@ impl SafekeeperPostgresHandler {
// not synchronized with sends, so this avoids deadlocks.
let reader = pgb.split().context("START_REPLICATION split")?;
let mut sender = WalSender {
pgb,
// should succeed since we're already holding another guard
tli: tli.wal_residence_guard().await?,
appname,
start_pos,
end_pos,
term,
end_watch,
ws_guard: ws_guard.clone(),
wal_reader,
send_buf: vec![0u8; MAX_SEND_SIZE],
let send_fut = match self.protocol() {
PostgresClientProtocol::Vanilla => {
let sender = WalSender {
pgb,
// should succeed since we're already holding another guard
tli: tli.wal_residence_guard().await?,
appname,
start_pos,
end_pos,
term,
end_watch,
ws_guard: ws_guard.clone(),
wal_reader,
send_buf: vec![0u8; MAX_SEND_SIZE],
};
Either::Left(sender.run())
}
PostgresClientProtocol::Interpreted => {
let pg_version = tli.tli.get_state().await.1.server.pg_version / 10000;
let end_watch_view = end_watch.view();
let wal_stream_builder = WalReaderStreamBuilder {
tli: tli.wal_residence_guard().await?,
start_pos,
end_pos,
term,
end_watch,
wal_sender_guard: ws_guard.clone(),
};
let sender = InterpretedWalSender {
pgb,
wal_stream_builder,
end_watch_view,
shard: self.shard.unwrap(),
pg_version,
appname,
};
Either::Right(sender.run())
}
};
let mut reply_reader = ReplyReader {
reader,
ws_guard: ws_guard.clone(),
@@ -487,7 +522,7 @@ impl SafekeeperPostgresHandler {
let res = tokio::select! {
// todo: add read|write .context to these errors
r = sender.run() => r,
r = send_fut => r,
r = reply_reader.run() => r,
};
@@ -512,12 +547,17 @@ impl SafekeeperPostgresHandler {
/// TODO(vlad): maybe lift this instead
/// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
/// given term (recovery by walproposer or peer safekeeper).
#[derive(Clone)]
pub(crate) enum EndWatch {
Commit(Receiver<Lsn>),
Flush(Receiver<TermLsn>),
}
impl EndWatch {
pub(crate) fn view(&self) -> EndWatchView {
EndWatchView(self.clone())
}
/// Get current end of WAL.
pub(crate) fn get(&self) -> Lsn {
match self {
@@ -600,7 +640,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
///
/// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
/// convenience.
async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
async fn run(mut self) -> Result<(), CopyStreamHandlerEnd> {
loop {
// Wait for the next portion if it is not there yet, or just
// update our end of WAL available for sending value, we