From bdffc352e7583c76126648dd1ff1c3e5e32fefa5 Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Sun, 10 Nov 2024 20:01:52 +0000 Subject: [PATCH] use background flush for write path; read path broken Signed-off-by: Yuchen Liang --- pageserver/src/tenant/ephemeral_file.rs | 3 +- .../tenant/remote_timeline_client/download.rs | 3 +- .../virtual_file/owned_buffers_io/write.rs | 48 +++++++++---------- .../owned_buffers_io/write/flush.rs | 21 +++++--- 4 files changed, 43 insertions(+), 32 deletions(-) diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 9a82c131f2..adebf67586 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -73,7 +73,8 @@ impl EphemeralFile { bytes_written: 0, buffered_writer: owned_buffers_io::write::BufferedWriter::new( file, - BytesMut::with_capacity(TAIL_SZ), + || BytesMut::with_capacity(TAIL_SZ), + ctx, ), _gate_guard: gate_guard, }) diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index d291f4302d..c39a95d452 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -227,7 +227,8 @@ async fn download_object<'a>( let (bytes_amount, destination_file) = async { let mut buffered = owned_buffers_io::write::BufferedWriter::::new( destination_file, - BytesMut::with_capacity(super::BUFFER_SIZE), + || BytesMut::with_capacity(super::BUFFER_SIZE), + ctx, ); while let Some(res) = futures::StreamExt::next(&mut download.download_stream).await diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index f383ad5723..6e478d840b 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -2,6 +2,7 @@ mod flush; use std::sync::Arc; use bytes::BytesMut; +use flush::FlushHandle; use tokio_epoll_uring::{BoundedBufMut, IoBuf}; use crate::{ @@ -37,7 +38,7 @@ pub trait OwnedAsyncWriter { /// /// In such cases, a different implementation that always buffers in memory /// may be preferable. -pub struct BufferedWriter { +pub struct BufferedWriter { writer: Arc, /// invariant: always remains Some(buf) except /// - while IO is ongoing => goes back to Some() once the IO completed successfully @@ -45,19 +46,21 @@ pub struct BufferedWriter { /// /// In these exceptional cases, it's `None`. mutable: Option, + flush_handle: FlushHandle, bytes_amount: u64, } impl BufferedWriter where - B: Buffer + Send, - Buf: IoBuf + Send, - W: OwnedAsyncWriter, + B: Buffer + Send + 'static, + Buf: IoBuf + Send + Sync + Clone, + W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug, { - pub fn new(writer: Arc, buf: B) -> Self { + pub fn new(writer: Arc, buf_new: impl Fn() -> B, ctx: &RequestContext) -> Self { Self { - writer, - mutable: Some(buf), + writer: writer.clone(), + mutable: Some(buf_new()), + flush_handle: FlushHandle::spawn_new(writer, buf_new(), ctx.attached_child()), bytes_amount: 0, } } @@ -85,8 +88,10 @@ where let Self { mutable: buf, writer, + mut flush_handle, bytes_amount, } = self; + flush_handle.shutdown().await?; assert!(buf.is_some()); Ok((bytes_amount, writer)) } @@ -110,6 +115,7 @@ where let chunk_len = chunk.len(); // avoid memcpy for the middle of the chunk if chunk.len() >= self.buf().cap() { + // TODO(yuchen): do we still want to keep this? self.flush(ctx).await?; // do a big write, bypassing `buf` assert_eq!( @@ -171,22 +177,16 @@ where Ok(chunk_len) } - async fn flush(&mut self, ctx: &RequestContext) -> std::io::Result<()> { + async fn flush(&mut self, _ctx: &RequestContext) -> std::io::Result<()> { let buf = self.mutable.take().expect("must not use after an error"); let buf_len = buf.pending(); if buf_len == 0 { self.mutable = Some(buf); return Ok(()); } - let slice = buf.flush(); - let slice = self - .writer - .write_all_at(slice, self.bytes_amount, ctx) - .await?; + let recycled = self.flush_handle.flush(buf, self.bytes_amount).await?; self.bytes_amount += u64::try_from(buf_len).unwrap(); - self.mutable = Some(Buffer::reuse_after_flush( - slice.into_raw_slice().into_inner(), - )); + self.mutable = Some(recycled); Ok(()) } } @@ -215,8 +215,6 @@ pub trait Buffer { fn reuse_after_flush(iobuf: Self::IoBuf) -> Self; } -pub trait BufferMut: Buffer {} - impl Buffer for BytesMut { type IoBuf = BytesMut; @@ -283,7 +281,7 @@ mod tests { use crate::context::{DownloadBehavior, RequestContext}; use crate::task_mgr::TaskKind; - #[derive(Default)] + #[derive(Default, Debug)] struct RecorderWriter { /// record bytes and write offsets. writes: Mutex, u64)>>, @@ -331,7 +329,8 @@ mod tests { #[tokio::test] async fn test_buffered_writes_only() -> std::io::Result<()> { let recorder = Arc::new(RecorderWriter::default()); - let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2)); + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + let mut writer = BufferedWriter::new(recorder, || BytesMut::with_capacity(2), &ctx); write!(writer, b"a"); write!(writer, b"b"); write!(writer, b"c"); @@ -348,7 +347,8 @@ mod tests { #[tokio::test] async fn test_passthrough_writes_only() -> std::io::Result<()> { let recorder = Arc::new(RecorderWriter::default()); - let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2)); + let ctx = test_ctx(); + let mut writer = BufferedWriter::new(recorder, || BytesMut::with_capacity(2), &ctx); write!(writer, b"abc"); write!(writer, b"de"); write!(writer, b""); @@ -364,8 +364,8 @@ mod tests { #[tokio::test] async fn test_passthrough_write_with_nonempty_buffer() -> std::io::Result<()> { let recorder = Arc::new(RecorderWriter::default()); - let mut writer = - BufferedWriter::<_, RecorderWriter>::new(recorder, BytesMut::with_capacity(2)); + let ctx = test_ctx(); + let mut writer = BufferedWriter::new(recorder, || BytesMut::with_capacity(2), &ctx); write!(writer, b"a"); write!(writer, b"bc"); write!(writer, b"d"); @@ -384,7 +384,7 @@ mod tests { let ctx = &ctx; let recorder = Arc::new(RecorderWriter::default()); let mut writer = - BufferedWriter::<_, RecorderWriter>::new(recorder, BytesMut::with_capacity(2)); + BufferedWriter::<_, RecorderWriter>::new(recorder, || BytesMut::with_capacity(2), ctx); writer.write_buffered_borrowed(b"abc", ctx).await?; writer.write_buffered_borrowed(b"d", ctx).await?; diff --git a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs index 97b9f28009..35057ba4bf 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs @@ -128,7 +128,7 @@ where /// Submits a buffer to be flushed in the background task. /// Returns a buffer that completed flushing for re-use, length reset to 0, capacity unchanged. - async fn flush(&mut self, buf: B, offset: u64) -> std::io::Result + pub async fn flush(&mut self, buf: B, offset: u64) -> std::io::Result where B: Buffer + Send + 'static, { @@ -155,14 +155,23 @@ where )) } + /// Cleans up the channel, join the flush task. + pub async fn shutdown(&mut self) -> std::io::Result> { + let handle = self + .inner + .take() + .expect("must not use after we returned an error"); + drop(handle.channel.tx); + handle.join_handle.await.unwrap() + } + fn inner_mut(&mut self) -> &mut FlushHandleInner { - self.inner.as_mut().unwrap() + self.inner + .as_mut() + .expect("must not use after we returned an error") } async fn handle_error(&mut self) -> std::io::Result { - let handle = self.inner.take().unwrap(); - drop(handle.channel.tx); - let e = handle.join_handle.await.unwrap().unwrap_err(); - return Err(e); + Err(self.shutdown().await.unwrap_err()) } }