Get rid of futurex boxing through transmute.

This commit is contained in:
Arseny Sher
2023-02-02 14:34:10 +04:00
parent 5e972ccdc4
commit 2bbd24edbf

View File

@@ -4,6 +4,7 @@
use anyhow::Context as AnyhowContext;
use bytes::Bytes;
use futures::future::BoxFuture;
use pin_project_lite::pin_project;
use postgres_ffi::get_current_timestamp;
use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
use serde::{Deserialize, Serialize};
@@ -137,7 +138,7 @@ impl SafekeeperPostgresHandler {
send_buf: RefCell::new([0; MAX_SEND_SIZE]),
});
ReplicationHandler {
let c = ReplicationContext {
tli,
replica_id,
appname,
@@ -146,17 +147,41 @@ impl SafekeeperPostgresHandler {
end_pos,
stop_pos,
write_ctx,
// Actually we start from reading WAL, but this way is easier to
// code, we'll just immediately switch.
write_state: WriteState::FlushWal,
feedback: ReplicaState::new(),
};
let _phantom_wf = c.wait_wal_fut();
let _phantom_rf = c.read_wal_fut();
ReplicationHandler {
c,
write_state: WriteState::FlushWal,
_phantom_wf,
_phantom_rf,
}
.await
}
}
/// START_REPLICATION stream driver: sends WAL and receives feedback.
struct ReplicationHandler<'a> {
pin_project! {
struct ReplicationHandler<'a, WF, RF>
where
WF: Future<Output = anyhow::Result<Option<Lsn>>>,
RF: Future<Output = anyhow::Result<usize>>,
{
c: ReplicationContext<'a>,
#[pin]
write_state: WriteState<WF, RF>,
// To deduce anonymous types.
_phantom_wf: WF,
_phantom_rf: RF,
}
}
/// Data ReplicationHandler maintains. Separated so we could generate WriteState
/// futures during init, deducing their type.
struct ReplicationContext<'a> {
tli: Arc<Timeline>,
appname: Option<String>,
replica_id: usize,
@@ -168,19 +193,17 @@ struct ReplicationHandler<'a> {
// If present, terminate after reaching this position; used by walproposer
// in recovery.
stop_pos: Option<Lsn>,
// This data is needed to create Future sending WAL, so we need to both
// have it here (to create new future) and borrow it to the future
// itself. Essentially this is a self referential struct. To satisfy
// borrow checker, use Rc<RefCell>. To make ReplicationHandler itself
// Send'able future, wrap it into SendRc; this is safe as
// ReplicationHandler is passed between threads only as a whole (during
// rescheduling).
// This data is needed to create Future sending WAL, so we need to both have
// it here (to create new future) and borrow it to the future itself.
// Essentially this is a self referential struct. To satisfy borrow checker,
// use Rc<RefCell>. To make ReplicationHandler itself Send'able future, wrap
// it into SendRc; this is safe as ReplicationHandler is passed between
// threads only as a whole (during rescheduling).
//
// Right now we're in CurrentThread runtime, so Send is somewhat
// redundant; however, we'd need to inconveniently have separate !Send
// Right now we're in CurrentThread runtime, so Send is somewhat redundant;
// however, otherwise we'd need to inconveniently have separate !Send
// version of pg backend Handler trait (and work with LocalSet).
write_ctx: SendRc<WriteContext>,
write_state: WriteState,
feedback: ReplicaState,
}
@@ -192,15 +215,26 @@ struct WriteContext {
}
// Yield points of WAL sending machinery.
enum WriteState {
// TODO: see if we can remove boxing here; with anon type of async fn this
// is untrivial (+ needs fiddling with pinning, pin_project and replace).
WaitWal(BoxFuture<'static, anyhow::Result<Option<Lsn>>>),
ReadWal(BoxFuture<'static, anyhow::Result<usize>>),
FlushWal,
pin_project! {
#[project = WriteStateProj]
enum WriteState<WF, RF>
where
WF: Future<Output = anyhow::Result<Option<Lsn>>>,
RF: Future<Output = anyhow::Result<usize>>,
{
// TODO: see if we can remove boxing here; with anon type of async fn this
// is untrivial (+ needs fiddling with pinning, pin_project and replace).
WaitWal{ #[pin] fut: WF},
ReadWal{ #[pin] fut: RF},
FlushWal,
}
}
impl Future for ReplicationHandler<'_> {
impl<WF, RF> Future for ReplicationHandler<'_, WF, RF>
where
WF: Future<Output = anyhow::Result<Option<Lsn>>>,
RF: Future<Output = anyhow::Result<usize>>,
{
type Output = Result<(), QueryError>;
// We need to read feedback from the socket and write data there at the same
@@ -220,11 +254,15 @@ impl Future for ReplicationHandler<'_> {
}
}
impl ReplicationHandler<'_> {
impl<WF, RF> ReplicationHandler<'_, WF, RF>
where
WF: Future<Output = anyhow::Result<Option<Lsn>>>,
RF: Future<Output = anyhow::Result<usize>>,
{
// Poll reading, i.e. getting feedback and processing it. Completes only on error/end of stream.
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), QueryError>> {
loop {
match ready!(self.pgb.poll_read_message(cx)) {
match ready!(self.as_mut().project().c.pgb.poll_read_message(cx)) {
Ok(Some(msg)) => self.as_mut().handle_feedback(&msg)?,
Ok(None) => {
return Poll::Ready(Err(QueryError::Other(anyhow::anyhow!(
@@ -236,7 +274,8 @@ impl ReplicationHandler<'_> {
}
}
fn handle_feedback(mut self: Pin<&mut Self>, msg: &FeMessage) -> Result<(), QueryError> {
fn handle_feedback(self: Pin<&mut Self>, msg: &FeMessage) -> Result<(), QueryError> {
let this = self.project();
match &msg {
FeMessage::CopyData(m) => {
// There's three possible data messages that the client is supposed to send here:
@@ -244,10 +283,11 @@ impl ReplicationHandler<'_> {
match m.first().cloned() {
Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
// Note: deserializing is on m[1..] because we skip the tag byte.
self.feedback.hs_feedback = HotStandbyFeedback::des(&m[1..])
this.c.feedback.hs_feedback = HotStandbyFeedback::des(&m[1..])
.context("failed to deserialize HotStandbyFeedback")?;
self.tli
.update_replica_state(self.replica_id, self.feedback);
this.c
.tli
.update_replica_state(this.c.replica_id, this.c.feedback);
}
Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
let _reply = StandbyReply::des(&m[1..])
@@ -267,10 +307,11 @@ impl ReplicationHandler<'_> {
trace!("ReplicationFeedback is {:?}", reply);
// Only pageserver sends ReplicationFeedback, so set the flag.
// This replica is the source of information to resend to compute.
self.feedback.pageserver_feedback = Some(reply);
this.c.feedback.pageserver_feedback = Some(reply);
self.tli
.update_replica_state(self.replica_id, self.feedback);
this.c
.tli
.update_replica_state(this.c.replica_id, this.c.feedback);
}
_ => warn!("unexpected message {:?}", msg),
}
@@ -301,41 +342,47 @@ impl ReplicationHandler<'_> {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), QueryError>> {
// send while we don't block or error out
loop {
match &mut self.write_state {
WriteState::WaitWal(fut) => match ready!(fut.as_mut().poll(cx))? {
match &mut self.as_mut().project().write_state.project() {
WriteStateProj::WaitWal { fut } => match ready!(fut.as_mut().poll(cx))? {
Some(lsn) => {
self.end_pos = lsn;
self.as_mut().project().c.end_pos = lsn;
self.as_mut().start_read_wal();
continue;
}
// Timed out waiting for WAL, send keepalive and possibly terminate.
None => {
if self.tli.should_walsender_stop(self.replica_id) {
let mut this = self.as_mut().project();
if this.c.tli.should_walsender_stop(this.c.replica_id) {
// Terminate if there is nothing more to send.
// TODO close the stream properly
return Poll::Ready(Err(anyhow::anyhow!(format!(
"ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
self.appname, self.start_pos,
self.c.appname, self.c.start_pos,
)).into()));
}
let end_pos = self.end_pos.0;
self.pgb
let end_pos = this.c.end_pos.0;
this.c
.pgb
.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
sent_ptr: end_pos,
timestamp: get_current_timestamp(),
request_reply: true,
}))?;
self.write_state = WriteState::FlushWal; /* flush KA */
/* flush KA */
this.write_state.set(WriteState::FlushWal);
}
},
WriteState::ReadWal(fut) => {
WriteStateProj::ReadWal { fut } => {
let read_len = ready!(fut.as_mut().poll(cx))?;
assert!(read_len > 0, "read_len={}", read_len);
let write_ctx_clone = self.write_ctx.clone();
let mut this = self.as_mut().project();
let write_ctx_clone = this.c.write_ctx.clone();
let send_buf = &write_ctx_clone.send_buf.borrow()[..read_len];
let (start_pos, end_pos) = (self.start_pos.0, self.end_pos.0);
let (start_pos, end_pos) = (this.c.start_pos.0, this.c.end_pos.0);
// write data to the output buffer
self.pgb
this.c
.pgb
.write_message(&BeMessage::XLogData(XLogDataBody {
wal_start: start_pos,
wal_end: end_pos,
@@ -344,25 +391,27 @@ impl ReplicationHandler<'_> {
}))
.context("Failed to write XLogData")?;
// and flush it
self.write_state = WriteState::FlushWal;
this.write_state.set(WriteState::FlushWal);
}
WriteState::FlushWal => {
ready!(self.pgb.poll_flush(cx))?;
WriteStateProj::FlushWal => {
let this = self.as_mut().project();
ready!(this.c.pgb.poll_flush(cx))?;
// If we are streaming to walproposer, check it is time to stop.
if let Some(stop_pos) = self.stop_pos {
if self.start_pos >= stop_pos {
if let Some(stop_pos) = this.c.stop_pos {
if this.c.start_pos >= stop_pos {
// recovery finished
// TODO close the stream properly
return Poll::Ready(Err(anyhow::anyhow!(format!(
"ending streaming to walproposer at {}, receiver is caughtup and there is no computes",
self.start_pos)).into()));
this.c.start_pos)).into()));
}
self.as_mut().start_read_wal();
continue;
} else {
// if we don't know next portion is already available, wait
// for it; otherwise proceed to sending
if self.end_pos <= self.start_pos {
if self.c.end_pos <= self.c.start_pos {
self.as_mut().start_wait_wal();
} else {
self.as_mut().start_read_wal();
@@ -374,26 +423,60 @@ impl ReplicationHandler<'_> {
}
// Start waiting for WAL, creating future doing that.
fn start_wait_wal(mut self: Pin<&mut Self>) {
let mut commit_lsn_watch_rx = self.tli.get_commit_lsn_watch_rx();
let start_pos = self.start_pos;
let wait_wal_fut = async move { wait_for_lsn(&mut commit_lsn_watch_rx, start_pos).await };
self.write_state = WriteState::WaitWal(Box::pin(wait_wal_fut));
fn start_wait_wal(self: Pin<&mut Self>) {
let fut = self.c.wait_wal_fut();
self.project().write_state.set(WriteState::WaitWal {
fut: {
// SAFETY: this function is the only way to assign futures to
// write_state. We just workaround impossibility of specifying
// async fn type, which is anonymous.
// transmute_copy is used as transmute refuses generic param:
// https://users.rust-lang.org/t/transmute-doesnt-work-on-generic-types/87272
let t = unsafe { std::mem::transmute_copy(&fut) };
std::mem::forget(fut);
t
},
});
}
// Switch into reading WAL state, creating Future doing that.
fn start_read_wal(mut self: Pin<&mut Self>) {
fn start_read_wal(self: Pin<&mut Self>) {
let fut = self.c.read_wal_fut();
self.project().write_state.set(WriteState::ReadWal {
fut: {
// SAFETY: this function is the only way to assign futures to
// write_state. We just workaround impossibility of specifying
// async fn type, which is anonymous.
// transmute_copy is used as transmute refuses generic param:
// https://users.rust-lang.org/t/transmute-doesnt-work-on-generic-types/87272
let t = unsafe { std::mem::transmute_copy(&fut) };
std::mem::forget(fut);
t
},
});
}
}
impl ReplicationContext<'_> {
// Create future waiting for WAL.
fn wait_wal_fut(&self) -> impl Future<Output = anyhow::Result<Option<Lsn>>> {
let mut commit_lsn_watch_rx = self.tli.get_commit_lsn_watch_rx();
let start_pos = self.start_pos;
async move { wait_for_lsn(&mut commit_lsn_watch_rx, start_pos).await }
}
// Create future reading WAL.
fn read_wal_fut(&self) -> impl Future<Output = anyhow::Result<usize>> {
let mut send_size = self.end_pos.checked_sub(self.start_pos).unwrap().0 as usize;
send_size = min(send_size, self.write_ctx.send_buf.borrow().len());
let write_ctx_fut = self.write_ctx.clone();
let read_wal_fut = async move {
async move {
let mut wal_reader_ref = write_ctx_fut.wal_reader.borrow_mut_send();
let mut send_buf_ref = write_ctx_fut.send_buf.borrow_mut_send();
let send_buf = &mut send_buf_ref[..send_size];
wal_reader_ref.read(send_buf).await
};
self.write_state = WriteState::ReadWal(Box::pin(read_wal_fut));
}
}
}