Signed-off-by: Yuchen Liang <yuchen@neon.tech>
This commit is contained in:
Yuchen Liang
2024-12-04 16:27:25 +00:00
parent 559ebffe71
commit afe1419ef5
3 changed files with 164 additions and 65 deletions

View File

@@ -1,36 +1,75 @@
use tokio::sync::mpsc;
use tokio::sync::oneshot;
/// A bi-directional channel.
pub struct Duplex<S, R> {
pub tx: mpsc::Sender<S>,
pub rx: mpsc::Receiver<R>,
}
/// Creates a bi-directional channel.
/// Sends values to the associated `Receiver`.
///
/// The channel will buffer up to the provided number of messages. Once the buffer is full,
/// attempts to send new messages will wait until a message is received from the channel.
/// The provided buffer capacity must be at least 1.
pub fn channel<A: Send, B: Send>(buffer: usize) -> (Duplex<A, B>, Duplex<B, A>) {
let (tx_a, rx_a) = mpsc::channel::<A>(buffer);
let (tx_b, rx_b) = mpsc::channel::<B>(buffer);
(Duplex { tx: tx_a, rx: rx_b }, Duplex { tx: tx_b, rx: rx_a })
/// Instances are created by the [`channel`] function.
pub struct Sender<S, R> {
pub tx: mpsc::Sender<Request<S, R>>,
}
impl<S: Send, R: Send> Duplex<S, R> {
/// Sends a value, waiting until there is capacity.
///
/// A successful send occurs when it is determined that the other end of the channel has not hung up already.
pub async fn send(&self, x: S) -> Result<(), mpsc::error::SendError<S>> {
self.tx.send(x).await
}
/// Receives values from the associated `Sender`.
///
/// Instances are created by the [`channel`] function.
pub struct Receiver<S, R> {
pub rx: mpsc::Receiver<Request<S, R>>,
}
/// Receives the next value for this receiver.
/// Request type that [`Sender`] sends to [`Receiver`]
pub struct Request<S, R> {
/// Actual payload
pub payload: S,
/// Sends associated response back to the associated `Receiver` on the sender side.
/// Instances are created by the [`Sender::send`] function.
pub response_tx: oneshot::Sender<R>,
}
impl<S, R> Request<S, R> {
/// Creates a new request that can send back response.
pub fn new(data: S, response_tx: oneshot::Sender<R>) -> Self {
Request {
payload: data,
response_tx,
}
}
}
pub mod error {
pub type SendError<S, R> = tokio::sync::mpsc::error::SendError<super::Request<S, R>>;
pub type RecvError = tokio::sync::oneshot::error::RecvError;
}
/// Creates a bounded mpsc channel that enables bi-directional communication between asynchronous tasks
/// with backpressure.
///
/// The channel will buffer up to the provided number of messages. Once the
/// buffer is full, attempts to send new messages will wait until a message is
/// received from the channel. The provided buffer capacity must be at least 1.
///
/// # Panics
///
/// Panics if the buffer capacity is 0.
pub fn channel<S: Send, R: Send>(buffer: usize) -> (Sender<S, R>, Receiver<S, R>) {
let (tx, rx) = mpsc::channel::<Request<S, R>>(buffer);
(Sender { tx }, Receiver { rx })
}
impl<S: Send, R: Send> Sender<S, R> {
/// Sends a value, waiting until there is capacity. On success, returns a one-shot channel receiver that
/// gets the associated response back.
pub async fn send(&self, x: S) -> Result<oneshot::Receiver<R>, error::SendError<S, R>> {
let (tx, rx) = oneshot::channel();
self.tx.send(Request::new(x, tx)).await?;
Ok(rx)
}
}
impl<S: Send, R: Send> Receiver<S, R> {
/// Receives the next value for the receiver.
///
/// This method returns `None` if the channel has been closed and there are
/// no remaining messages in the channel's buffer.
pub async fn recv(&mut self) -> Option<R> {
pub async fn recv(&mut self) -> Option<Request<S, R>> {
self.rx.recv().await
}
}

View File

@@ -103,7 +103,7 @@ where
/// Gets a reference to the maybe flushed read-only buffer.
/// Returns `None` if the writer has not submitted any flush request.
pub fn inspect_maybe_flushed(&self) -> Option<&FullSlice<Buf>> {
self.flush_handle.maybe_flushed.as_ref()
self.flush_handle.maybe_flushed.read_buf()
}
#[cfg_attr(target_os = "macos", allow(dead_code))]
@@ -178,9 +178,20 @@ where
self.mutable = Some(buf);
return Ok(None);
}
let (recycled, flush_control) = self.flush_handle.flush(buf, self.bytes_submitted).await?;
let (maybe_flushed, flush_control) = self
.flush_handle
.flush(buf.flush(), self.bytes_submitted)
.await?;
self.bytes_submitted += u64::try_from(buf_len).unwrap();
self.mutable = Some(recycled);
let Ok(recycled) = maybe_flushed.recycle().await else {
return self.flush_handle.handle_error().await;
};
// The only other place that could hold a reference to the recycled buffer
// is in `Self::maybe_flushed`, which get dropped when the buffer is recycled.
self.mutable = Some(Buffer::reuse_after_flush(
recycled.into_raw_slice().into_inner(),
));
Ok(Some(flush_control))
}
}

