make flush handle & task generic

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
This commit is contained in:
Yuchen Liang
2024-11-10 19:07:45 +00:00
parent 26c8b50451
commit 45998046f3
3 changed files with 125 additions and 60 deletions

View File

@@ -27,6 +27,11 @@ where
let FullSlice { slice: s } = self;
s
}
pub(crate) fn as_raw_slice(&self) -> &Slice<B> {
let FullSlice { slice: s } = &self;
s
}
}
impl<B> Deref for FullSlice<B>

View File

@@ -1,22 +1,25 @@
mod flush;
use std::sync::Arc;
use bytes::{BufMut, BytesMut};
use bytes::BytesMut;
use tokio_epoll_uring::{BoundedBufMut, IoBuf};
use crate::{context::RequestContext, virtual_file::IoBufferMut};
use crate::{
context::RequestContext,
virtual_file::{IoBuffer, IoBufferMut},
};
use super::io_buf_ext::{FullSlice, IoBufExt};
/// A trait for doing owned-buffer write IO.
/// Think [`tokio::io::AsyncWrite`] but with owned buffers.
pub trait OwnedAsyncWriter {
async fn write_all_at<Buf: IoBuf + Send>(
fn write_all_at<Buf: IoBuf + Send>(
&self,
buf: FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> std::io::Result<FullSlice<Buf>>;
) -> impl std::future::Future<Output = std::io::Result<FullSlice<Buf>>> + Send;
}
/// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
@@ -212,6 +215,8 @@ pub trait Buffer {
fn reuse_after_flush(iobuf: Self::IoBuf) -> Self;
}
pub trait BufferMut: Buffer {}
impl Buffer for BytesMut {
type IoBuf = BytesMut;
@@ -240,7 +245,7 @@ impl Buffer for BytesMut {
}
impl Buffer for IoBufferMut {
type IoBuf = IoBufferMut;
type IoBuf = IoBuffer;
fn cap(&self) -> usize {
self.capacity()
@@ -256,12 +261,15 @@ impl Buffer for IoBufferMut {
}
fn flush(self) -> FullSlice<Self::IoBuf> {
self.slice_len()
self.freeze().slice_len()
}
fn reuse_after_flush(mut iobuf: Self::IoBuf) -> Self {
iobuf.clear();
iobuf
fn reuse_after_flush(iobuf: Self::IoBuf) -> Self {
let mut recycled = iobuf
.into_mut()
.expect("buffer should only have one strong reference");
recycled.clear();
recycled
}
}

View File

@@ -1,19 +1,23 @@
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_epoll_uring::IoBuf;
use crate::{
context::RequestContext,
virtual_file::{IoBuffer, IoBufferMut, VirtualFile},
};
use crate::{context::RequestContext, virtual_file::owned_buffers_io::io_buf_ext::FullSlice};
use super::{IoBufExt, OwnedAsyncWriter};
use super::{Buffer, OwnedAsyncWriter};
/// A bi-directional channel.
pub struct Duplex<S, R> {
pub tx: mpsc::Sender<S>,
pub rx: mpsc::Receiver<R>,
}
/// Creates a bi-directional channel.
///
/// The channel will buffer up to the provided number of messages. Once the buffer is full,
/// attempts to send new messages will wait until a message is received from the channel.
/// The provided buffer capacity must be at least 1.
pub fn duplex_channel<A: Send, B: Send>(buffer: usize) -> (Duplex<A, B>, Duplex<B, A>) {
let (tx_a, rx_a) = mpsc::channel::<A>(buffer);
let (tx_b, rx_b) = mpsc::channel::<B>(buffer);
@@ -22,95 +26,143 @@ pub fn duplex_channel<A: Send, B: Send>(buffer: usize) -> (Duplex<A, B>, Duplex<
}
impl<S: Send, R: Send> Duplex<S, R> {
/// Sends a value, waiting until there is capacity.
///
/// A successful send occurs when it is determined that the other end of the channel has not hung up already.
pub async fn send(&self, x: S) -> Result<(), mpsc::error::SendError<S>> {
self.tx.send(x).await
}
/// Receives the next value for this receiver.
///
/// This method returns `None` if the channel has been closed and there are
/// no remaining messages in the channel's buffer.
pub async fn recv(&mut self) -> Option<R> {
self.rx.recv().await
}
}
/// A handle to the flush task.
pub struct FlushHandle {
pub struct FlushHandleInner<Buf, W> {
/// A bi-directional channel that sends (buffer, offset) for writes,
/// and receives recyled buffer.
channel: Duplex<(IoBuffer, u64), IoBuffer>,
///
maybe_flushed: Option<IoBuffer>,
join_handle: tokio::task::JoinHandle<anyhow::Result<()>>,
channel: Duplex<(FullSlice<Buf>, u64), FullSlice<Buf>>,
/// Join handle for the background flush task.
join_handle: tokio::task::JoinHandle<std::io::Result<Arc<W>>>,
}
pub struct FlushBackgroundTask {
channel: Duplex<IoBuffer, (IoBuffer, u64)>,
file: Arc<VirtualFile>,
/// A handle to the flush task.
pub struct FlushHandle<Buf, W> {
inner: Option<FlushHandleInner<Buf, W>>,
/// Buffer for serving tail reads.
maybe_flushed: Option<Buf>,
}
pub struct FlushBackgroundTask<Buf, W> {
/// A bi-directional channel that receives (buffer, offset) for writes,
/// and send back recycled buffer.
channel: Duplex<FullSlice<Buf>, (FullSlice<Buf>, u64)>,
writer: Arc<W>,
ctx: RequestContext,
}
impl FlushBackgroundTask {
impl<Buf, W> FlushBackgroundTask<Buf, W>
where
Buf: IoBuf + Send + Sync,
W: OwnedAsyncWriter + Sync + 'static,
{
fn new(
channel: Duplex<IoBuffer, (IoBuffer, u64)>,
file: Arc<VirtualFile>,
channel: Duplex<FullSlice<Buf>, (FullSlice<Buf>, u64)>,
file: Arc<W>,
ctx: RequestContext,
) -> Self {
FlushBackgroundTask { channel, file, ctx }
FlushBackgroundTask {
channel,
writer: file,
ctx,
}
}
async fn run(mut self, buf: IoBufferMut) -> anyhow::Result<()> {
self.channel.send(buf.freeze()).await?;
/// Runs the background flush task.
async fn run(mut self, buf: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
self.channel.send(buf).await.map_err(|_| {
std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
})?;
while let Some((mut buf, offset)) = self.channel.recv().await {
let slice = OwnedAsyncWriter::write_all_at(
self.file.as_ref(),
buf.slice_len(),
offset,
&self.ctx,
)
.await?;
buf = slice.into_raw_slice().into_inner();
// Exit condition: channel is closed and there is no remaining buffer to be flushed
while let Some((slice, offset)) = self.channel.recv().await {
let slice = self.writer.write_all_at(slice, offset, &self.ctx).await?;
if self.channel.send(buf).await.is_err() {
// Channel closed, exit task.
break;
if self.channel.send(slice).await.is_err() {
// Although channel is closed. Still need to finish flushing the remaining buffers.
continue;
}
}
Ok(())
Ok(self.writer)
}
}
impl FlushHandle {
pub fn spawn_new(file: Arc<VirtualFile>, ctx: RequestContext) -> Self {
let (front, back) = duplex_channel(1);
impl<Buf, W> FlushHandle<Buf, W>
where
Buf: IoBuf + Send + Sync + Clone,
W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
{
/// Spawns a new background flush task and obtains a handle.
pub fn spawn_new<B>(file: Arc<W>, buf: B, ctx: RequestContext) -> Self
where
B: Buffer<IoBuf = Buf> + Send + 'static,
{
let (front, back) = duplex_channel(2);
let bg = FlushBackgroundTask::new(back, file, ctx);
let buf = IoBufferMut::with_capacity(4096);
let join_handle = tokio::spawn(async move { bg.run(buf).await });
let join_handle = tokio::spawn(async move { bg.run(buf.flush()).await });
FlushHandle {
channel: front,
inner: Some(FlushHandleInner {
channel: front,
join_handle,
}),
maybe_flushed: None,
join_handle,
}
}
/// Submits a buffer to be flushed in the background task.
/// Returns a buffer that completed flushing for re-use, length reset to 0, capacity unchanged.
async fn flush(&mut self, buf: IoBufferMut, offset: u64) -> IoBufferMut {
// Send
let freezed = buf.freeze();
self.maybe_flushed.replace(freezed.clone());
self.channel.send((freezed, offset)).await.unwrap();
async fn flush<B>(&mut self, buf: B, offset: u64) -> std::io::Result<B>
where
B: Buffer<IoBuf = Buf> + Send + 'static,
{
let freezed = buf.flush();
self.maybe_flushed
.replace(freezed.as_raw_slice().get_ref().clone());
let submit = self.inner_mut().channel.send((freezed, offset)).await;
if submit.is_err() {
return self.handle_error().await;
}
// Wait for an available buffer from the background flush task.
let recycled = self.channel.recv().await.unwrap();
let Some(recycled) = self.inner_mut().channel.recv().await else {
return self.handle_error().await;
};
// The only other place that could hold a reference to the recycled buffer
// is in `Self::maybe_flushed`, but we have already replace it with the new buffer.
let mut recycled = recycled
.into_mut()
.expect("buffer should only have one strong reference");
Ok(Buffer::reuse_after_flush(
recycled.into_raw_slice().into_inner(),
))
}
recycled.clear();
recycled
fn inner_mut(&mut self) -> &mut FlushHandleInner<Buf, W> {
self.inner.as_mut().unwrap()
}
async fn handle_error<T>(&mut self) -> std::io::Result<T> {
let handle = self.inner.take().unwrap();
drop(handle.channel.tx);
let e = handle.join_handle.await.unwrap().unwrap_err();
return Err(e);
}
}