use background flush for write path; read path broken

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
This commit is contained in:
Yuchen Liang
2024-11-10 20:01:52 +00:00
parent 45998046f3
commit bdffc352e7
4 changed files with 43 additions and 32 deletions

View File

@@ -73,7 +73,8 @@ impl EphemeralFile {
bytes_written: 0,
buffered_writer: owned_buffers_io::write::BufferedWriter::new(
file,
BytesMut::with_capacity(TAIL_SZ),
|| BytesMut::with_capacity(TAIL_SZ),
ctx,
),
_gate_guard: gate_guard,
})

View File

@@ -227,7 +227,8 @@ async fn download_object<'a>(
let (bytes_amount, destination_file) = async {
let mut buffered = owned_buffers_io::write::BufferedWriter::<BytesMut, _>::new(
destination_file,
BytesMut::with_capacity(super::BUFFER_SIZE),
|| BytesMut::with_capacity(super::BUFFER_SIZE),
ctx,
);
while let Some(res) =
futures::StreamExt::next(&mut download.download_stream).await

View File

@@ -2,6 +2,7 @@ mod flush;
use std::sync::Arc;
use bytes::BytesMut;
use flush::FlushHandle;
use tokio_epoll_uring::{BoundedBufMut, IoBuf};
use crate::{
@@ -37,7 +38,7 @@ pub trait OwnedAsyncWriter {
///
/// In such cases, a different implementation that always buffers in memory
/// may be preferable.
pub struct BufferedWriter<B, W> {
pub struct BufferedWriter<B: Buffer, W> {
writer: Arc<W>,
/// invariant: always remains Some(buf) except
/// - while IO is ongoing => goes back to Some() once the IO completed successfully
@@ -45,19 +46,21 @@ pub struct BufferedWriter<B, W> {
///
/// In these exceptional cases, it's `None`.
mutable: Option<B>,
flush_handle: FlushHandle<B::IoBuf, W>,
bytes_amount: u64,
}
impl<B, Buf, W> BufferedWriter<B, W>
where
B: Buffer<IoBuf = Buf> + Send,
Buf: IoBuf + Send,
W: OwnedAsyncWriter,
B: Buffer<IoBuf = Buf> + Send + 'static,
Buf: IoBuf + Send + Sync + Clone,
W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
{
pub fn new(writer: Arc<W>, buf: B) -> Self {
pub fn new(writer: Arc<W>, buf_new: impl Fn() -> B, ctx: &RequestContext) -> Self {
Self {
writer,
mutable: Some(buf),
writer: writer.clone(),
mutable: Some(buf_new()),
flush_handle: FlushHandle::spawn_new(writer, buf_new(), ctx.attached_child()),
bytes_amount: 0,
}
}
@@ -85,8 +88,10 @@ where
let Self {
mutable: buf,
writer,
mut flush_handle,
bytes_amount,
} = self;
flush_handle.shutdown().await?;
assert!(buf.is_some());
Ok((bytes_amount, writer))
}
@@ -110,6 +115,7 @@ where
let chunk_len = chunk.len();
// avoid memcpy for the middle of the chunk
if chunk.len() >= self.buf().cap() {
// TODO(yuchen): do we still want to keep this?
self.flush(ctx).await?;
// do a big write, bypassing `buf`
assert_eq!(
@@ -171,22 +177,16 @@ where
Ok(chunk_len)
}
async fn flush(&mut self, ctx: &RequestContext) -> std::io::Result<()> {
async fn flush(&mut self, _ctx: &RequestContext) -> std::io::Result<()> {
let buf = self.mutable.take().expect("must not use after an error");
let buf_len = buf.pending();
if buf_len == 0 {
self.mutable = Some(buf);
return Ok(());
}
let slice = buf.flush();
let slice = self
.writer
.write_all_at(slice, self.bytes_amount, ctx)
.await?;
let recycled = self.flush_handle.flush(buf, self.bytes_amount).await?;
self.bytes_amount += u64::try_from(buf_len).unwrap();
self.mutable = Some(Buffer::reuse_after_flush(
slice.into_raw_slice().into_inner(),
));
self.mutable = Some(recycled);
Ok(())
}
}
@@ -215,8 +215,6 @@ pub trait Buffer {
fn reuse_after_flush(iobuf: Self::IoBuf) -> Self;
}
pub trait BufferMut: Buffer {}
impl Buffer for BytesMut {
type IoBuf = BytesMut;
@@ -283,7 +281,7 @@ mod tests {
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::TaskKind;
#[derive(Default)]
#[derive(Default, Debug)]
struct RecorderWriter {
/// record bytes and write offsets.
writes: Mutex<Vec<(Vec<u8>, u64)>>,
@@ -331,7 +329,8 @@ mod tests {
#[tokio::test]
async fn test_buffered_writes_only() -> std::io::Result<()> {
let recorder = Arc::new(RecorderWriter::default());
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let mut writer = BufferedWriter::new(recorder, || BytesMut::with_capacity(2), &ctx);
write!(writer, b"a");
write!(writer, b"b");
write!(writer, b"c");
@@ -348,7 +347,8 @@ mod tests {
#[tokio::test]
async fn test_passthrough_writes_only() -> std::io::Result<()> {
let recorder = Arc::new(RecorderWriter::default());
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
let ctx = test_ctx();
let mut writer = BufferedWriter::new(recorder, || BytesMut::with_capacity(2), &ctx);
write!(writer, b"abc");
write!(writer, b"de");
write!(writer, b"");
@@ -364,8 +364,8 @@ mod tests {
#[tokio::test]
async fn test_passthrough_write_with_nonempty_buffer() -> std::io::Result<()> {
let recorder = Arc::new(RecorderWriter::default());
let mut writer =
BufferedWriter::<_, RecorderWriter>::new(recorder, BytesMut::with_capacity(2));
let ctx = test_ctx();
let mut writer = BufferedWriter::new(recorder, || BytesMut::with_capacity(2), &ctx);
write!(writer, b"a");
write!(writer, b"bc");
write!(writer, b"d");
@@ -384,7 +384,7 @@ mod tests {
let ctx = &ctx;
let recorder = Arc::new(RecorderWriter::default());
let mut writer =
BufferedWriter::<_, RecorderWriter>::new(recorder, BytesMut::with_capacity(2));
BufferedWriter::<_, RecorderWriter>::new(recorder, || BytesMut::with_capacity(2), ctx);
writer.write_buffered_borrowed(b"abc", ctx).await?;
writer.write_buffered_borrowed(b"d", ctx).await?;

View File

@@ -128,7 +128,7 @@ where
/// Submits a buffer to be flushed in the background task.
/// Returns a buffer that completed flushing for re-use, length reset to 0, capacity unchanged.
async fn flush<B>(&mut self, buf: B, offset: u64) -> std::io::Result<B>
pub async fn flush<B>(&mut self, buf: B, offset: u64) -> std::io::Result<B>
where
B: Buffer<IoBuf = Buf> + Send + 'static,
{
@@ -155,14 +155,23 @@ where
))
}
/// Cleans up the channel, join the flush task.
pub async fn shutdown(&mut self) -> std::io::Result<Arc<W>> {
let handle = self
.inner
.take()
.expect("must not use after we returned an error");
drop(handle.channel.tx);
handle.join_handle.await.unwrap()
}
fn inner_mut(&mut self) -> &mut FlushHandleInner<Buf, W> {
self.inner.as_mut().unwrap()
self.inner
.as_mut()
.expect("must not use after we returned an error")
}
async fn handle_error<T>(&mut self) -> std::io::Result<T> {
let handle = self.inner.take().unwrap();
drop(handle.channel.tx);
let e = handle.join_handle.await.unwrap().unwrap_err();
return Err(e);
Err(self.shutdown().await.unwrap_err())
}
}