move duplex to utils; make flush behavior controllable in test

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
Yuchen Liang
2024-11-18 23:52:52 +00:00
parent 990bc65a20
commit 5acc61bdbc
6 changed files with 348 additions and 139 deletions

View File

@@ -1,3 +1,4 @@
pub mod heavier_once_cell;
pub mod duplex;
pub mod gate;

View File

@@ -0,0 +1 @@
pub mod mpsc;

View File

@@ -0,0 +1,36 @@
use tokio::sync::mpsc;
/// A bi-directional channel.
pub struct Duplex<S, R> {
pub tx: mpsc::Sender<S>,
pub rx: mpsc::Receiver<R>,
}
/// Creates a bi-directional channel.
///
/// The channel will buffer up to the provided number of messages. Once the buffer is full,
/// attempts to send new messages will wait until a message is received from the channel.
/// The provided buffer capacity must be at least 1.
pub fn channel<A: Send, B: Send>(buffer: usize) -> (Duplex<A, B>, Duplex<B, A>) {
let (tx_a, rx_a) = mpsc::channel::<A>(buffer);
let (tx_b, rx_b) = mpsc::channel::<B>(buffer);
(Duplex { tx: tx_a, rx: rx_b }, Duplex { tx: tx_b, rx: rx_a })
}
impl<S: Send, R: Send> Duplex<S, R> {
/// Sends a value, waiting until there is capacity.
///
/// A successful send occurs when it is determined that the other end of the channel has not hung up already.
pub async fn send(&self, x: S) -> Result<(), mpsc::error::SendError<S>> {
self.tx.send(x).await
}
/// Receives the next value for this receiver.
///
/// This method returns `None` if the channel has been closed and there are
/// no remaining messages in the channel's buffer.
pub async fn recv(&mut self) -> Option<R> {
self.rx.recv().await
}
}
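
For orientation, here is a minimal usage sketch of the duplex channel added above. It assumes the module is reachable as `utils::sync::duplex::mpsc` (as the `use utils::sync::duplex;` import in the flush.rs diff below suggests); the `front`/`back`/`worker` names are illustrative and not part of this commit.

// Sketch only: one end sends u64 and receives String; the other end is the mirror image.
use utils::sync::duplex;

#[tokio::main]
async fn main() {
    let (mut front, mut back) = duplex::mpsc::channel::<u64, String>(1);

    let worker = tokio::spawn(async move {
        // Echo every number back as a string until the other half is dropped.
        while let Some(n) = back.recv().await {
            if back.send(n.to_string()).await.is_err() {
                break;
            }
        }
    });

    front.send(42).await.unwrap();
    assert_eq!(front.recv().await.as_deref(), Some("42"));
    drop(front); // drops both underlying mpsc halves, letting the worker exit
    worker.await.unwrap();
}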

View File

@@ -131,6 +131,18 @@ impl EphemeralFile {
srcbuf: &[u8],
ctx: &RequestContext,
) -> std::io::Result<u64> {
let (pos, control) = self.write_raw_controlled(srcbuf, ctx).await?;
if let Some(control) = control {
control.release().await;
}
Ok(pos)
}
pub(crate) async fn write_raw_controlled(
&mut self,
srcbuf: &[u8],
ctx: &RequestContext,
) -> std::io::Result<(u64, Option<owned_buffers_io::write::FlushControl>)> {
let pos = self.bytes_written;
let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
@@ -144,9 +156,9 @@ impl EphemeralFile {
})?;
// Write the payload
let nwritten = self
let (nwritten, control) = self
.buffered_writer
.write_buffered_borrowed(srcbuf, ctx)
.write_buffered_borrowed_controlled(srcbuf, ctx)
.await?;
assert_eq!(
nwritten,
@@ -156,7 +168,7 @@ impl EphemeralFile {
self.bytes_written = new_bytes_written;
Ok(pos)
Ok((pos, control))
}
}
@@ -381,7 +393,9 @@ mod tests {
.await
.unwrap();
let cap = file.buffered_writer.inspect_mutable().capacity();
let mutable = file.buffered_writer.inspect_mutable();
let cap = mutable.capacity();
let align = mutable.align();
let write_nbytes = cap * 2 + cap / 2;
@@ -391,26 +405,33 @@ mod tests {
.collect();
let mut value_offsets = Vec::new();
for i in 0..write_nbytes {
let off = file.write_raw(&content[i..i + 1], &ctx).await.unwrap();
for range in (0..write_nbytes)
.step_by(align)
.map(|start| start..(start + align).min(write_nbytes))
{
let off = file.write_raw(&content[range], &ctx).await.unwrap();
value_offsets.push(off);
}
assert!(file.len() as usize == write_nbytes);
for i in 0..write_nbytes {
assert_eq!(value_offsets[i], i.into_u64());
let buf = IoBufferMut::with_capacity(1);
assert_eq!(file.len() as usize, write_nbytes);
for (i, range) in (0..write_nbytes)
.step_by(align)
.map(|start| start..(start + align).min(write_nbytes))
.enumerate()
{
assert_eq!(value_offsets[i], range.start.into_u64());
let buf = IoBufferMut::with_capacity(range.len());
let (buf_slice, nread) = file
.read_exact_at_eof_ok(i.into_u64(), buf.slice_full(), &ctx)
.read_exact_at_eof_ok(range.start.into_u64(), buf.slice_full(), &ctx)
.await
.unwrap();
let buf = buf_slice.into_inner();
assert_eq!(nread, 1);
assert_eq!(&buf, &content[i..i + 1]);
assert_eq!(nread, range.len());
assert_eq!(&buf, &content[range]);
}
let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap();
assert!(file_contents == content[0..cap] || file_contents == content[0..cap * 2]);
assert!(file_contents == content[0..cap * 2]);
let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
assert_eq!(maybe_flushed_buffer_contents, &content[cap..cap * 2]);
@@ -430,7 +451,7 @@ mod tests {
.await
.unwrap();
// mutable buffer and maybe_flushed buffer each has cap.
// mutable buffer and maybe_flushed buffer each has `cap` bytes.
let cap = file.buffered_writer.inspect_mutable().capacity();
let content: Vec<u8> = rand::thread_rng()
@@ -446,8 +467,9 @@ mod tests {
&content[0..cap * 2 + cap / 2]
);
let md = file.buffered_writer.as_inner().path().metadata().unwrap();
assert!(
md.len() == cap.into_u64() || md.len() == 2 * cap.into_u64(),
assert_eq!(
md.len(),
2 * cap.into_u64(),
"buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
);
assert_eq!(
@@ -477,14 +499,15 @@ mod tests {
.await
.unwrap();
let cap = file.buffered_writer.inspect_mutable().capacity();
let mutable = file.buffered_writer.inspect_mutable();
let cap = mutable.capacity();
let align = mutable.align();
let content: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
.take(cap * 2 + cap / 2)
.collect();
file.write_raw(&content, &ctx).await.unwrap();
let (_, control) = file.write_raw_controlled(&content, &ctx).await.unwrap();
let test_read = |start: usize, len: usize| {
let file = &file;
@@ -504,24 +527,37 @@ mod tests {
}
};
let test_read_all_offset_combinations = || {
async move {
test_read(align, align).await;
// border onto edge of file
test_read(cap - align, align).await;
// read across file and buffer
test_read(cap - align, 2 * align).await;
// start at the beginning of the maybe flushed buffer
test_read(cap, align).await;
// completely within maybe flushed buffer
test_read(cap + align, align).await;
// border onto edge of maybe flushed buffer.
test_read(cap * 2 - align, align).await;
// read across maybe flushed and mutable buffer
test_read(cap * 2 - align, 2 * align).await;
// read across three segments
test_read(cap - align, cap + 2 * align).await;
// completely within mutable buffer
test_read(cap * 2 + align, align).await;
}
};
// completely within the file range
assert!(20 < cap, "test assumption");
test_read(10, 10).await;
// border onto edge of file
test_read(cap - 10, 10).await;
// read across file and buffer
test_read(cap - 10, 20).await;
// stay from start of maybe flushed buffer
test_read(cap, 10).await;
// completely within maybe flushed buffer
test_read(cap + 10, 10).await;
// border onto edge of maybe flushed buffer.
test_read(cap * 2 - 10, 10).await;
// read across maybe flushed and mutable buffer
test_read(cap * 2 - 10, 20).await;
// read across three segments
test_read(cap - 10, cap + 20).await;
// completely within mutable buffer
test_read(cap * 2 + 10, 10).await;
assert!(align < cap, "test assumption");
assert!(cap % align == 0);
let not_started = control.unwrap().as_not_started();
test_read_all_offset_combinations().await;
let in_progress = not_started.ready_to_flush();
test_read_all_offset_combinations().await;
in_progress.wait_until_flush_is_done().await;
test_read_all_offset_combinations().await;
}
}

View File

@@ -15,6 +15,8 @@ use super::{
io_buf_ext::{FullSlice, IoBufExt},
};
pub(crate) use flush::FlushControl;
/// A trait for doing owned-buffer write IO.
/// Think [`tokio::io::AsyncWrite`] but with owned buffers.
/// The owned buffers need to be aligned due to Direct IO requirements.
@@ -133,7 +135,10 @@ where
// avoid memcpy for the middle of the chunk
if chunk.len() >= self.mutable().cap() {
// TODO(yuchen): do we still want to keep the bypass path?
self.flush(false, ctx).await?;
let control = self.flush(false, ctx).await?;
if let Some(control) = control {
control.release().await;
}
// do a big write, bypassing `buf`
assert_eq!(
self.mutable
@@ -155,7 +160,11 @@ where
// in-memory copy the < BUFFER_SIZED tail of the chunk
assert!(chunk.len() < self.mutable().cap());
let mut slice = &chunk[..];
let mut control: Option<FlushControl> = None;
while !slice.is_empty() {
if let Some(control) = control.take() {
control.release().await;
}
let buf = self.mutable.as_mut().expect("must not use after an error");
let need = buf.cap() - buf.pending();
let have = slice.len();
@@ -164,9 +173,12 @@ where
slice = &slice[n..];
if buf.pending() >= buf.cap() {
assert_eq!(buf.pending(), buf.cap());
self.flush(true, ctx).await?;
control = self.flush(true, ctx).await?;
}
}
if let Some(control) = control.take() {
control.release().await;
}
assert!(slice.is_empty(), "by now we should have drained the chunk");
Ok((chunk_len, FullSlice::must_new(chunk)))
}
@@ -178,10 +190,24 @@ where
/// for large writes.
pub async fn write_buffered_borrowed(
&mut self,
mut chunk: &[u8],
chunk: &[u8],
ctx: &RequestContext,
) -> std::io::Result<usize> {
let (len, control) = self.write_buffered_borrowed_controlled(chunk, ctx).await?;
if let Some(control) = control {
control.release().await;
}
Ok(len)
}
/// In addition to the number of bytes submitted by this write, also returns a handle that can control the flush behavior.
pub async fn write_buffered_borrowed_controlled(
&mut self,
mut chunk: &[u8],
ctx: &RequestContext,
) -> std::io::Result<(usize, Option<FlushControl>)> {
let chunk_len = chunk.len();
let mut control: Option<FlushControl> = None;
while !chunk.is_empty() {
let buf = self.mutable.as_mut().expect("must not use after an error");
let need = buf.cap() - buf.pending();
@@ -191,30 +217,34 @@ where
chunk = &chunk[n..];
if buf.pending() >= buf.cap() {
assert_eq!(buf.pending(), buf.cap());
self.flush(true, ctx).await?;
if let Some(control) = control.take() {
control.release().await;
}
control = self.flush(true, ctx).await?;
}
}
Ok(chunk_len)
Ok((chunk_len, control))
}
#[must_use]
async fn flush(
&mut self,
save_buf_for_read: bool,
_ctx: &RequestContext,
) -> std::io::Result<()> {
) -> std::io::Result<Option<FlushControl>> {
let buf = self.mutable.take().expect("must not use after an error");
let buf_len = buf.pending();
if buf_len == 0 {
self.mutable = Some(buf);
return Ok(());
return Ok(None);
}
let recycled = self
let (recycled, flush_control) = self
.flush_handle
.flush(buf, self.bytes_submitted, save_buf_for_read)
.await?;
self.bytes_submitted += u64::try_from(buf_len).unwrap();
self.mutable = Some(recycled);
Ok(())
Ok(Some(flush_control))
}
}
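
The loop in `write_buffered_borrowed_controlled` boils down to "copy what fits, flush when the mutable buffer is exactly full". Below is a self-contained, synchronous analog over a plain `Vec` for illustration only; `ToyBufferedWriter` is a made-up name, and the real writer instead hands full buffers to the background flush task and threads a `FlushControl` through.

// Toy model of the buffered-write loop; `flushed` stands in for the file.
struct ToyBufferedWriter {
    buf: Vec<u8>,
    cap: usize,
    flushed: Vec<u8>,
}

impl ToyBufferedWriter {
    fn write_buffered_borrowed(&mut self, mut chunk: &[u8]) -> usize {
        let chunk_len = chunk.len();
        while !chunk.is_empty() {
            // Copy only what still fits into the mutable buffer.
            let need = self.cap - self.buf.len();
            let n = need.min(chunk.len());
            self.buf.extend_from_slice(&chunk[..n]);
            chunk = &chunk[n..];
            // Flush once the buffer is exactly full.
            if self.buf.len() == self.cap {
                self.flushed.extend_from_slice(&self.buf);
                self.buf.clear();
            }
        }
        chunk_len
    }
}

fn main() {
    let mut w = ToyBufferedWriter { buf: Vec::new(), cap: 4, flushed: Vec::new() };
    let n = w.write_buffered_borrowed(b"hello, world!"); // 13 bytes, cap 4
    assert_eq!(n, 13);
    assert_eq!(&w.flushed, &b"hello, world!"[..12]); // three full buffers flushed
    assert_eq!(&w.buf, b"!");                        // one byte still pending
}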

View File

@@ -1,6 +1,6 @@
use std::sync::Arc;
use tokio::sync::mpsc;
use utils::sync::duplex;
use crate::{
context::RequestContext,
@@ -9,50 +9,6 @@ use crate::{
use super::{Buffer, OwnedAsyncWriter};
/// A bi-directional channel.
pub struct Duplex<S, R> {
pub tx: mpsc::Sender<S>,
pub rx: mpsc::Receiver<R>,
}
/// Creates a bi-directional channel.
///
/// The channel will buffer up to the provided number of messages. Once the buffer is full,
/// attempts to send new messages will wait until a message is received from the channel.
/// The provided buffer capacity must be at least 1.
pub fn duplex_channel<A: Send, B: Send>(buffer: usize) -> (Duplex<A, B>, Duplex<B, A>) {
let (tx_a, rx_a) = mpsc::channel::<A>(buffer);
let (tx_b, rx_b) = mpsc::channel::<B>(buffer);
(Duplex { tx: tx_a, rx: rx_b }, Duplex { tx: tx_b, rx: rx_a })
}
impl<S: Send, R: Send> Duplex<S, R> {
/// Sends a value, waiting until there is capacity.
///
/// A successful send occurs when it is determined that the other end of the channel has not hung up already.
pub async fn send(&self, x: S) -> Result<(), mpsc::error::SendError<S>> {
self.tx.send(x).await
}
/// Receives the next value for this receiver.
///
/// This method returns `None` if the channel has been closed and there are
/// no remaining messages in the channel's buffer.
pub async fn recv(&mut self) -> Option<R> {
self.rx.recv().await
}
}
// TODO(yuchen): special actions in drop to clean up the join handle?
pub struct FlushHandleInner<Buf, W> {
/// A bi-directional channel that sends (buffer, offset) for writes,
/// and receives recyled buffer.
channel: Duplex<(FullSlice<Buf>, u64), FullSlice<Buf>>,
/// Join handle for the background flush task.
join_handle: tokio::task::JoinHandle<std::io::Result<Arc<W>>>,
}
/// A handle to the flush task.
pub struct FlushHandle<Buf, W> {
inner: Option<FlushHandleInner<Buf, W>>,
@@ -61,54 +17,97 @@ pub struct FlushHandle<Buf, W> {
pub(super) maybe_flushed: Option<Buf>,
}
/// A background task for flushing data to disk.
pub struct FlushBackgroundTask<Buf, W> {
/// A bi-directional channel that receives (buffer, offset) for writes,
/// and send back recycled buffer.
channel: Duplex<FullSlice<Buf>, (FullSlice<Buf>, u64)>,
/// A writter for persisting data to disk.
writer: Arc<W>,
ctx: RequestContext,
// TODO(yuchen): special actions in drop to clean up the join handle?
pub struct FlushHandleInner<Buf, W> {
/// A bi-directional channel that sends flush requests
/// and receives recycled buffers.
channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
/// Join handle for the background flush task.
join_handle: tokio::task::JoinHandle<std::io::Result<Arc<W>>>,
}
impl<Buf, W> FlushBackgroundTask<Buf, W>
where
Buf: IoBufAligned + Send + Sync,
W: OwnedAsyncWriter + Sync + 'static,
{
/// Creates a new background flush task.
fn new(
channel: Duplex<FullSlice<Buf>, (FullSlice<Buf>, u64)>,
file: Arc<W>,
ctx: RequestContext,
struct FlushRequest<Buf> {
slice: FullSlice<Buf>,
offset: u64,
#[cfg(test)]
ready_to_flush_rx: tokio::sync::oneshot::Receiver<()>,
#[cfg(test)]
done_flush_tx: tokio::sync::oneshot::Sender<()>,
}
#[cfg(not(test))]
fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
let request = FlushRequest { slice, offset };
let control = FlushControl::untracked();
(request, control)
}
#[cfg(test)]
fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
let (ready_to_flush_tx, ready_to_flush_rx) = tokio::sync::oneshot::channel();
let (done_flush_tx, done_flush_rx) = tokio::sync::oneshot::channel();
let control = FlushControl::not_started(ready_to_flush_tx, done_flush_rx);
let request = FlushRequest {
slice,
offset,
ready_to_flush_rx,
done_flush_tx,
};
(request, control)
}
pub enum FlushStartState {
#[cfg(not(test))]
Untracked,
#[cfg(test)]
NotStarted(FlushNotStarted),
}
pub(crate) struct FlushControl {
state: FlushStartState,
}
impl FlushControl {
#[cfg(test)]
fn not_started(
ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
done_flush_rx: tokio::sync::oneshot::Receiver<()>,
) -> Self {
FlushBackgroundTask {
channel,
writer: file,
ctx,
FlushControl {
state: FlushStartState::NotStarted(FlushNotStarted {
ready_to_flush_tx,
done_flush_rx,
}),
}
}
#[cfg(not(test))]
fn untracked() -> Self {
FlushControl {
state: FlushStartState::Untracked,
}
}
/// Runs the background flush task.
async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
// Sends the extra buffer back to the handle.
self.channel.send(slice).await.map_err(|_| {
std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
})?;
#[cfg(test)]
pub(crate) fn as_not_started(self) -> FlushNotStarted {
match self.state {
FlushStartState::NotStarted(not_started) => not_started,
}
}
// Exit condition: channel is closed and there is no remaining buffer to be flushed
while let Some((slice, offset)) = self.channel.recv().await {
// Write slice to disk at `offset`.
let slice = self.writer.write_all_at(slice, offset, &self.ctx).await?;
// Sends the buffer back to the handle for reuse. The handle is in charged of cleaning the buffer.
if self.channel.send(slice).await.is_err() {
// Although channel is closed. Still need to finish flushing the remaining buffers.
continue;
pub async fn release(self) {
match self.state {
#[cfg(not(test))]
FlushStartState::Untracked => (),
#[cfg(test)]
FlushStartState::NotStarted(not_started) => {
not_started
.ready_to_flush()
.wait_until_flush_is_done()
.await;
}
}
Ok(self.writer)
}
}
@@ -122,7 +121,7 @@ where
where
B: Buffer<IoBuf = Buf> + Send + 'static,
{
let (front, back) = duplex_channel(2);
let (front, back) = duplex::mpsc::channel(1);
let bg = FlushBackgroundTask::new(back, file, ctx);
let join_handle = tokio::spawn(async move { bg.run(buf.flush()).await });
@@ -145,21 +144,23 @@ where
buf: B,
offset: u64,
save_buf_for_read: bool,
) -> std::io::Result<B>
) -> std::io::Result<(B, FlushControl)>
where
B: Buffer<IoBuf = Buf> + Send + 'static,
{
let freezed = buf.flush();
let slice = buf.flush();
// Saves a buffer for read while flushing. This also removes reference to the old buffer.
self.maybe_flushed = if save_buf_for_read {
Some(freezed.as_raw_slice().get_ref().clone())
Some(slice.as_raw_slice().get_ref().clone())
} else {
None
};
let (request, flush_control) = new_flush_op(slice, offset);
// Submits the buffer to the background task.
let submit = self.inner_mut().channel.send((freezed, offset)).await;
let submit = self.inner_mut().channel.send(request).await;
if submit.is_err() {
return self.handle_error().await;
}
@@ -171,9 +172,9 @@ where
// The only other place that could hold a reference to the recycled buffer
// is in `Self::maybe_flushed`, but we have already replaced it with the new buffer.
Ok(Buffer::reuse_after_flush(
recycled.into_raw_slice().into_inner(),
))
let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner());
Ok((recycled, flush_control))
}
/// Cleans up the channel, join the flush task.
@@ -198,3 +199,107 @@ where
Err(self.shutdown().await.unwrap_err())
}
}
/// A background task for flushing data to disk.
pub struct FlushBackgroundTask<Buf, W> {
/// A bi-directional channel that receives flush requests
/// and sends back recycled buffers.
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
/// A writer for persisting data to disk.
writer: Arc<W>,
ctx: RequestContext,
}
impl<Buf, W> FlushBackgroundTask<Buf, W>
where
Buf: IoBufAligned + Send + Sync,
W: OwnedAsyncWriter + Sync + 'static,
{
/// Creates a new background flush task.
fn new(
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
file: Arc<W>,
ctx: RequestContext,
) -> Self {
FlushBackgroundTask {
channel,
writer: file,
ctx,
}
}
/// Runs the background flush task.
async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
// Sends the extra buffer back to the handle.
self.channel.send(slice).await.map_err(|_| {
std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
})?;
// Exit condition: channel is closed and there is no remaining buffer to be flushed
while let Some(request) = self.channel.recv().await {
#[cfg(test)]
{
// In test, wait for control to signal that we are ready to flush.
if request.ready_to_flush_rx.await.is_err() {
tracing::debug!("control dropped");
}
}
// Write slice to disk at `offset`.
let slice = self
.writer
.write_all_at(request.slice, request.offset, &self.ctx)
.await?;
#[cfg(test)]
{
// In test, tell control we are done flushing the buffer.
if request.done_flush_tx.send(()).is_err() {
tracing::debug!("control dropped");
}
}
// Sends the buffer back to the handle for reuse. The handle is in charge of cleaning the buffer.
if self.channel.send(slice).await.is_err() {
// Although the channel is closed, we still need to finish flushing the remaining buffers.
continue;
}
}
Ok(self.writer)
}
}
#[cfg(test)]
pub(crate) struct FlushNotStarted {
ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
done_flush_rx: tokio::sync::oneshot::Receiver<()>,
}
#[cfg(test)]
pub(crate) struct FlushInProgress {
done_flush_rx: tokio::sync::oneshot::Receiver<()>,
}
#[cfg(test)]
pub(crate) struct FlushDone;
#[cfg(test)]
impl FlushNotStarted {
pub fn ready_to_flush(self) -> FlushInProgress {
self.ready_to_flush_tx
.send(())
.map(|_| FlushInProgress {
done_flush_rx: self.done_flush_rx,
})
.unwrap()
}
}
#[cfg(test)]
impl FlushInProgress {
pub async fn wait_until_flush_is_done(self) -> FlushDone {
self.done_flush_rx.await.unwrap();
FlushDone
}
}
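
The test-only handshake above reduces to two oneshot channels. A self-contained sketch of the same pattern using plain tokio, without the neon types (the controller side mirrors FlushNotStarted::ready_to_flush and FlushInProgress::wait_until_flush_is_done):

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (ready_to_flush_tx, ready_to_flush_rx) = oneshot::channel::<()>();
    let (done_flush_tx, done_flush_rx) = oneshot::channel::<()>();

    let background = tokio::spawn(async move {
        // Background task: park until the test signals readiness. If the
        // controller was dropped, the real flush task just logs and proceeds.
        if ready_to_flush_rx.await.is_err() {
            eprintln!("control dropped");
        }
        // ... the actual write_all_at would happen here ...
        let _ = done_flush_tx.send(());
    });

    // Controller side: let the background task start flushing ...
    ready_to_flush_tx.send(()).unwrap();
    // ... then wait until the flush has completed.
    done_flush_rx.await.unwrap();
    background.await.unwrap();
}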