From e0848c28d958f708d523442a10be915e648bdc31 Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Mon, 11 Nov 2024 03:22:01 +0000 Subject: [PATCH] make InMemory read aware of mutable & maybe_flushed Signed-off-by: Yuchen Liang --- pageserver/src/tenant/ephemeral_file.rs | 86 +++++++++++++++---- .../virtual_file/owned_buffers_io/write.rs | 16 ++-- .../owned_buffers_io/write/flush.rs | 6 +- 3 files changed, 83 insertions(+), 25 deletions(-) diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index adebf67586..7b67570bf4 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -17,9 +17,9 @@ use pageserver_api::shard::TenantShardId; use tokio_epoll_uring::{BoundedBuf, Slice}; use tracing::error; -use std::io; use std::sync::atomic::AtomicU64; use std::sync::Arc; +use std::{io, u64}; use utils::id::TimelineId; pub struct EphemeralFile { @@ -170,8 +170,10 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral ) -> std::io::Result<(tokio_epoll_uring::Slice, usize)> { let flushed_offset = self.buffered_writer.bytes_written(); - let buffer = self.buffered_writer.inspect_buffer(); - let buffered = &buffer[0..buffer.pending()]; + let mutable = self.buffered_writer.inspect_mutable(); + let mutable = &mutable[0..mutable.pending()]; + + let maybe_flushed = self.buffered_writer.inspect_maybe_flushed(); let dst_cap = dst.bytes_total().into_u64(); let end = { @@ -196,8 +198,34 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral } } } - let written_range = Range(start, std::cmp::min(end, flushed_offset)); - let buffered_range = Range(std::cmp::max(start, flushed_offset), end); + + // [ written ][ maybe_flushed ][ mutable ] + // |- TAIL_SZ -||- TAIL_SZ -| + // ^ + // `flushed_offset` + // + let (written_range, maybe_flushed_range) = { + if maybe_flushed.is_some() { + ( + Range( + start, + std::cmp::min(end, flushed_offset.saturating_sub(TAIL_SZ as u64)), + ), + Range( + std::cmp::max(start, flushed_offset.saturating_sub(TAIL_SZ as u64)), + std::cmp::min(end, flushed_offset), + ), + ) + } else { + ( + Range(start, std::cmp::min(end, flushed_offset)), + // zero len + Range(flushed_offset, u64::MIN), + ) + } + }; + + let mutable_range = Range(std::cmp::max(start, flushed_offset), end); let dst = if written_range.len() > 0 { let file: &VirtualFile = self.buffered_writer.as_inner(); @@ -210,20 +238,44 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral dst }; - let dst = if buffered_range.len() > 0 { - let offset_in_buffer = buffered_range + let dst = if maybe_flushed_range.len() > 0 { + let offset_in_buffer = maybe_flushed_range + .0 + .checked_sub(flushed_offset.saturating_sub(TAIL_SZ as u64)) + .unwrap() + .into_usize(); + // Checked previously the buffer is Some. + let maybe_flushed = maybe_flushed.unwrap(); + let to_copy = &maybe_flushed + [offset_in_buffer..(offset_in_buffer + maybe_flushed_range.len().into_usize())]; + let bounds = dst.bounds(); + let mut view = dst.slice({ + let start = written_range.len().into_usize(); + let end = start + .checked_add(maybe_flushed_range.len().into_usize()) + .unwrap(); + start..end + }); + view.as_mut_rust_slice_full_zeroed() + .copy_from_slice(to_copy); + Slice::from_buf_bounds(Slice::into_inner(view), bounds) + } else { + dst + }; + + let dst = if mutable_range.len() > 0 { + let offset_in_buffer = mutable_range .0 .checked_sub(flushed_offset) .unwrap() .into_usize(); let to_copy = - &buffered[offset_in_buffer..(offset_in_buffer + buffered_range.len().into_usize())]; + &mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())]; let bounds = dst.bounds(); let mut view = dst.slice({ - let start = written_range.len().into_usize(); - let end = start - .checked_add(buffered_range.len().into_usize()) - .unwrap(); + let start = + written_range.len().into_usize() + maybe_flushed_range.len().into_usize(); + let end = start.checked_add(mutable_range.len().into_usize()).unwrap(); start..end }); view.as_mut_rust_slice_full_zeroed() @@ -330,7 +382,7 @@ mod tests { .await .unwrap(); - let cap = file.buffered_writer.inspect_buffer().capacity(); + let cap = file.buffered_writer.inspect_mutable().capacity(); let write_nbytes = cap + cap / 2; @@ -361,7 +413,7 @@ mod tests { let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap(); assert_eq!(file_contents, &content[0..cap]); - let buffer_contents = file.buffered_writer.inspect_buffer(); + let buffer_contents = file.buffered_writer.inspect_mutable(); assert_eq!(buffer_contents, &content[cap..write_nbytes]); } @@ -376,7 +428,7 @@ mod tests { .await .unwrap(); - let cap = file.buffered_writer.inspect_buffer().capacity(); + let cap = file.buffered_writer.inspect_mutable().capacity(); let content: Vec = rand::thread_rng() .sample_iter(rand::distributions::Standard) @@ -397,7 +449,7 @@ mod tests { "buffered writer does one write if we write 1.5x buffer capacity" ); assert_eq!( - &file.buffered_writer.inspect_buffer()[0..cap / 2], + &file.buffered_writer.inspect_mutable()[0..cap / 2], &content[cap..cap + cap / 2] ); } @@ -419,7 +471,7 @@ mod tests { .await .unwrap(); - let cap = file.buffered_writer.inspect_buffer().capacity(); + let cap = file.buffered_writer.inspect_mutable().capacity(); let content: Vec = rand::thread_rng() .sample_iter(rand::distributions::Standard) diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index 6e478d840b..cf0157bd75 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -74,8 +74,13 @@ where } /// Panics if used after any of the write paths returned an error - pub fn inspect_buffer(&self) -> &B { - self.buf() + pub fn inspect_mutable(&self) -> &B { + self.mutable() + } + + /// Gets a reference to the maybe flushed read-only buffer. + pub fn inspect_maybe_flushed(&self) -> Option<&Buf> { + self.flush_handle.maybe_flushed.as_ref() } #[cfg_attr(target_os = "macos", allow(dead_code))] @@ -96,8 +101,9 @@ where Ok((bytes_amount, writer)) } + /// Gets a reference to the mutable in-memory buffer. #[inline(always)] - fn buf(&self) -> &B { + fn mutable(&self) -> &B { self.mutable .as_ref() .expect("must not use after we returned an error") @@ -114,7 +120,7 @@ where let chunk_len = chunk.len(); // avoid memcpy for the middle of the chunk - if chunk.len() >= self.buf().cap() { + if chunk.len() >= self.mutable().cap() { // TODO(yuchen): do we still want to keep this? self.flush(ctx).await?; // do a big write, bypassing `buf` @@ -133,7 +139,7 @@ where return Ok((chunk_len, chunk)); } // in-memory copy the < BUFFER_SIZED tail of the chunk - assert!(chunk.len() < self.buf().cap()); + assert!(chunk.len() < self.mutable().cap()); let mut slice = &chunk[..]; while !slice.is_empty() { let buf = self.mutable.as_mut().expect("must not use after an error"); diff --git a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs index 35057ba4bf..9b5f36a5d3 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs @@ -54,7 +54,7 @@ pub struct FlushHandleInner { pub struct FlushHandle { inner: Option>, /// Buffer for serving tail reads. - maybe_flushed: Option, + pub(super) maybe_flushed: Option, } pub struct FlushBackgroundTask { @@ -83,8 +83,8 @@ where } /// Runs the background flush task. - async fn run(mut self, buf: FullSlice) -> std::io::Result> { - self.channel.send(buf).await.map_err(|_| { + async fn run(mut self, slice: FullSlice) -> std::io::Result> { + self.channel.send(slice).await.map_err(|_| { std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early") })?;