View File

@@ -14,13 +14,13 @@ pub struct FlushHandle<Buf, W> {
inner: Option<FlushHandleInner<Buf, W>>,
/// Immutable buffer for serving tail reads.
/// `None` if no flush request has been submitted.
pub(super) maybe_flushed: Option<FullSlice<Buf>>,
pub(super) maybe_flushed: MaybeFlushedFullSlice<Buf>,
}
pub struct FlushHandleInner<Buf, W> {
/// A bi-directional channel that sends (buffer, offset) for writes,
/// and receives recyled buffer.
channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
channel: duplex::mpsc::Sender<FlushRequest<Buf>, FullSlice<Buf>>,
/// Join handle for the background flush task.
join_handle: tokio::task::JoinHandle<std::io::Result<Arc<W>>>,
}
@@ -59,6 +59,62 @@ fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>,
(request, control)
}
pub enum MaybeFlushedFullSlice<Buf> {
Unused(FullSlice<Buf>),
Flushing {
read_buf: FullSlice<Buf>,
write_buf: tokio::sync::oneshot::Receiver<FullSlice<Buf>>,
},
}
impl<Buf> MaybeFlushedFullSlice<Buf> {
pub fn new_flushing(
read_buf: FullSlice<Buf>,
write_buf: tokio::sync::oneshot::Receiver<FullSlice<Buf>>,
) -> Self {
Self::Flushing {
read_buf,
write_buf,
}
}
/// Creates an unused maybe flushed full slice.
pub fn new_unused(buf: FullSlice<Buf>) -> Self {
Self::Unused(buf)
}
/// Returns a reference to the buffer for read if the buffer is [`Self::Flushing`], otherwise returns `None`.
pub fn read_buf(&self) -> Option<&FullSlice<Buf>> {
match self {
MaybeFlushedFullSlice::Unused(_) => None,
MaybeFlushedFullSlice::Flushing {
read_buf,
write_buf: _,
} => Some(read_buf),
}
}
/// Recycles a maybe flushed buffer to a `FullSlice`.
///
/// The call returns immediately if the buffer is [`Self::Unused`].
/// If the buffer is [`Self::Flushing`], the call will wait for an available buffer from the background task.
/// The cheap-cloned slice for read is also dropped.
pub async fn recycle(self) -> Result<FullSlice<Buf>, duplex::mpsc::error::RecvError> {
let buf = match self {
MaybeFlushedFullSlice::Unused(buf) => buf,
MaybeFlushedFullSlice::Flushing {
read_buf: _,
write_buf,
} => {
// This is the BACKPRESSURE mechanism: if the flush task can't keep up,
// then the write path will eventually wait for it here.
write_buf.await?
}
};
Ok(buf)
}
}
/// A handle to a `FlushRequest` that allows unit tests precise control over flush behavior.
#[cfg(test)]
pub(crate) struct FlushControl {
@@ -129,7 +185,7 @@ where
let join_handle = tokio::spawn(async move {
FlushBackgroundTask::new(back, file, gate_guard, ctx)
.run(buf.flush())
.run()
.await
});
@@ -138,7 +194,7 @@ where
channel: front,
join_handle,
}),
maybe_flushed: None,
maybe_flushed: MaybeFlushedFullSlice::new_unused(buf.flush()),
}
}
@@ -146,37 +202,29 @@ where
/// Returns a buffer that completed flushing for re-use, length reset to 0, capacity unchanged.
/// If `save_buf_for_read` is true, then we save the buffer in `Self::maybe_flushed`, otherwise
/// clear `maybe_flushed`.
pub async fn flush<B>(&mut self, buf: B, offset: u64) -> std::io::Result<(B, FlushControl)>
where
B: Buffer<IoBuf = Buf> + Send + 'static,
{
let slice = buf.flush();
// Saves a buffer for read while flushing. This also removes reference to the old buffer.
self.maybe_flushed = Some(slice.cheap_clone());
let (request, flush_control) = new_flush_op(slice, offset);
pub async fn flush(
&mut self,
slice: FullSlice<Buf>,
offset: u64,
) -> std::io::Result<(MaybeFlushedFullSlice<Buf>, FlushControl)> {
let (request, flush_control) = new_flush_op(slice.cheap_clone(), offset);
// Submits the buffer to the background task.
let submit = self.inner_mut().channel.send(request).await;
if submit.is_err() {
return self.handle_error().await;
}
// Wait for an available buffer from the background flush task.
// This is the BACKPRESSURE mechanism: if the flush task can't keep up,
// then the write path will eventually wait for it here.
let Some(recycled) = self.inner_mut().channel.recv().await else {
let Ok(submit) = self.inner_mut().channel.send(request).await else {
return self.handle_error().await;
};
// The only other place that could hold a reference to the recycled buffer
// is in `Self::maybe_flushed`, but we have already replace it with the new buffer.
let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner());
// Saves a buffer for read while flushing. This also removes reference to the old buffer.
let recycled = std::mem::replace(
&mut self.maybe_flushed,
MaybeFlushedFullSlice::new_flushing(slice, submit),
);
Ok((recycled, flush_control))
}
async fn handle_error<T>(&mut self) -> std::io::Result<T> {
/// Joins the background task to check for io error.
pub(super) async fn handle_error<T>(&mut self) -> std::io::Result<T> {
Err(self
.shutdown()
.await
@@ -206,7 +254,7 @@ where
pub struct FlushBackgroundTask<Buf, W> {
/// A bi-directional channel that receives (buffer, offset) for writes,
/// and send back recycled buffer.
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
channel: duplex::mpsc::Receiver<FlushRequest<Buf>, FullSlice<Buf>>,
/// A writter for persisting data to disk.
writer: Arc<W>,
ctx: RequestContext,
@@ -221,7 +269,7 @@ where
{
/// Creates a new background flush task.
fn new(
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
channel: duplex::mpsc::Receiver<FlushRequest<Buf>, FullSlice<Buf>>,
file: Arc<W>,
gate_guard: utils::sync::gate::GateGuard,
ctx: RequestContext,
@@ -236,18 +284,19 @@ where
/// Runs the background flush task.
/// The passed in slice is immediately sent back to the flush handle through the duplex channel.
async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
async fn run(mut self) -> std::io::Result<Arc<W>> {
// Sends the extra buffer back to the handle.
self.channel.send(slice).await.map_err(|_| {
std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
})?;
// Exit condition: channel is closed and there is no remaining buffer to be flushed
while let Some(request) = self.channel.recv().await {
while let Some(duplex::mpsc::Request {
payload,
response_tx,
}) = self.channel.recv().await
{
#[cfg(test)]
{
// In test, wait for control to signal that we are ready to flush.
if request.ready_to_flush_rx.await.is_err() {
if payload.ready_to_flush_rx.await.is_err() {
tracing::debug!("control dropped");
}
}
@@ -255,19 +304,19 @@ where
// Write slice to disk at `offset`.
let slice = self
.writer
.write_all_at(request.slice, request.offset, &self.ctx)
.write_all_at(payload.slice, payload.offset, &self.ctx)
.await?;
#[cfg(test)]
{
// In test, tell control we are done flushing buffer.
if request.done_flush_tx.send(()).is_err() {
if payload.done_flush_tx.send(()).is_err() {
tracing::debug!("control dropped");
}
}
// Sends the buffer back to the handle for reuse. The handle is in charged of cleaning the buffer.
if self.channel.send(slice).await.is_err() {
if response_tx.send(slice).is_err() {
// Although channel is closed. Still need to finish flushing the remaining buffers.
continue;
}