From df4571fef17f3a77fa44212503be7aae9831d76f Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 22 Aug 2024 16:15:16 +0000 Subject: [PATCH] more renaming to read_exact_at_eof_ok & reuse File::read_exact_at_eof_ok for load_to_vec --- pageserver/src/tenant/ephemeral_file.rs | 131 ++++++++---------- .../tenant/storage_layer/inmemory_layer.rs | 13 +- .../inmemory_layer/vectored_dio_read.rs | 34 +++-- 3 files changed, 84 insertions(+), 94 deletions(-) diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index a9de3876fb..162ff53915 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -5,6 +5,7 @@ use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64}; use crate::config::PageServerConf; use crate::context::RequestContext; use crate::page_cache; +use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File; use crate::virtual_file::owned_buffers_io::slice::SliceMutExt; use crate::virtual_file::owned_buffers_io::util::size_tracking_writer; use crate::virtual_file::owned_buffers_io::write::Buffer; @@ -13,7 +14,7 @@ use bytes::BytesMut; use camino::Utf8PathBuf; use num_traits::Num; use pageserver_api::shard::TenantShardId; -use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice}; +use tokio_epoll_uring::{BoundedBuf, Slice}; use tracing::error; use std::io; @@ -107,42 +108,62 @@ impl EphemeralFile { } pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result, io::Error> { - let size = usize::try_from(self.len()).unwrap(); + let size = self.len().into_usize(); let vec = Vec::with_capacity(size); - - // read from disk what we've already flushed - let file_size_tracker = self.buffered_writer.as_inner(); - let flushed_offset = usize::try_from(file_size_tracker.bytes_written()).unwrap(); - let flushed_range = 0..flushed_offset; - let file: &VirtualFile = file_size_tracker.as_inner(); - let mut vec = file - .read_exact_at( - vec.slice(0..(flushed_range.end - 
flushed_range.start)), - u64::try_from(flushed_range.start).unwrap(), - ctx, - ) - .await? - .into_inner(); - - // copy from in-memory buffer what we haven't flushed yet but would return when accessed via read_blk - let buffer = self.buffered_writer.inspect_buffer(); - let buffered = &buffer[0..buffer.pending()]; - vec.extend_from_slice(buffered); - assert_eq!(vec.len(), size); + let (slice, nread) = self.read_exact_at_eof_ok(0, vec.slice_full(), ctx).await?; + assert_eq!(nread, size); + let vec = slice.into_inner(); + assert_eq!(vec.len(), nread); + assert_eq!(vec.capacity(), size, "we shouldn't be reallocating"); Ok(vec) } - /// Fill dst will dst.bytes_total() bytes from the bytes written to the buffered writer from offset `start` and later. - /// If `dst` is larger than the available bytes, the read will be short. - /// The read will never be short for other reasons. - /// The number of bytes read into `dst` is returned as part of the result tuple. - /// No guarantees are made about the remaining bytes in `dst`, i.e., assume their contents are random. - pub(crate) async fn read_at_to_end( - &self, - start: u64, - dst: Slice, + /// Returns the offset at which the first byte of the input was written, for use + /// in constructing indices over the written value. + /// + /// Panics if the write is short because there's no way we can recover from that. + /// TODO: make upstack handle this as an error. 
+ pub(crate) async fn write_raw( + &mut self, + srcbuf: &[u8], ctx: &RequestContext, - ) -> std::io::Result<(Slice, usize)> { + ) -> std::io::Result { + let pos = self.bytes_written; + + let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}", + srcbuf_len = srcbuf.len(), + ), + ) + })?; + + // Write the payload + let nwritten = self + .buffered_writer + .write_buffered_borrowed(srcbuf, ctx) + .await?; + assert_eq!( + nwritten, + srcbuf.len(), + "buffered writer has no short writes" + ); + + self.bytes_written = new_bytes_written; + + Ok(pos) + } +} + +impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile { + async fn read_exact_at_eof_ok<'a, 'b, B: tokio_epoll_uring::IoBufMut + Send>( + &'b self, + start: u64, + dst: tokio_epoll_uring::Slice, + ctx: &'a RequestContext, + ) -> std::io::Result<(tokio_epoll_uring::Slice, usize)> { let file_size_tracking_writer = self.buffered_writer.as_inner(); let flushed_offset = file_size_tracking_writer.bytes_written(); @@ -214,44 +235,6 @@ impl EphemeralFile { Ok((dst, (end - start).into_usize())) } - - /// Returns the offset at which the first byte of the input was written, for use - /// in constructing indices over the written value. - /// - /// Panics if the write is short because there's no way we can recover from that. - /// TODO: make upstack handle this as an error. 
- pub(crate) async fn write_raw( - &mut self, - srcbuf: &[u8], - ctx: &RequestContext, - ) -> std::io::Result { - let pos = self.bytes_written; - - let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| { - std::io::Error::new( - std::io::ErrorKind::Other, - format!( - "write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}", - srcbuf_len = srcbuf.len(), - ), - ) - })?; - - // Write the payload - let nwritten = self - .buffered_writer - .write_buffered_borrowed(srcbuf, ctx) - .await?; - assert_eq!( - nwritten, - srcbuf.len(), - "buffered writer has no short writes" - ); - - self.bytes_written = new_bytes_written; - - Ok(pos) - } } /// Does the given filename look like an ephemeral file? @@ -365,7 +348,7 @@ mod tests { assert_eq!(value_offsets[i], i.into_u64()); let buf = Vec::with_capacity(1); let (buf_slice, nread) = file - .read_at_to_end(i.into_u64(), buf.slice_full(), &ctx) + .read_exact_at_eof_ok(i.into_u64(), buf.slice_full(), &ctx) .await .unwrap(); let buf = buf_slice.into_inner(); @@ -456,7 +439,11 @@ mod tests { let content = &content; async move { let (buf, nread) = file - .read_at_to_end(start.into_u64(), Vec::with_capacity(len).slice_full(), ctx) + .read_exact_at_eof_ok( + start.into_u64(), + Vec::with_capacity(len).slice_full(), + ctx, + ) .await .unwrap(); assert_eq!(nread, len); diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index e7494fcb1b..a6a7783972 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -39,7 +39,7 @@ use super::{ DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState, }; -mod vectored_dio_read; +pub(crate) mod vectored_dio_read; #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)] pub(crate) struct InMemoryLayerFileId(page_cache::FileId); @@ -449,16 +449,7 @@ impl InMemoryLayer { } // Execute the 
reads. - impl vectored_dio_read::File for EphemeralFile { - async fn read_at_to_end<'a, 'b, B: tokio_epoll_uring::IoBufMut + Send>( - &'b self, - start: u64, - dst: tokio_epoll_uring::Slice, - ctx: &'a RequestContext, - ) -> std::io::Result<(tokio_epoll_uring::Slice, usize)> { - EphemeralFile::read_at_to_end(self, start, dst, ctx).await - } - } + let f = vectored_dio_read::execute( &inner.file, reads diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs index 15dcdeff18..673e8caace 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs @@ -13,7 +13,18 @@ use crate::{ /// The file interface we require. At runtime, this is a [`crate::tenant::ephemeral_file::EphemeralFile`]. pub trait File: Send { - async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>( + /// Attempt to read the bytes in `self` in range `[start,start+dst.bytes_total())` + /// and return the number of bytes read (let's call it `nread`). + /// The bytes read are placed in `dst`, i.e., `&dst[..nread]` will contain the read bytes. + /// + /// The only reason why the read may be short (i.e., `nread != dst.bytes_total()`) + /// is if the file is shorter than `start+dst.bytes_total()`. + /// + /// This is unlike [`std::os::unix::fs::FileExt::read_exact_at`] which returns an + /// [`std::io::ErrorKind::UnexpectedEof`] error if the file is shorter than `start+dst.bytes_total()`. + /// + /// No guarantees are made about the remaining bytes in `dst` in case of a short read. 
+ async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>( &'b self, start: u64, dst: Slice, @@ -228,7 +239,8 @@ where .expect("we produce chunk_nos by dividing by DIO_CHUNK_SIZE earlier"); let io_buf = get_io_buffer(nchunks).slice_full(); let req_len = io_buf.len(); - let (io_buf_slice, nread) = match file.read_at_to_end(read_offset, io_buf, ctx).await { + let (io_buf_slice, nread) = match file.read_exact_at_eof_ok(read_offset, io_buf, ctx).await + { Ok(t) => t, Err(e) => { let e = Arc::new(e); @@ -442,7 +454,7 @@ mod tests { let file = InMemoryFile::new_random(10); let test_read = |pos, len| { let buf = vec![0; len]; - let fut = file.read_at_to_end(pos, buf.slice_full(), &ctx); + let fut = file.read_exact_at_eof_ok(pos, buf.slice_full(), &ctx); use futures::FutureExt; let (slice, nread) = fut .now_or_never() @@ -460,7 +472,7 @@ mod tests { } impl File for InMemoryFile { - async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>( + async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>( &'b self, start: u64, mut dst: Slice, @@ -591,13 +603,13 @@ mod tests { } impl<'x> File for RecorderFile<'x> { - async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>( + async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>( &'b self, start: u64, dst: Slice, ctx: &'a RequestContext, ) -> std::io::Result<(Slice, usize)> { - let (dst, nread) = self.file.read_at_to_end(start, dst, ctx).await?; + let (dst, nread) = self.file.read_exact_at_eof_ok(start, dst, ctx).await?; self.recorded.borrow_mut().push(RecordedRead { pos: start, req_len: dst.bytes_total(), @@ -721,7 +733,7 @@ mod tests { } impl File for MockFile { - async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>( + async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>( &'b self, start: u64, mut dst: Slice, @@ -766,7 +778,7 @@ mod tests { let buf = Vec::with_capacity(512); let (buf, nread) = mock_file - .read_at_to_end(0, buf.slice_full(), &ctx) + .read_exact_at_eof_ok(0, buf.slice_full(), &ctx) .await .unwrap(); 
assert_eq!(nread, 512); @@ -774,7 +786,7 @@ mod tests { let buf = Vec::with_capacity(512); let (buf, nread) = mock_file - .read_at_to_end(512, buf.slice_full(), &ctx) + .read_exact_at_eof_ok(512, buf.slice_full(), &ctx) .await .unwrap(); assert_eq!(nread, 512); @@ -782,7 +794,7 @@ mod tests { let buf = Vec::with_capacity(512); let (buf, nread) = mock_file - .read_at_to_end(1024, buf.slice_full(), &ctx) + .read_exact_at_eof_ok(1024, buf.slice_full(), &ctx) .await .unwrap(); assert_eq!(nread, 10); @@ -790,7 +802,7 @@ mod tests { let buf = Vec::with_capacity(1024); let err = mock_file - .read_at_to_end(2048, buf.slice_full(), &ctx) + .read_exact_at_eof_ok(2048, buf.slice_full(), &ctx) .await .err() .unwrap();