From afe1419ef577f4740ab1f5039a543e30850c51d2 Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Wed, 4 Dec 2024 16:27:25 +0000 Subject: [PATCH] wip Signed-off-by: Yuchen Liang --- libs/utils/src/sync/duplex/mpsc.rs | 87 ++++++++---- .../virtual_file/owned_buffers_io/write.rs | 17 ++- .../owned_buffers_io/write/flush.rs | 125 ++++++++++++------ 3 files changed, 164 insertions(+), 65 deletions(-) diff --git a/libs/utils/src/sync/duplex/mpsc.rs b/libs/utils/src/sync/duplex/mpsc.rs index 56b4e6d2b3..4f44897fa9 100644 --- a/libs/utils/src/sync/duplex/mpsc.rs +++ b/libs/utils/src/sync/duplex/mpsc.rs @@ -1,36 +1,75 @@ use tokio::sync::mpsc; +use tokio::sync::oneshot; -/// A bi-directional channel. -pub struct Duplex { - pub tx: mpsc::Sender, - pub rx: mpsc::Receiver, -} - -/// Creates a bi-directional channel. +/// Sends values to the associated `Receiver`. /// -/// The channel will buffer up to the provided number of messages. Once the buffer is full, -/// attempts to send new messages will wait until a message is received from the channel. -/// The provided buffer capacity must be at least 1. -pub fn channel(buffer: usize) -> (Duplex, Duplex) { - let (tx_a, rx_a) = mpsc::channel::(buffer); - let (tx_b, rx_b) = mpsc::channel::(buffer); - - (Duplex { tx: tx_a, rx: rx_b }, Duplex { tx: tx_b, rx: rx_a }) +/// Instances are created by the [`channel`] function. +pub struct Sender { + pub tx: mpsc::Sender>, } -impl Duplex { - /// Sends a value, waiting until there is capacity. - /// - /// A successful send occurs when it is determined that the other end of the channel has not hung up already. - pub async fn send(&self, x: S) -> Result<(), mpsc::error::SendError> { - self.tx.send(x).await - } +/// Receives values from the associated `Sender`. +/// +/// Instances are created by the [`channel`] function. +pub struct Receiver { + pub rx: mpsc::Receiver>, +} - /// Receives the next value for this receiver. +/// Request type that [`Sender`] sends to [`Receiver`] +pub struct Request { + /// Actual payload + pub payload: S, + /// Sends associated response back to the associated `Receiver` on the sender side. + /// Instances are created by the [`Sender::send`] function. + pub response_tx: oneshot::Sender, +} + +impl Request { + /// Creates a new request that can send back response. + pub fn new(data: S, response_tx: oneshot::Sender) -> Self { + Request { + payload: data, + response_tx, + } + } +} + +pub mod error { + pub type SendError = tokio::sync::mpsc::error::SendError>; + pub type RecvError = tokio::sync::oneshot::error::RecvError; +} + +/// Creates a bounded mpsc channel that enables bi-directional communication between asynchronous tasks +/// with backpressure. +/// +/// The channel will buffer up to the provided number of messages. Once the +/// buffer is full, attempts to send new messages will wait until a message is +/// received from the channel. The provided buffer capacity must be at least 1. +/// +/// # Panics +/// +/// Panics if the buffer capacity is 0. +pub fn channel(buffer: usize) -> (Sender, Receiver) { + let (tx, rx) = mpsc::channel::>(buffer); + (Sender { tx }, Receiver { rx }) +} + +impl Sender { + /// Sends a value, waiting until there is capacity. On success, returns a one-shot channel receiver that + /// gets the associated response back. + pub async fn send(&self, x: S) -> Result, error::SendError> { + let (tx, rx) = oneshot::channel(); + self.tx.send(Request::new(x, tx)).await?; + Ok(rx) + } +} + +impl Receiver { + /// Receives the next value for the receiver. /// /// This method returns `None` if the channel has been closed and there are /// no remaining messages in the channel's buffer. - pub async fn recv(&mut self) -> Option { + pub async fn recv(&mut self) -> Option> { self.rx.recv().await } } diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index 20bf878123..b1bf773f26 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -103,7 +103,7 @@ where /// Gets a reference to the maybe flushed read-only buffer. /// Returns `None` if the writer has not submitted any flush request. pub fn inspect_maybe_flushed(&self) -> Option<&FullSlice> { - self.flush_handle.maybe_flushed.as_ref() + self.flush_handle.maybe_flushed.read_buf() } #[cfg_attr(target_os = "macos", allow(dead_code))] @@ -178,9 +178,20 @@ where self.mutable = Some(buf); return Ok(None); } - let (recycled, flush_control) = self.flush_handle.flush(buf, self.bytes_submitted).await?; + let (maybe_flushed, flush_control) = self + .flush_handle + .flush(buf.flush(), self.bytes_submitted) + .await?; self.bytes_submitted += u64::try_from(buf_len).unwrap(); - self.mutable = Some(recycled); + let Ok(recycled) = maybe_flushed.recycle().await else { + return self.flush_handle.handle_error().await; + }; + + // The only other place that could hold a reference to the recycled buffer + // is in `Self::maybe_flushed`, which get dropped when the buffer is recycled. + self.mutable = Some(Buffer::reuse_after_flush( + recycled.into_raw_slice().into_inner(), + )); Ok(Some(flush_control)) } } diff --git a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs index 9ce8b311bb..df91e50bc9 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs @@ -14,13 +14,13 @@ pub struct FlushHandle { inner: Option>, /// Immutable buffer for serving tail reads. /// `None` if no flush request has been submitted. - pub(super) maybe_flushed: Option>, + pub(super) maybe_flushed: MaybeFlushedFullSlice, } pub struct FlushHandleInner { /// A bi-directional channel that sends (buffer, offset) for writes, /// and receives recyled buffer. - channel: duplex::mpsc::Duplex, FullSlice>, + channel: duplex::mpsc::Sender, FullSlice>, /// Join handle for the background flush task. join_handle: tokio::task::JoinHandle>>, } @@ -59,6 +59,62 @@ fn new_flush_op(slice: FullSlice, offset: u64) -> (FlushRequest, (request, control) } +pub enum MaybeFlushedFullSlice { + Unused(FullSlice), + Flushing { + read_buf: FullSlice, + write_buf: tokio::sync::oneshot::Receiver>, + }, +} + +impl MaybeFlushedFullSlice { + pub fn new_flushing( + read_buf: FullSlice, + write_buf: tokio::sync::oneshot::Receiver>, + ) -> Self { + Self::Flushing { + read_buf, + write_buf, + } + } + + /// Creates an unused maybe flushed full slice. + pub fn new_unused(buf: FullSlice) -> Self { + Self::Unused(buf) + } + + /// Returns a reference to the buffer for read if the buffer is [`Self::Flushing`], otherwise returns `None`. + pub fn read_buf(&self) -> Option<&FullSlice> { + match self { + MaybeFlushedFullSlice::Unused(_) => None, + MaybeFlushedFullSlice::Flushing { + read_buf, + write_buf: _, + } => Some(read_buf), + } + } + + /// Recycles a maybe flushed buffer to a `FullSlice`. + /// + /// The call returns immediately if the buffer is [`Self::Unused`]. + /// If the buffer is [`Self::Flushing`], the call will wait for an available buffer from the background task. + /// The cheap-cloned slice for read is also dropped. + pub async fn recycle(self) -> Result, duplex::mpsc::error::RecvError> { + let buf = match self { + MaybeFlushedFullSlice::Unused(buf) => buf, + MaybeFlushedFullSlice::Flushing { + read_buf: _, + write_buf, + } => { + // This is the BACKPRESSURE mechanism: if the flush task can't keep up, + // then the write path will eventually wait for it here. + write_buf.await? + } + }; + Ok(buf) + } +} + /// A handle to a `FlushRequest` that allows unit tests precise control over flush behavior. #[cfg(test)] pub(crate) struct FlushControl { @@ -129,7 +185,7 @@ where let join_handle = tokio::spawn(async move { FlushBackgroundTask::new(back, file, gate_guard, ctx) - .run(buf.flush()) + .run() .await }); @@ -138,7 +194,7 @@ where channel: front, join_handle, }), - maybe_flushed: None, + maybe_flushed: MaybeFlushedFullSlice::new_unused(buf.flush()), } } @@ -146,37 +202,29 @@ where /// Returns a buffer that completed flushing for re-use, length reset to 0, capacity unchanged. /// If `save_buf_for_read` is true, then we save the buffer in `Self::maybe_flushed`, otherwise /// clear `maybe_flushed`. - pub async fn flush(&mut self, buf: B, offset: u64) -> std::io::Result<(B, FlushControl)> - where - B: Buffer + Send + 'static, - { - let slice = buf.flush(); - - // Saves a buffer for read while flushing. This also removes reference to the old buffer. - self.maybe_flushed = Some(slice.cheap_clone()); - - let (request, flush_control) = new_flush_op(slice, offset); + pub async fn flush( + &mut self, + slice: FullSlice, + offset: u64, + ) -> std::io::Result<(MaybeFlushedFullSlice, FlushControl)> { + let (request, flush_control) = new_flush_op(slice.cheap_clone(), offset); // Submits the buffer to the background task. - let submit = self.inner_mut().channel.send(request).await; - if submit.is_err() { - return self.handle_error().await; - } - - // Wait for an available buffer from the background flush task. - // This is the BACKPRESSURE mechanism: if the flush task can't keep up, - // then the write path will eventually wait for it here. - let Some(recycled) = self.inner_mut().channel.recv().await else { + let Ok(submit) = self.inner_mut().channel.send(request).await else { return self.handle_error().await; }; - // The only other place that could hold a reference to the recycled buffer - // is in `Self::maybe_flushed`, but we have already replace it with the new buffer. - let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner()); + // Saves a buffer for read while flushing. This also removes reference to the old buffer. + let recycled = std::mem::replace( + &mut self.maybe_flushed, + MaybeFlushedFullSlice::new_flushing(slice, submit), + ); + Ok((recycled, flush_control)) } - async fn handle_error(&mut self) -> std::io::Result { + /// Joins the background task to check for io error. + pub(super) async fn handle_error(&mut self) -> std::io::Result { Err(self .shutdown() .await @@ -206,7 +254,7 @@ where pub struct FlushBackgroundTask { /// A bi-directional channel that receives (buffer, offset) for writes, /// and send back recycled buffer. - channel: duplex::mpsc::Duplex, FlushRequest>, + channel: duplex::mpsc::Receiver, FullSlice>, /// A writter for persisting data to disk. writer: Arc, ctx: RequestContext, @@ -221,7 +269,7 @@ where { /// Creates a new background flush task. fn new( - channel: duplex::mpsc::Duplex, FlushRequest>, + channel: duplex::mpsc::Receiver, FullSlice>, file: Arc, gate_guard: utils::sync::gate::GateGuard, ctx: RequestContext, @@ -236,18 +284,19 @@ where /// Runs the background flush task. /// The passed in slice is immediately sent back to the flush handle through the duplex channel. - async fn run(mut self, slice: FullSlice) -> std::io::Result> { + async fn run(mut self) -> std::io::Result> { // Sends the extra buffer back to the handle. - self.channel.send(slice).await.map_err(|_| { - std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early") - })?; // Exit condition: channel is closed and there is no remaining buffer to be flushed - while let Some(request) = self.channel.recv().await { + while let Some(duplex::mpsc::Request { + payload, + response_tx, + }) = self.channel.recv().await + { #[cfg(test)] { // In test, wait for control to signal that we are ready to flush. - if request.ready_to_flush_rx.await.is_err() { + if payload.ready_to_flush_rx.await.is_err() { tracing::debug!("control dropped"); } } @@ -255,19 +304,19 @@ where // Write slice to disk at `offset`. let slice = self .writer - .write_all_at(request.slice, request.offset, &self.ctx) + .write_all_at(payload.slice, payload.offset, &self.ctx) .await?; #[cfg(test)] { // In test, tell control we are done flushing buffer. - if request.done_flush_tx.send(()).is_err() { + if payload.done_flush_tx.send(()).is_err() { tracing::debug!("control dropped"); } } // Sends the buffer back to the handle for reuse. The handle is in charged of cleaning the buffer. - if self.channel.send(slice).await.is_err() { + if response_tx.send(slice).is_err() { // Although channel is closed. Still need to finish flushing the remaining buffers. continue; }