diff --git a/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs b/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs index c3940cf6ce..ede04d4957 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs @@ -27,6 +27,11 @@ where let FullSlice { slice: s } = self; s } + + pub(crate) fn as_raw_slice(&self) -> &Slice { + let FullSlice { slice: s } = &self; + s + } } impl Deref for FullSlice diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index c3cade5966..f383ad5723 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -1,22 +1,25 @@ mod flush; use std::sync::Arc; -use bytes::{BufMut, BytesMut}; +use bytes::BytesMut; use tokio_epoll_uring::{BoundedBufMut, IoBuf}; -use crate::{context::RequestContext, virtual_file::IoBufferMut}; +use crate::{ + context::RequestContext, + virtual_file::{IoBuffer, IoBufferMut}, +}; use super::io_buf_ext::{FullSlice, IoBufExt}; /// A trait for doing owned-buffer write IO. /// Think [`tokio::io::AsyncWrite`] but with owned buffers. pub trait OwnedAsyncWriter { - async fn write_all_at( + fn write_all_at( &self, buf: FullSlice, offset: u64, ctx: &RequestContext, - ) -> std::io::Result>; + ) -> impl std::future::Future>> + Send; } /// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch @@ -212,6 +215,8 @@ pub trait Buffer { fn reuse_after_flush(iobuf: Self::IoBuf) -> Self; } +pub trait BufferMut: Buffer {} + impl Buffer for BytesMut { type IoBuf = BytesMut; @@ -240,7 +245,7 @@ impl Buffer for BytesMut { } impl Buffer for IoBufferMut { - type IoBuf = IoBufferMut; + type IoBuf = IoBuffer; fn cap(&self) -> usize { self.capacity() @@ -256,12 +261,15 @@ impl Buffer for IoBufferMut { } fn flush(self) -> FullSlice { - self.slice_len() + self.freeze().slice_len() } - fn reuse_after_flush(mut iobuf: Self::IoBuf) -> Self { - iobuf.clear(); - iobuf + fn reuse_after_flush(iobuf: Self::IoBuf) -> Self { + let mut recycled = iobuf + .into_mut() + .expect("buffer should only have one strong reference"); + recycled.clear(); + recycled } } diff --git a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs index 86cb2099e6..97b9f28009 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs @@ -1,19 +1,23 @@ use std::sync::Arc; use tokio::sync::mpsc; +use tokio_epoll_uring::IoBuf; -use crate::{ - context::RequestContext, - virtual_file::{IoBuffer, IoBufferMut, VirtualFile}, -}; +use crate::{context::RequestContext, virtual_file::owned_buffers_io::io_buf_ext::FullSlice}; -use super::{IoBufExt, OwnedAsyncWriter}; +use super::{Buffer, OwnedAsyncWriter}; +/// A bi-directional channel. pub struct Duplex { pub tx: mpsc::Sender, pub rx: mpsc::Receiver, } +/// Creates a bi-directional channel. +/// +/// The channel will buffer up to the provided number of messages. Once the buffer is full, +/// attempts to send new messages will wait until a message is received from the channel. +/// The provided buffer capacity must be at least 1. pub fn duplex_channel(buffer: usize) -> (Duplex, Duplex) { let (tx_a, rx_a) = mpsc::channel::(buffer); let (tx_b, rx_b) = mpsc::channel::(buffer); @@ -22,95 +26,143 @@ pub fn duplex_channel(buffer: usize) -> (Duplex, Duplex< } impl Duplex { + /// Sends a value, waiting until there is capacity. + /// + /// A successful send occurs when it is determined that the other end of the channel has not hung up already. pub async fn send(&self, x: S) -> Result<(), mpsc::error::SendError> { self.tx.send(x).await } + /// Receives the next value for this receiver. + /// + /// This method returns `None` if the channel has been closed and there are + /// no remaining messages in the channel's buffer. pub async fn recv(&mut self) -> Option { self.rx.recv().await } } -/// A handle to the flush task. -pub struct FlushHandle { +pub struct FlushHandleInner { /// A bi-directional channel that sends (buffer, offset) for writes, /// and receives recyled buffer. - channel: Duplex<(IoBuffer, u64), IoBuffer>, - /// - maybe_flushed: Option, - join_handle: tokio::task::JoinHandle>, + channel: Duplex<(FullSlice, u64), FullSlice>, + /// Join handle for the background flush task. + join_handle: tokio::task::JoinHandle>>, } -pub struct FlushBackgroundTask { - channel: Duplex, - file: Arc, +/// A handle to the flush task. +pub struct FlushHandle { + inner: Option>, + /// Buffer for serving tail reads. + maybe_flushed: Option, +} + +pub struct FlushBackgroundTask { + /// A bi-directional channel that receives (buffer, offset) for writes, + /// and send back recycled buffer. + channel: Duplex, (FullSlice, u64)>, + writer: Arc, ctx: RequestContext, } -impl FlushBackgroundTask { +impl FlushBackgroundTask +where + Buf: IoBuf + Send + Sync, + W: OwnedAsyncWriter + Sync + 'static, +{ fn new( - channel: Duplex, - file: Arc, + channel: Duplex, (FullSlice, u64)>, + file: Arc, ctx: RequestContext, ) -> Self { - FlushBackgroundTask { channel, file, ctx } + FlushBackgroundTask { + channel, + writer: file, + ctx, + } } - async fn run(mut self, buf: IoBufferMut) -> anyhow::Result<()> { - self.channel.send(buf.freeze()).await?; + /// Runs the background flush task. + async fn run(mut self, buf: FullSlice) -> std::io::Result> { + self.channel.send(buf).await.map_err(|_| { + std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early") + })?; - while let Some((mut buf, offset)) = self.channel.recv().await { - let slice = OwnedAsyncWriter::write_all_at( - self.file.as_ref(), - buf.slice_len(), - offset, - &self.ctx, - ) - .await?; - buf = slice.into_raw_slice().into_inner(); + // Exit condition: channel is closed and there is no remaining buffer to be flushed + while let Some((slice, offset)) = self.channel.recv().await { + let slice = self.writer.write_all_at(slice, offset, &self.ctx).await?; - if self.channel.send(buf).await.is_err() { - // Channel closed, exit task. - break; + if self.channel.send(slice).await.is_err() { + // Although channel is closed. Still need to finish flushing the remaining buffers. + continue; } } - Ok(()) + Ok(self.writer) } } -impl FlushHandle { - pub fn spawn_new(file: Arc, ctx: RequestContext) -> Self { - let (front, back) = duplex_channel(1); +impl FlushHandle +where + Buf: IoBuf + Send + Sync + Clone, + W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug, +{ + /// Spawns a new background flush task and obtains a handle. + pub fn spawn_new(file: Arc, buf: B, ctx: RequestContext) -> Self + where + B: Buffer + Send + 'static, + { + let (front, back) = duplex_channel(2); let bg = FlushBackgroundTask::new(back, file, ctx); - let buf = IoBufferMut::with_capacity(4096); - let join_handle = tokio::spawn(async move { bg.run(buf).await }); + let join_handle = tokio::spawn(async move { bg.run(buf.flush()).await }); FlushHandle { - channel: front, + inner: Some(FlushHandleInner { + channel: front, + join_handle, + }), maybe_flushed: None, - join_handle, } } + /// Submits a buffer to be flushed in the background task. /// Returns a buffer that completed flushing for re-use, length reset to 0, capacity unchanged. - async fn flush(&mut self, buf: IoBufferMut, offset: u64) -> IoBufferMut { - // Send - let freezed = buf.freeze(); - self.maybe_flushed.replace(freezed.clone()); - self.channel.send((freezed, offset)).await.unwrap(); + async fn flush(&mut self, buf: B, offset: u64) -> std::io::Result + where + B: Buffer + Send + 'static, + { + let freezed = buf.flush(); + + self.maybe_flushed + .replace(freezed.as_raw_slice().get_ref().clone()); + + let submit = self.inner_mut().channel.send((freezed, offset)).await; + + if submit.is_err() { + return self.handle_error().await; + } // Wait for an available buffer from the background flush task. - let recycled = self.channel.recv().await.unwrap(); + let Some(recycled) = self.inner_mut().channel.recv().await else { + return self.handle_error().await; + }; // The only other place that could hold a reference to the recycled buffer // is in `Self::maybe_flushed`, but we have already replace it with the new buffer. - let mut recycled = recycled - .into_mut() - .expect("buffer should only have one strong reference"); + Ok(Buffer::reuse_after_flush( + recycled.into_raw_slice().into_inner(), + )) + } - recycled.clear(); - recycled + fn inner_mut(&mut self) -> &mut FlushHandleInner { + self.inner.as_mut().unwrap() + } + + async fn handle_error(&mut self) -> std::io::Result { + let handle = self.inner.take().unwrap(); + drop(handle.channel.tx); + let e = handle.join_handle.await.unwrap().unwrap_err(); + return Err(e); } }