more renaming to read_exact_at_eof_ok & reuse File::read_exact_at_eof_ok for load_to_vec

This commit is contained in:
Christian Schwarz
2024-08-22 16:15:16 +00:00
parent 2d066838c1
commit df4571fef1
3 changed files with 84 additions and 94 deletions

View File

@@ -5,6 +5,7 @@ use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache;
use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
use crate::virtual_file::owned_buffers_io::write::Buffer;
@@ -13,7 +14,7 @@ use bytes::BytesMut;
use camino::Utf8PathBuf;
use num_traits::Num;
use pageserver_api::shard::TenantShardId;
use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
use tokio_epoll_uring::{BoundedBuf, Slice};
use tracing::error;
use std::io;
@@ -107,42 +108,62 @@ impl EphemeralFile {
}
pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
let size = usize::try_from(self.len()).unwrap();
let size = self.len().into_usize();
let vec = Vec::with_capacity(size);
// read from disk what we've already flushed
let file_size_tracker = self.buffered_writer.as_inner();
let flushed_offset = usize::try_from(file_size_tracker.bytes_written()).unwrap();
let flushed_range = 0..flushed_offset;
let file: &VirtualFile = file_size_tracker.as_inner();
let mut vec = file
.read_exact_at(
vec.slice(0..(flushed_range.end - flushed_range.start)),
u64::try_from(flushed_range.start).unwrap(),
ctx,
)
.await?
.into_inner();
// copy from in-memory buffer what we haven't flushed yet but would return when accessed via read_blk
let buffer = self.buffered_writer.inspect_buffer();
let buffered = &buffer[0..buffer.pending()];
vec.extend_from_slice(buffered);
assert_eq!(vec.len(), size);
let (slice, nread) = self.read_exact_at_eof_ok(0, vec.slice_full(), ctx).await?;
assert_eq!(nread, size);
let vec = slice.into_inner();
assert_eq!(vec.len(), nread);
assert_eq!(vec.capacity(), size, "we shouldn't be reallocating");
Ok(vec)
}
/// Fill dst will dst.bytes_total() bytes from the bytes written to the buffered writer from offset `start` and later.
/// If `dst` is larger than the available bytes, the read will be short.
/// The read will never be short for other reasons.
/// The number of bytes read into `dst` is returned as part of the result tuple.
/// No guarantees are made about the remaining bytes in `dst`, i.e., assume their contents are random.
pub(crate) async fn read_at_to_end<B: IoBufMut + Send>(
&self,
start: u64,
dst: Slice<B>,
/// Returns the offset at which the first byte of the input was written, for use
/// in constructing indices over the written value.
///
/// Panics if the write is short because there's no way we can recover from that.
/// TODO: make upstack handle this as an error.
pub(crate) async fn write_raw(
&mut self,
srcbuf: &[u8],
ctx: &RequestContext,
) -> std::io::Result<(Slice<B>, usize)> {
) -> std::io::Result<u64> {
let pos = self.bytes_written;
let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!(
"write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}",
srcbuf_len = srcbuf.len(),
),
)
})?;
// Write the payload
let nwritten = self
.buffered_writer
.write_buffered_borrowed(srcbuf, ctx)
.await?;
assert_eq!(
nwritten,
srcbuf.len(),
"buffered writer has no short writes"
);
self.bytes_written = new_bytes_written;
Ok(pos)
}
}
impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
async fn read_exact_at_eof_ok<'a, 'b, B: tokio_epoll_uring::IoBufMut + Send>(
&'b self,
start: u64,
dst: tokio_epoll_uring::Slice<B>,
ctx: &'a RequestContext,
) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
let file_size_tracking_writer = self.buffered_writer.as_inner();
let flushed_offset = file_size_tracking_writer.bytes_written();
@@ -214,44 +235,6 @@ impl EphemeralFile {
Ok((dst, (end - start).into_usize()))
}
/// Returns the offset at which the first byte of the input was written, for use
/// in constructing indices over the written value.
///
/// Panics if the write is short because there's no way we can recover from that.
/// TODO: make upstack handle this as an error.
pub(crate) async fn write_raw(
&mut self,
srcbuf: &[u8],
ctx: &RequestContext,
) -> std::io::Result<u64> {
let pos = self.bytes_written;
let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!(
"write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}",
srcbuf_len = srcbuf.len(),
),
)
})?;
// Write the payload
let nwritten = self
.buffered_writer
.write_buffered_borrowed(srcbuf, ctx)
.await?;
assert_eq!(
nwritten,
srcbuf.len(),
"buffered writer has no short writes"
);
self.bytes_written = new_bytes_written;
Ok(pos)
}
}
/// Does the given filename look like an ephemeral file?
@@ -365,7 +348,7 @@ mod tests {
assert_eq!(value_offsets[i], i.into_u64());
let buf = Vec::with_capacity(1);
let (buf_slice, nread) = file
.read_at_to_end(i.into_u64(), buf.slice_full(), &ctx)
.read_exact_at_eof_ok(i.into_u64(), buf.slice_full(), &ctx)
.await
.unwrap();
let buf = buf_slice.into_inner();
@@ -456,7 +439,11 @@ mod tests {
let content = &content;
async move {
let (buf, nread) = file
.read_at_to_end(start.into_u64(), Vec::with_capacity(len).slice_full(), ctx)
.read_exact_at_eof_ok(
start.into_u64(),
Vec::with_capacity(len).slice_full(),
ctx,
)
.await
.unwrap();
assert_eq!(nread, len);

View File

@@ -39,7 +39,7 @@ use super::{
DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
};
mod vectored_dio_read;
pub(crate) mod vectored_dio_read;
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
@@ -449,16 +449,7 @@ impl InMemoryLayer {
}
// Execute the reads.
impl vectored_dio_read::File for EphemeralFile {
async fn read_at_to_end<'a, 'b, B: tokio_epoll_uring::IoBufMut + Send>(
&'b self,
start: u64,
dst: tokio_epoll_uring::Slice<B>,
ctx: &'a RequestContext,
) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
EphemeralFile::read_at_to_end(self, start, dst, ctx).await
}
}
let f = vectored_dio_read::execute(
&inner.file,
reads

View File

@@ -13,7 +13,18 @@ use crate::{
/// The file interface we require. At runtime, this is a [`crate::tenant::ephemeral_file::EphemeralFile`].
pub trait File: Send {
async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>(
/// Attempt to read the bytes in `self` in range `[start,start+dst.bytes_total())`
/// and return the number of bytes read (let's call it `nread`).
/// The bytes read are placed in `dst`, i.e., `&dst[..nread]` will contain the read bytes.
///
/// The only reason why the read may be short (i.e., `nread != dst.bytes_total()`)
/// is if the file is shorter than `start+dst.len()`.
///
/// This is unlike [`std::os::unix::fs::File::read_exact_at`] which returns an
/// [`std::io::ErrorKind::UnexpectedEof`] error if the file is shorter than `start+dst.len()`.
///
/// No guarantees are made about the remaining bytes in `dst` in case of a short read.
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>(
&'b self,
start: u64,
dst: Slice<B>,
@@ -228,7 +239,8 @@ where
.expect("we produce chunk_nos by dividing by DIO_CHUNK_SIZE earlier");
let io_buf = get_io_buffer(nchunks).slice_full();
let req_len = io_buf.len();
let (io_buf_slice, nread) = match file.read_at_to_end(read_offset, io_buf, ctx).await {
let (io_buf_slice, nread) = match file.read_exact_at_eof_ok(read_offset, io_buf, ctx).await
{
Ok(t) => t,
Err(e) => {
let e = Arc::new(e);
@@ -442,7 +454,7 @@ mod tests {
let file = InMemoryFile::new_random(10);
let test_read = |pos, len| {
let buf = vec![0; len];
let fut = file.read_at_to_end(pos, buf.slice_full(), &ctx);
let fut = file.read_exact_at_eof_ok(pos, buf.slice_full(), &ctx);
use futures::FutureExt;
let (slice, nread) = fut
.now_or_never()
@@ -460,7 +472,7 @@ mod tests {
}
impl File for InMemoryFile {
async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>(
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>(
&'b self,
start: u64,
mut dst: Slice<B>,
@@ -591,13 +603,13 @@ mod tests {
}
impl<'x> File for RecorderFile<'x> {
async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>(
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>(
&'b self,
start: u64,
dst: Slice<B>,
ctx: &'a RequestContext,
) -> std::io::Result<(Slice<B>, usize)> {
let (dst, nread) = self.file.read_at_to_end(start, dst, ctx).await?;
let (dst, nread) = self.file.read_exact_at_eof_ok(start, dst, ctx).await?;
self.recorded.borrow_mut().push(RecordedRead {
pos: start,
req_len: dst.bytes_total(),
@@ -721,7 +733,7 @@ mod tests {
}
impl File for MockFile {
async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>(
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>(
&'b self,
start: u64,
mut dst: Slice<B>,
@@ -766,7 +778,7 @@ mod tests {
let buf = Vec::with_capacity(512);
let (buf, nread) = mock_file
.read_at_to_end(0, buf.slice_full(), &ctx)
.read_exact_at_eof_ok(0, buf.slice_full(), &ctx)
.await
.unwrap();
assert_eq!(nread, 512);
@@ -774,7 +786,7 @@ mod tests {
let buf = Vec::with_capacity(512);
let (buf, nread) = mock_file
.read_at_to_end(512, buf.slice_full(), &ctx)
.read_exact_at_eof_ok(512, buf.slice_full(), &ctx)
.await
.unwrap();
assert_eq!(nread, 512);
@@ -782,7 +794,7 @@ mod tests {
let buf = Vec::with_capacity(512);
let (buf, nread) = mock_file
.read_at_to_end(1024, buf.slice_full(), &ctx)
.read_exact_at_eof_ok(1024, buf.slice_full(), &ctx)
.await
.unwrap();
assert_eq!(nread, 10);
@@ -790,7 +802,7 @@ mod tests {
let buf = Vec::with_capacity(1024);
let err = mock_file
.read_at_to_end(2048, buf.slice_full(), &ctx)
.read_exact_at_eof_ok(2048, buf.slice_full(), &ctx)
.await
.err()
.unwrap();