add comments; make read buffering work with write_buffered (owned version)

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
Yuchen Liang
2024-11-12 16:52:46 +00:00
parent ffd88ede38
commit 6844b5f460
5 changed files with 54 additions and 11 deletions


@@ -88,6 +88,8 @@ impl<A: Alignment> AlignedBuffer<A> {
}
}
/// Returns the mutable aligned buffer, if the immutable aligned buffer
/// has exactly one strong reference. Otherwise returns `None`.
pub fn into_mut(self) -> Option<AlignedBufferMut<A>> {
let raw = Arc::into_inner(self.raw)?;
Some(AlignedBufferMut::from_raw(raw))
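
As an aside (not part of this diff), `into_mut` succeeds only when the `Arc` holding the raw buffer has exactly one strong reference; a minimal sketch of that behavior using the standard-library `Arc::into_inner`:

use std::sync::Arc;

fn main() {
    // Sole owner: the inner value can be reclaimed, mirroring the case where
    // `into_mut` returns `Some(AlignedBufferMut)`.
    let unique = Arc::new([0u8; 4]);
    assert!(Arc::into_inner(unique).is_some());

    // A second strong reference exists, so reclaiming fails, mirroring the
    // `None` case above.
    let shared = Arc::new([0u8; 4]);
    let _other = Arc::clone(&shared);
    assert!(Arc::into_inner(shared).is_none());
}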


@@ -49,6 +49,7 @@ impl<const A: usize> AlignedBufferMut<ConstAlign<A>> {
}
impl<A: Alignment> AlignedBufferMut<A> {
/// Constructs a mutable aligned buffer from raw.
pub(super) fn from_raw(raw: RawAlignedBuffer<A>) -> Self {
AlignedBufferMut { raw }
}
@@ -136,6 +137,7 @@ impl<A: Alignment> AlignedBufferMut<A> {
AlignedBuffer::from_raw(self.raw, 0..len)
}
/// Clones and appends all elements in a slice to the buffer. Reserves additional capacity as needed.
#[inline]
pub fn extend_from_slice(&mut self, extend: &[u8]) {
let cnt = extend.len();
@@ -155,6 +157,7 @@ impl<A: Alignment> AlignedBufferMut<A> {
}
}
/// Returns the remaining spare capacity of the vector as a slice of `MaybeUninit<u8>`.
#[inline]
fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<u8>] {
// SAFETY: we guarantee that the `Self::capacity()` bytes from
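
The `extend_from_slice` / `spare_capacity_mut` pairing follows the same shape as the standard `Vec<u8>` API; a minimal sketch of that pattern with a plain `Vec` (illustrative only, not the aligned-buffer implementation):

use std::mem::MaybeUninit;

// Append `extend` by writing into the spare capacity and then committing the
// new length.
fn append(buf: &mut Vec<u8>, extend: &[u8]) {
    let cnt = extend.len();
    buf.reserve(cnt);
    let spare: &mut [MaybeUninit<u8>] = buf.spare_capacity_mut();
    for (dst, &src) in spare.iter_mut().zip(extend) {
        dst.write(src);
    }
    // SAFETY: the first `cnt` bytes of the spare capacity were just initialized.
    unsafe { buf.set_len(buf.len() + cnt) };
}

fn main() {
    let mut buf = vec![1u8, 2];
    append(&mut buf, &[3, 4, 5]);
    assert_eq!(buf, [1, 2, 3, 4, 5]);
}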


@@ -2,8 +2,10 @@ use tokio_epoll_uring::{IoBuf, IoBufMut};
use crate::virtual_file::{IoBuffer, IoBufferMut, PageWriteGuardBuf};
/// A marker trait for a mutable aligned buffer type.
pub trait IoBufAlignedMut: IoBufMut {}
/// A marker trait for an aligned buffer type.
pub trait IoBufAligned: IoBuf {}
impl IoBufAlignedMut for IoBufferMut {}


@@ -17,6 +17,7 @@ use super::{
/// A trait for doing owned-buffer write IO.
/// Think [`tokio::io::AsyncWrite`] but with owned buffers.
/// The owned buffers need to be aligned due to Direct IO requirements.
pub trait OwnedAsyncWriter {
fn write_all_at<Buf: IoBufAligned + Send>(
&self,
@@ -49,7 +50,9 @@ pub struct BufferedWriter<B: Buffer, W> {
///
/// In these exceptional cases, it's `None`.
mutable: Option<B>,
/// A handle to the background flush task for writing data to disk.
flush_handle: FlushHandle<B::IoBuf, W>,
/// The number of bytes submitted to the background task.
bytes_amount: u64,
}
@@ -59,6 +62,9 @@ where
Buf: IoBufAligned + Send + Sync + Clone,
W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
{
/// Creates a new buffered writer.
///
/// The `buf_new` function provides a way to initialize the owned buffers used by this writer.
pub fn new(writer: Arc<W>, buf_new: impl Fn() -> B, ctx: &RequestContext) -> Self {
Self {
writer: writer.clone(),
@@ -72,6 +78,7 @@ where
&self.writer
}
/// Returns the number of bytes submitted to the background flush task.
pub fn bytes_written(&self) -> u64 {
self.bytes_amount
}
@@ -82,6 +89,7 @@ where
}
/// Gets a reference to the maybe flushed read-only buffer.
/// Returns `None` if the writer has not submitted any flush request.
pub fn inspect_maybe_flushed(&self) -> Option<&Buf> {
self.flush_handle.maybe_flushed.as_ref()
}
@@ -91,7 +99,7 @@ where
mut self,
ctx: &RequestContext,
) -> std::io::Result<(u64, Arc<W>)> {
self.flush(ctx).await?;
self.flush(true, ctx).await?;
let Self {
mutable: buf,
@@ -125,7 +133,7 @@ where
// avoid memcpy for the middle of the chunk
if chunk.len() >= self.mutable().cap() {
// TODO(yuchen): do we still want to keep the bypass path?
self.flush(ctx).await?;
self.flush(false, ctx).await?;
// do a big write, bypassing `buf`
assert_eq!(
self.mutable
@@ -156,7 +164,7 @@ where
slice = &slice[n..];
if buf.pending() >= buf.cap() {
assert_eq!(buf.pending(), buf.cap());
self.flush(ctx).await?;
self.flush(true, ctx).await?;
}
}
assert!(slice.is_empty(), "by now we should have drained the chunk");
@@ -183,20 +191,27 @@ where
chunk = &chunk[n..];
if buf.pending() >= buf.cap() {
assert_eq!(buf.pending(), buf.cap());
self.flush(ctx).await?;
self.flush(true, ctx).await?;
}
}
Ok(chunk_len)
}
async fn flush(&mut self, _ctx: &RequestContext) -> std::io::Result<()> {
async fn flush(
&mut self,
save_buf_for_read: bool,
_ctx: &RequestContext,
) -> std::io::Result<()> {
let buf = self.mutable.take().expect("must not use after an error");
let buf_len = buf.pending();
if buf_len == 0 {
self.mutable = Some(buf);
return Ok(());
}
let recycled = self.flush_handle.flush(buf, self.bytes_amount).await?;
let recycled = self
.flush_handle
.flush(buf, self.bytes_amount, save_buf_for_read)
.await?;
self.bytes_amount += u64::try_from(buf_len).unwrap();
self.mutable = Some(recycled);
Ok(())
@@ -273,6 +288,7 @@ impl Buffer for IoBufferMut {
self.freeze().slice_len()
}
/// Caller should make sure that `iobuf` has only one strong reference before invoking this method.
fn reuse_after_flush(iobuf: Self::IoBuf) -> Self {
let mut recycled = iobuf
.into_mut()
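
Taken together, the `save_buf_for_read` flag and `reuse_after_flush` implement the read buffering in the commit title: a copy of the submitted buffer is retained so tail reads can be served before the data is durable, and the buffer is recycled once it is the sole reference again. A hypothetical, self-contained sketch of the tail-read idea (struct and field names here are illustrative, not the actual types):

// Data submitted for flushing but possibly not yet durable stays readable
// from an immutable copy, while new writes go into the mutable buffer.
struct TailReadable {
    mutable: Vec<u8>,               // bytes not yet submitted
    maybe_flushed: Option<Vec<u8>>, // last submitted buffer, kept only for reads
}

impl TailReadable {
    // Serve a read that spans the submitted-but-possibly-unflushed buffer and
    // the current mutable buffer.
    fn read_tail(&self) -> Vec<u8> {
        let mut out = Vec::new();
        if let Some(flushed) = &self.maybe_flushed {
            out.extend_from_slice(flushed);
        }
        out.extend_from_slice(&self.mutable);
        out
    }
}

fn main() {
    let w = TailReadable {
        maybe_flushed: Some(b"hello ".to_vec()),
        mutable: b"world".to_vec(),
    };
    assert_eq!(w.read_tail(), b"hello world");
}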


@@ -56,14 +56,17 @@ pub struct FlushHandleInner<Buf, W> {
/// A handle to the flush task.
pub struct FlushHandle<Buf, W> {
inner: Option<FlushHandleInner<Buf, W>>,
/// Buffer for serving tail reads.
/// Immutable buffer for serving tail reads.
/// `None` if no flush request has been submitted.
pub(super) maybe_flushed: Option<Buf>,
}
/// A background task for flushing data to disk.
pub struct FlushBackgroundTask<Buf, W> {
/// A bi-directional channel that receives (buffer, offset) for writes,
/// and sends back recycled buffers.
channel: Duplex<FullSlice<Buf>, (FullSlice<Buf>, u64)>,
/// A writer for persisting data to disk.
writer: Arc<W>,
ctx: RequestContext,
}
@@ -73,6 +76,7 @@ where
Buf: IoBufAligned + Send + Sync,
W: OwnedAsyncWriter + Sync + 'static,
{
/// Creates a new background flush task.
fn new(
channel: Duplex<FullSlice<Buf>, (FullSlice<Buf>, u64)>,
file: Arc<W>,
@@ -87,14 +91,17 @@ where
/// Runs the background flush task.
async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
// Sends the extra buffer back to the handle.
self.channel.send(slice).await.map_err(|_| {
std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
})?;
// Exit condition: the channel is closed and there is no remaining buffer to be flushed
while let Some((slice, offset)) = self.channel.recv().await {
// Writes the slice to disk at `offset`.
let slice = self.writer.write_all_at(slice, offset, &self.ctx).await?;
// Sends the buffer back to the handle for reuse. The handle is in charge of cleaning the buffer.
if self.channel.send(slice).await.is_err() {
// Although the channel is closed, we still need to finish flushing the remaining buffers.
continue;
@@ -131,17 +138,28 @@ where
/// Submits a buffer to be flushed in the background task.
/// Returns a buffer that completed flushing for re-use, length reset to 0, capacity unchanged.
pub async fn flush<B>(&mut self, buf: B, offset: u64) -> std::io::Result<B>
/// If `save_buf_for_read` is true, we save the buffer in `Self::maybe_flushed`;
/// otherwise we clear `maybe_flushed`.
pub async fn flush<B>(
&mut self,
buf: B,
offset: u64,
save_buf_for_read: bool,
) -> std::io::Result<B>
where
B: Buffer<IoBuf = Buf> + Send + 'static,
{
let freezed = buf.flush();
self.maybe_flushed
.replace(freezed.as_raw_slice().get_ref().clone());
// Saves a buffer for reads while flushing. This also drops the reference to the old buffer.
self.maybe_flushed = if save_buf_for_read {
Some(freezed.as_raw_slice().get_ref().clone())
} else {
None
};
// Submits the buffer to the background task.
let submit = self.inner_mut().channel.send((freezed, offset)).await;
if submit.is_err() {
return self.handle_error().await;
}
@@ -168,6 +186,8 @@ where
handle.join_handle.await.unwrap()
}
/// Gets a mutable reference to the inner handle. Panics if [`Self::inner`] is `None`.
/// This only happens if the handle is used after an error.
fn inner_mut(&mut self) -> &mut FlushHandleInner<Buf, W> {
self.inner
.as_mut()
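
For orientation, a minimal, self-contained model of the buffer-recycling protocol described by the comments above, with two `tokio::sync::mpsc` channels standing in for the project's `Duplex` type (channel sizes and the printed "write" are assumptions for illustration, not the actual implementation):

use tokio::sync::mpsc;

// Hypothetical model of the flush protocol: the foreground sends
// (buffer, offset) pairs; the background "writes" them and sends the
// buffer back so it can be reused for the next fill.
#[tokio::main]
async fn main() {
    let (to_bg, mut bg_rx) = mpsc::channel::<(Vec<u8>, u64)>(1);
    let (to_fg, mut fg_rx) = mpsc::channel::<Vec<u8>>(1);

    let background = tokio::spawn(async move {
        // Exit condition: the foreground side closed the channel and every
        // submitted buffer has been handled.
        while let Some((buf, offset)) = bg_rx.recv().await {
            // Stand-in for `writer.write_all_at(buf, offset, ctx)`.
            println!("wrote {} bytes at offset {offset}", buf.len());
            // Send the buffer back for reuse; if the foreground is gone,
            // keep draining the remaining submissions.
            if to_fg.send(buf).await.is_err() {
                continue;
            }
        }
    });

    to_bg.send((vec![0u8; 8192], 0)).await.unwrap();
    let mut recycled = fg_rx.recv().await.unwrap();
    recycled.clear(); // length reset to 0, capacity unchanged
    to_bg.send((recycled, 8192)).await.unwrap();
    drop(to_bg); // signal shutdown

    background.await.unwrap();
    // Drain the last recycled buffer, if any.
    let _ = fg_rx.recv().await;
}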