diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index bf46139e22..743c9b5ecf 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -4,6 +4,7 @@ use anyhow::Context as AnyhowContext; use bytes::Bytes; use futures::future::BoxFuture; +use pin_project_lite::pin_project; use postgres_ffi::get_current_timestamp; use postgres_ffi::{TimestampTz, MAX_SEND_SIZE}; use serde::{Deserialize, Serialize}; @@ -137,7 +138,7 @@ impl SafekeeperPostgresHandler { send_buf: RefCell::new([0; MAX_SEND_SIZE]), }); - ReplicationHandler { + let c = ReplicationContext { tli, replica_id, appname, @@ -146,17 +147,41 @@ impl SafekeeperPostgresHandler { end_pos, stop_pos, write_ctx, - // Actually we start from reading WAL, but this way is easier to - // code, we'll just immediately switch. - write_state: WriteState::FlushWal, feedback: ReplicaState::new(), + }; + + let _phantom_wf = c.wait_wal_fut(); + let _phantom_rf = c.read_wal_fut(); + + ReplicationHandler { + c, + write_state: WriteState::FlushWal, + _phantom_wf, + _phantom_rf, } .await } } /// START_REPLICATION stream driver: sends WAL and receives feedback. -struct ReplicationHandler<'a> { +pin_project! { + struct ReplicationHandler<'a, WF, RF> + where + WF: Future>>, + RF: Future>, + { + c: ReplicationContext<'a>, + #[pin] + write_state: WriteState, + // To deduce anonymous types. + _phantom_wf: WF, + _phantom_rf: RF, + } +} + +/// Data ReplicationHandler maintains. Separated so we could generate WriteState +/// futures during init, deducing their type. +struct ReplicationContext<'a> { tli: Arc, appname: Option, replica_id: usize, @@ -168,19 +193,17 @@ struct ReplicationHandler<'a> { // If present, terminate after reaching this position; used by walproposer // in recovery. stop_pos: Option, - // This data is needed to create Future sending WAL, so we need to both - // have it here (to create new future) and borrow it to the future - // itself. Essentially this is a self referential struct. 
To satisfy - // borrow checker, use Rc. To make ReplicationHandler itself - // Send'able future, wrap it into SendRc; this is safe as - // ReplicationHandler is passed between threads only as a whole (during - // rescheduling). + // This data is needed to create Future sending WAL, so we need to both have + // it here (to create new future) and borrow it to the future itself. + // Essentially this is a self referential struct. To satisfy borrow checker, + // use Rc. To make ReplicationHandler itself Send'able future, wrap + // it into SendRc; this is safe as ReplicationHandler is passed between + // threads only as a whole (during rescheduling). // - // Right now we're in CurrentThread runtime, so Send is somewhat - // redundant; however, we'd need to inconveniently have separate !Send + // Right now we're in CurrentThread runtime, so Send is somewhat redundant; + // however, otherwise we'd need to inconveniently have separate !Send // version of pg backend Handler trait (and work with LocalSet). write_ctx: SendRc, - write_state: WriteState, feedback: ReplicaState, } @@ -192,15 +215,26 @@ struct WriteContext { } // Yield points of WAL sending machinery. -enum WriteState { - // TODO: see if we can remove boxing here; with anon type of async fn this - // is untrivial (+ needs fiddling with pinning, pin_project and replace). - WaitWal(BoxFuture<'static, anyhow::Result>>), - ReadWal(BoxFuture<'static, anyhow::Result>), - FlushWal, +pin_project! { + #[project = WriteStateProj] + enum WriteState + where + WF: Future>>, + RF: Future>, + { + // Futures are stored inline (unboxed) and pinned structurally through + // pin_project; their anonymous async types are named via WF and RF. 
+ WaitWal{ #[pin] fut: WF}, + ReadWal{ #[pin] fut: RF}, + FlushWal, + } } -impl Future for ReplicationHandler<'_> { +impl Future for ReplicationHandler<'_, WF, RF> +where + WF: Future>>, + RF: Future>, +{ type Output = Result<(), QueryError>; // We need to read feedback from the socket and write data there at the same @@ -220,11 +254,15 @@ impl Future for ReplicationHandler<'_> { } } -impl ReplicationHandler<'_> { +impl ReplicationHandler<'_, WF, RF> +where + WF: Future>>, + RF: Future>, +{ // Poll reading, i.e. getting feedback and processing it. Completes only on error/end of stream. fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { - match ready!(self.pgb.poll_read_message(cx)) { + match ready!(self.as_mut().project().c.pgb.poll_read_message(cx)) { Ok(Some(msg)) => self.as_mut().handle_feedback(&msg)?, Ok(None) => { return Poll::Ready(Err(QueryError::Other(anyhow::anyhow!( @@ -236,7 +274,8 @@ impl ReplicationHandler<'_> { } } - fn handle_feedback(mut self: Pin<&mut Self>, msg: &FeMessage) -> Result<(), QueryError> { + fn handle_feedback(self: Pin<&mut Self>, msg: &FeMessage) -> Result<(), QueryError> { + let this = self.project(); match &msg { FeMessage::CopyData(m) => { // There's three possible data messages that the client is supposed to send here: @@ -244,10 +283,11 @@ impl ReplicationHandler<'_> { match m.first().cloned() { Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => { // Note: deserializing is on m[1..] because we skip the tag byte. 
- self.feedback.hs_feedback = HotStandbyFeedback::des(&m[1..]) + this.c.feedback.hs_feedback = HotStandbyFeedback::des(&m[1..]) .context("failed to deserialize HotStandbyFeedback")?; - self.tli - .update_replica_state(self.replica_id, self.feedback); + this.c + .tli + .update_replica_state(this.c.replica_id, this.c.feedback); } Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => { let _reply = StandbyReply::des(&m[1..]) @@ -267,10 +307,11 @@ impl ReplicationHandler<'_> { trace!("ReplicationFeedback is {:?}", reply); // Only pageserver sends ReplicationFeedback, so set the flag. // This replica is the source of information to resend to compute. - self.feedback.pageserver_feedback = Some(reply); + this.c.feedback.pageserver_feedback = Some(reply); - self.tli - .update_replica_state(self.replica_id, self.feedback); + this.c + .tli + .update_replica_state(this.c.replica_id, this.c.feedback); } _ => warn!("unexpected message {:?}", msg), } @@ -301,41 +342,47 @@ impl ReplicationHandler<'_> { fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // send while we don't block or error out loop { - match &mut self.write_state { - WriteState::WaitWal(fut) => match ready!(fut.as_mut().poll(cx))? { + match &mut self.as_mut().project().write_state.project() { + WriteStateProj::WaitWal { fut } => match ready!(fut.as_mut().poll(cx))? { Some(lsn) => { - self.end_pos = lsn; + self.as_mut().project().c.end_pos = lsn; self.as_mut().start_read_wal(); continue; } // Timed out waiting for WAL, send keepalive and possibly terminate. None => { - if self.tli.should_walsender_stop(self.replica_id) { + let mut this = self.as_mut().project(); + if this.c.tli.should_walsender_stop(this.c.replica_id) { // Terminate if there is nothing more to send. 
// TODO close the stream properly return Poll::Ready(Err(anyhow::anyhow!(format!( "ending streaming to {:?} at {}, receiver is caughtup and there is no computes", - self.appname, self.start_pos, + self.c.appname, self.c.start_pos, )).into())); } - let end_pos = self.end_pos.0; - self.pgb + let end_pos = this.c.end_pos.0; + this.c + .pgb .write_message(&BeMessage::KeepAlive(WalSndKeepAlive { sent_ptr: end_pos, timestamp: get_current_timestamp(), request_reply: true, }))?; - self.write_state = WriteState::FlushWal; /* flush KA */ + /* flush KA */ + this.write_state.set(WriteState::FlushWal); } }, - WriteState::ReadWal(fut) => { + WriteStateProj::ReadWal { fut } => { let read_len = ready!(fut.as_mut().poll(cx))?; assert!(read_len > 0, "read_len={}", read_len); - let write_ctx_clone = self.write_ctx.clone(); + + let mut this = self.as_mut().project(); + let write_ctx_clone = this.c.write_ctx.clone(); let send_buf = &write_ctx_clone.send_buf.borrow()[..read_len]; - let (start_pos, end_pos) = (self.start_pos.0, self.end_pos.0); + let (start_pos, end_pos) = (this.c.start_pos.0, this.c.end_pos.0); // write data to the output buffer - self.pgb + this.c + .pgb .write_message(&BeMessage::XLogData(XLogDataBody { wal_start: start_pos, wal_end: end_pos, @@ -344,25 +391,27 @@ impl ReplicationHandler<'_> { })) .context("Failed to write XLogData")?; // and flush it - self.write_state = WriteState::FlushWal; + this.write_state.set(WriteState::FlushWal); } - WriteState::FlushWal => { - ready!(self.pgb.poll_flush(cx))?; + WriteStateProj::FlushWal => { + let this = self.as_mut().project(); + + ready!(this.c.pgb.poll_flush(cx))?; // If we are streaming to walproposer, check it is time to stop. 
- if let Some(stop_pos) = self.stop_pos { - if self.start_pos >= stop_pos { + if let Some(stop_pos) = this.c.stop_pos { + if this.c.start_pos >= stop_pos { // recovery finished // TODO close the stream properly return Poll::Ready(Err(anyhow::anyhow!(format!( "ending streaming to walproposer at {}, receiver is caughtup and there is no computes", - self.start_pos)).into())); + this.c.start_pos)).into())); } self.as_mut().start_read_wal(); continue; } else { // if we don't know next portion is already available, wait // for it; otherwise proceed to sending - if self.end_pos <= self.start_pos { + if self.c.end_pos <= self.c.start_pos { self.as_mut().start_wait_wal(); } else { self.as_mut().start_read_wal(); @@ -374,26 +423,60 @@ impl ReplicationHandler<'_> { } // Start waiting for WAL, creating future doing that. - fn start_wait_wal(mut self: Pin<&mut Self>) { - let mut commit_lsn_watch_rx = self.tli.get_commit_lsn_watch_rx(); - let start_pos = self.start_pos; - let wait_wal_fut = async move { wait_for_lsn(&mut commit_lsn_watch_rx, start_pos).await }; - self.write_state = WriteState::WaitWal(Box::pin(wait_wal_fut)); + fn start_wait_wal(self: Pin<&mut Self>) { + let fut = self.c.wait_wal_fut(); + self.project().write_state.set(WriteState::WaitWal { + fut: { + // SAFETY: WaitWal futures are only assigned here, and WF was deduced + // from wait_wal_fut() when the handler was built, so `fut` is expected + // to be exactly WF; we only work around the async type being anonymous. + // transmute_copy is used as transmute refuses generic param: + // https://users.rust-lang.org/t/transmute-doesnt-work-on-generic-types/87272 + let t = unsafe { std::mem::transmute_copy(&fut) }; + std::mem::forget(fut); + t + }, + }); } // Switch into reading WAL state, creating Future doing that. 
- fn start_read_wal(mut self: Pin<&mut Self>) { + fn start_read_wal(self: Pin<&mut Self>) { + let fut = self.c.read_wal_fut(); + self.project().write_state.set(WriteState::ReadWal { + fut: { + // SAFETY: ReadWal futures are only assigned here, and RF was deduced + // from read_wal_fut() when the handler was built, so `fut` is expected + // to be exactly RF; we only work around the async type being anonymous. + // transmute_copy is used as transmute refuses generic param: + // https://users.rust-lang.org/t/transmute-doesnt-work-on-generic-types/87272 + let t = unsafe { std::mem::transmute_copy(&fut) }; + std::mem::forget(fut); + t + }, + }); + } +} + +impl ReplicationContext<'_> { + // Create future waiting for WAL. + fn wait_wal_fut(&self) -> impl Future>> { + let mut commit_lsn_watch_rx = self.tli.get_commit_lsn_watch_rx(); + let start_pos = self.start_pos; + async move { wait_for_lsn(&mut commit_lsn_watch_rx, start_pos).await } + } + + // Create future reading WAL. + fn read_wal_fut(&self) -> impl Future> { let mut send_size = self.end_pos.checked_sub(self.start_pos).unwrap().0 as usize; send_size = min(send_size, self.write_ctx.send_buf.borrow().len()); let write_ctx_fut = self.write_ctx.clone(); - let read_wal_fut = async move { + async move { let mut wal_reader_ref = write_ctx_fut.wal_reader.borrow_mut_send(); let mut send_buf_ref = write_ctx_fut.send_buf.borrow_mut_send(); let send_buf = &mut send_buf_ref[..send_size]; wal_reader_ref.read(send_buf).await - }; - self.write_state = WriteState::ReadWal(Box::pin(read_wal_fut)); + } } }