make InMemory read aware of mutable & maybe_flushed

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
This commit is contained in:
Yuchen Liang
2024-11-11 03:22:01 +00:00
parent bdffc352e7
commit e0848c28d9
3 changed files with 83 additions and 25 deletions

View File

@@ -17,9 +17,9 @@ use pageserver_api::shard::TenantShardId;
use tokio_epoll_uring::{BoundedBuf, Slice};
use tracing::error;
use std::io;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::{io, u64};
use utils::id::TimelineId;
pub struct EphemeralFile {
@@ -170,8 +170,10 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
let flushed_offset = self.buffered_writer.bytes_written();
let buffer = self.buffered_writer.inspect_buffer();
let buffered = &buffer[0..buffer.pending()];
let mutable = self.buffered_writer.inspect_mutable();
let mutable = &mutable[0..mutable.pending()];
let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();
let dst_cap = dst.bytes_total().into_u64();
let end = {
@@ -196,8 +198,34 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
}
}
}
let written_range = Range(start, std::cmp::min(end, flushed_offset));
let buffered_range = Range(std::cmp::max(start, flushed_offset), end);
// [ written ][ maybe_flushed ][ mutable ]
// |- TAIL_SZ -||- TAIL_SZ -|
// ^
// `flushed_offset`
//
let (written_range, maybe_flushed_range) = {
if maybe_flushed.is_some() {
(
Range(
start,
std::cmp::min(end, flushed_offset.saturating_sub(TAIL_SZ as u64)),
),
Range(
std::cmp::max(start, flushed_offset.saturating_sub(TAIL_SZ as u64)),
std::cmp::min(end, flushed_offset),
),
)
} else {
(
Range(start, std::cmp::min(end, flushed_offset)),
// zero len
Range(flushed_offset, u64::MIN),
)
}
};
let mutable_range = Range(std::cmp::max(start, flushed_offset), end);
let dst = if written_range.len() > 0 {
let file: &VirtualFile = self.buffered_writer.as_inner();
@@ -210,20 +238,44 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
dst
};
let dst = if buffered_range.len() > 0 {
let offset_in_buffer = buffered_range
let dst = if maybe_flushed_range.len() > 0 {
let offset_in_buffer = maybe_flushed_range
.0
.checked_sub(flushed_offset.saturating_sub(TAIL_SZ as u64))
.unwrap()
.into_usize();
// Checked previously the buffer is Some.
let maybe_flushed = maybe_flushed.unwrap();
let to_copy = &maybe_flushed
[offset_in_buffer..(offset_in_buffer + maybe_flushed_range.len().into_usize())];
let bounds = dst.bounds();
let mut view = dst.slice({
let start = written_range.len().into_usize();
let end = start
.checked_add(maybe_flushed_range.len().into_usize())
.unwrap();
start..end
});
view.as_mut_rust_slice_full_zeroed()
.copy_from_slice(to_copy);
Slice::from_buf_bounds(Slice::into_inner(view), bounds)
} else {
dst
};
let dst = if mutable_range.len() > 0 {
let offset_in_buffer = mutable_range
.0
.checked_sub(flushed_offset)
.unwrap()
.into_usize();
let to_copy =
&buffered[offset_in_buffer..(offset_in_buffer + buffered_range.len().into_usize())];
&mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
let bounds = dst.bounds();
let mut view = dst.slice({
let start = written_range.len().into_usize();
let end = start
.checked_add(buffered_range.len().into_usize())
.unwrap();
let start =
written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
start..end
});
view.as_mut_rust_slice_full_zeroed()
@@ -330,7 +382,7 @@ mod tests {
.await
.unwrap();
let cap = file.buffered_writer.inspect_buffer().capacity();
let cap = file.buffered_writer.inspect_mutable().capacity();
let write_nbytes = cap + cap / 2;
@@ -361,7 +413,7 @@ mod tests {
let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap();
assert_eq!(file_contents, &content[0..cap]);
let buffer_contents = file.buffered_writer.inspect_buffer();
let buffer_contents = file.buffered_writer.inspect_mutable();
assert_eq!(buffer_contents, &content[cap..write_nbytes]);
}
@@ -376,7 +428,7 @@ mod tests {
.await
.unwrap();
let cap = file.buffered_writer.inspect_buffer().capacity();
let cap = file.buffered_writer.inspect_mutable().capacity();
let content: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
@@ -397,7 +449,7 @@ mod tests {
"buffered writer does one write if we write 1.5x buffer capacity"
);
assert_eq!(
&file.buffered_writer.inspect_buffer()[0..cap / 2],
&file.buffered_writer.inspect_mutable()[0..cap / 2],
&content[cap..cap + cap / 2]
);
}
@@ -419,7 +471,7 @@ mod tests {
.await
.unwrap();
let cap = file.buffered_writer.inspect_buffer().capacity();
let cap = file.buffered_writer.inspect_mutable().capacity();
let content: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)

View File

@@ -74,8 +74,13 @@ where
}
/// Panics if used after any of the write paths returned an error
pub fn inspect_buffer(&self) -> &B {
self.buf()
pub fn inspect_mutable(&self) -> &B {
self.mutable()
}
/// Gets a reference to the maybe flushed read-only buffer.
pub fn inspect_maybe_flushed(&self) -> Option<&Buf> {
self.flush_handle.maybe_flushed.as_ref()
}
#[cfg_attr(target_os = "macos", allow(dead_code))]
@@ -96,8 +101,9 @@ where
Ok((bytes_amount, writer))
}
/// Gets a reference to the mutable in-memory buffer.
#[inline(always)]
fn buf(&self) -> &B {
fn mutable(&self) -> &B {
self.mutable
.as_ref()
.expect("must not use after we returned an error")
@@ -114,7 +120,7 @@ where
let chunk_len = chunk.len();
// avoid memcpy for the middle of the chunk
if chunk.len() >= self.buf().cap() {
if chunk.len() >= self.mutable().cap() {
// TODO(yuchen): do we still want to keep this?
self.flush(ctx).await?;
// do a big write, bypassing `buf`
@@ -133,7 +139,7 @@ where
return Ok((chunk_len, chunk));
}
// in-memory copy the < BUFFER_SIZED tail of the chunk
assert!(chunk.len() < self.buf().cap());
assert!(chunk.len() < self.mutable().cap());
let mut slice = &chunk[..];
while !slice.is_empty() {
let buf = self.mutable.as_mut().expect("must not use after an error");

View File

@@ -54,7 +54,7 @@ pub struct FlushHandleInner<Buf, W> {
pub struct FlushHandle<Buf, W> {
inner: Option<FlushHandleInner<Buf, W>>,
/// Buffer for serving tail reads.
maybe_flushed: Option<Buf>,
pub(super) maybe_flushed: Option<Buf>,
}
pub struct FlushBackgroundTask<Buf, W> {
@@ -83,8 +83,8 @@ where
}
/// Runs the background flush task.
async fn run(mut self, buf: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
self.channel.send(buf).await.map_err(|_| {
async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
self.channel.send(slice).await.map_err(|_| {
std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
})?;