From 168913bdf0aa9665099b4bba2cf891ce8d48f691 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 14 Aug 2024 21:57:17 +0200 Subject: [PATCH] refactor(write path): newtype to enforce use of fully initialized slices (#8717) The `tokio_epoll_uring::Slice` / `tokio_uring::Slice` type is weird. The new `FullSlice` newtype is better. See the doc comment for details. The naming is not ideal, but we'll clean that up in a future refactoring where we move the `FullSlice` into `tokio_epoll_uring`. Then, we'll do the following: * tokio_epoll_uring::Slice is removed * `FullSlice` becomes `tokio_epoll_uring::IoBufView` * new type `tokio_epoll_uring::IoBufMutView` for the current `tokio_epoll_uring::Slice` Context ------- I did this work in preparation for https://github.com/neondatabase/neon/pull/8537. There, I'm changing the type that the `inmemory_layer.rs` passes to `DeltaLayerWriter::put_value_bytes` and thus it seemed like a good opportunity to make this cleanup first. --- pageserver/src/tenant/blob_io.rs | 93 ++++++++------ .../src/tenant/ephemeral_file/page_caching.rs | 31 ++--- .../zero_padded_read_write/zero_padded.rs | 6 +- .../tenant/remote_timeline_client/download.rs | 5 +- .../src/tenant/storage_layer/delta_layer.rs | 40 ++++-- .../src/tenant/storage_layer/image_layer.rs | 9 +- .../tenant/storage_layer/inmemory_layer.rs | 25 +++- pageserver/src/virtual_file.rs | 120 +++++++++--------- pageserver/src/virtual_file/io_engine.rs | 19 ++- .../owned_buffers_io/io_buf_ext.rs | 78 ++++++++++++ .../virtual_file/owned_buffers_io/slice.rs | 4 +- .../util/size_tracking_writer.rs | 13 +- .../virtual_file/owned_buffers_io/write.rs | 71 +++++------ 13 files changed, 310 insertions(+), 204 deletions(-) create mode 100644 pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 8e9d349ca8..a245c99a88 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -24,6 +24,7 @@ use tracing::warn; use crate::context::RequestContext; use crate::page_cache::PAGE_SZ; use crate::tenant::block_io::BlockCursor; +use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; use crate::virtual_file::VirtualFile; use std::cmp::min; use std::io::{Error, ErrorKind}; @@ -186,11 +187,11 @@ impl BlobWriter { /// You need to make sure that the internal buffer is empty, otherwise /// data will be written in wrong order. #[inline(always)] - async fn write_all_unbuffered, Buf: IoBuf + Send>( + async fn write_all_unbuffered( &mut self, - src_buf: B, + src_buf: FullSlice, ctx: &RequestContext, - ) -> (B::Buf, Result<(), Error>) { + ) -> (FullSlice, Result<(), Error>) { let (src_buf, res) = self.inner.write_all(src_buf, ctx).await; let nbytes = match res { Ok(nbytes) => nbytes, @@ -204,8 +205,9 @@ impl BlobWriter { /// Flushes the internal buffer to the underlying `VirtualFile`. pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> { let buf = std::mem::take(&mut self.buf); - let (mut buf, res) = self.inner.write_all(buf, ctx).await; + let (slice, res) = self.inner.write_all(buf.slice_len(), ctx).await; res?; + let mut buf = slice.into_raw_slice().into_inner(); buf.clear(); self.buf = buf; Ok(()) @@ -222,19 +224,30 @@ impl BlobWriter { } /// Internal, possibly buffered, write function - async fn write_all, Buf: IoBuf + Send>( + async fn write_all( &mut self, - src_buf: B, + src_buf: FullSlice, ctx: &RequestContext, - ) -> (B::Buf, Result<(), Error>) { + ) -> (FullSlice, Result<(), Error>) { + let src_buf = src_buf.into_raw_slice(); + let src_buf_bounds = src_buf.bounds(); + let restore = move |src_buf_slice: Slice<_>| { + FullSlice::must_new(Slice::from_buf_bounds( + src_buf_slice.into_inner(), + src_buf_bounds, + )) + }; + if !BUFFERED { assert!(self.buf.is_empty()); - return self.write_all_unbuffered(src_buf, ctx).await; + return self + .write_all_unbuffered(FullSlice::must_new(src_buf), ctx) + .await; } let remaining = Self::CAPACITY - self.buf.len(); let src_buf_len = src_buf.bytes_init(); if src_buf_len == 0 { - return (Slice::into_inner(src_buf.slice_full()), Ok(())); + return (restore(src_buf), Ok(())); } let mut src_buf = src_buf.slice(0..src_buf_len); // First try to copy as much as we can into the buffer @@ -245,7 +258,7 @@ impl BlobWriter { // Then, if the buffer is full, flush it out if self.buf.len() == Self::CAPACITY { if let Err(e) = self.flush_buffer(ctx).await { - return (Slice::into_inner(src_buf), Err(e)); + return (restore(src_buf), Err(e)); } } // Finally, write the tail of src_buf: @@ -258,27 +271,29 @@ impl BlobWriter { let copied = self.write_into_buffer(&src_buf); // We just verified above that src_buf fits into our internal buffer. assert_eq!(copied, src_buf.len()); - Slice::into_inner(src_buf) + restore(src_buf) } else { - let (src_buf, res) = self.write_all_unbuffered(src_buf, ctx).await; + let (src_buf, res) = self + .write_all_unbuffered(FullSlice::must_new(src_buf), ctx) + .await; if let Err(e) = res { return (src_buf, Err(e)); } src_buf } } else { - Slice::into_inner(src_buf) + restore(src_buf) }; (src_buf, Ok(())) } /// Write a blob of data. Returns the offset that it was written to, /// which can be used to retrieve the data later. - pub async fn write_blob, Buf: IoBuf + Send>( + pub async fn write_blob( &mut self, - srcbuf: B, + srcbuf: FullSlice, ctx: &RequestContext, - ) -> (B::Buf, Result) { + ) -> (FullSlice, Result) { let (buf, res) = self .write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled) .await; @@ -287,43 +302,40 @@ impl BlobWriter { /// Write a blob of data. Returns the offset that it was written to, /// which can be used to retrieve the data later. - pub async fn write_blob_maybe_compressed, Buf: IoBuf + Send>( + pub(crate) async fn write_blob_maybe_compressed( &mut self, - srcbuf: B, + srcbuf: FullSlice, ctx: &RequestContext, algorithm: ImageCompressionAlgorithm, - ) -> (B::Buf, Result<(u64, CompressionInfo), Error>) { + ) -> (FullSlice, Result<(u64, CompressionInfo), Error>) { let offset = self.offset; let mut compression_info = CompressionInfo { written_compressed: false, compressed_size: None, }; - let len = srcbuf.bytes_init(); + let len = srcbuf.len(); let mut io_buf = self.io_buf.take().expect("we always put it back below"); io_buf.clear(); let mut compressed_buf = None; - let ((io_buf, hdr_res), srcbuf) = async { + let ((io_buf_slice, hdr_res), srcbuf) = async { if len < 128 { // Short blob. Write a 1-byte length header io_buf.put_u8(len as u8); - ( - self.write_all(io_buf, ctx).await, - srcbuf.slice_full().into_inner(), - ) + (self.write_all(io_buf.slice_len(), ctx).await, srcbuf) } else { // Write a 4-byte length header if len > MAX_SUPPORTED_LEN { return ( ( - io_buf, + io_buf.slice_len(), Err(Error::new( ErrorKind::Other, format!("blob too large ({len} bytes)"), )), ), - srcbuf.slice_full().into_inner(), + srcbuf, ); } let (high_bit_mask, len_written, srcbuf) = match algorithm { @@ -336,8 +348,7 @@ impl BlobWriter { } else { async_compression::tokio::write::ZstdEncoder::new(Vec::new()) }; - let slice = srcbuf.slice_full(); - encoder.write_all(&slice[..]).await.unwrap(); + encoder.write_all(&srcbuf[..]).await.unwrap(); encoder.shutdown().await.unwrap(); let compressed = encoder.into_inner(); compression_info.compressed_size = Some(compressed.len()); @@ -345,31 +356,29 @@ impl BlobWriter { compression_info.written_compressed = true; let compressed_len = compressed.len(); compressed_buf = Some(compressed); - (BYTE_ZSTD, compressed_len, slice.into_inner()) + (BYTE_ZSTD, compressed_len, srcbuf) } else { - (BYTE_UNCOMPRESSED, len, slice.into_inner()) + (BYTE_UNCOMPRESSED, len, srcbuf) } } - ImageCompressionAlgorithm::Disabled => { - (BYTE_UNCOMPRESSED, len, srcbuf.slice_full().into_inner()) - } + ImageCompressionAlgorithm::Disabled => (BYTE_UNCOMPRESSED, len, srcbuf), }; let mut len_buf = (len_written as u32).to_be_bytes(); assert_eq!(len_buf[0] & 0xf0, 0); len_buf[0] |= high_bit_mask; io_buf.extend_from_slice(&len_buf[..]); - (self.write_all(io_buf, ctx).await, srcbuf) + (self.write_all(io_buf.slice_len(), ctx).await, srcbuf) } } .await; - self.io_buf = Some(io_buf); + self.io_buf = Some(io_buf_slice.into_raw_slice().into_inner()); match hdr_res { Ok(_) => (), - Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)), + Err(e) => return (srcbuf, Err(e)), } let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf { - let (_buf, res) = self.write_all(compressed_buf, ctx).await; - (Slice::into_inner(srcbuf.slice(..)), res) + let (_buf, res) = self.write_all(compressed_buf.slice_len(), ctx).await; + (srcbuf, res) } else { self.write_all(srcbuf, ctx).await }; @@ -432,21 +441,21 @@ pub(crate) mod tests { let (_, res) = if compression { let res = wtr .write_blob_maybe_compressed( - blob.clone(), + blob.clone().slice_len(), ctx, ImageCompressionAlgorithm::Zstd { level: Some(1) }, ) .await; (res.0, res.1.map(|(off, _)| off)) } else { - wtr.write_blob(blob.clone(), ctx).await + wtr.write_blob(blob.clone().slice_len(), ctx).await }; let offs = res?; offsets.push(offs); } // Write out one page worth of zeros so that we can // read again with read_blk - let (_, res) = wtr.write_blob(vec![0; PAGE_SZ], ctx).await; + let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await; let offs = res?; println!("Writing final blob at offs={offs}"); wtr.flush_buffer(ctx).await?; diff --git a/pageserver/src/tenant/ephemeral_file/page_caching.rs b/pageserver/src/tenant/ephemeral_file/page_caching.rs index 0a12b64a7c..7355b3b5a3 100644 --- a/pageserver/src/tenant/ephemeral_file/page_caching.rs +++ b/pageserver/src/tenant/ephemeral_file/page_caching.rs @@ -4,6 +4,7 @@ use crate::context::RequestContext; use crate::page_cache::{self, PAGE_SZ}; use crate::tenant::block_io::BlockLease; +use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice; use crate::virtual_file::VirtualFile; use once_cell::sync::Lazy; @@ -208,21 +209,11 @@ impl PreWarmingWriter { } impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmingWriter { - async fn write_all< - B: tokio_epoll_uring::BoundedBuf, - Buf: tokio_epoll_uring::IoBuf + Send, - >( + async fn write_all( &mut self, - buf: B, + buf: FullSlice, ctx: &RequestContext, - ) -> std::io::Result<(usize, B::Buf)> { - let buf = buf.slice(..); - let saved_bounds = buf.bounds(); // save for reconstructing the Slice from iobuf after the IO is done - let check_bounds_stuff_works = if cfg!(test) && cfg!(debug_assertions) { - Some(buf.to_vec()) - } else { - None - }; + ) -> std::io::Result<(usize, FullSlice)> { let buflen = buf.len(); assert_eq!( buflen % PAGE_SZ, @@ -231,10 +222,10 @@ impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmi ); // Do the IO. - let iobuf = match self.file.write_all(buf, ctx).await { - (iobuf, Ok(nwritten)) => { + let buf = match self.file.write_all(buf, ctx).await { + (buf, Ok(nwritten)) => { assert_eq!(nwritten, buflen); - iobuf + buf } (_, Err(e)) => { return Err(std::io::Error::new( @@ -248,12 +239,6 @@ impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmi } }; - // Reconstruct the Slice (the write path consumed the Slice and returned us the underlying IoBuf) - let buf = tokio_epoll_uring::Slice::from_buf_bounds(iobuf, saved_bounds); - if let Some(check_bounds_stuff_works) = check_bounds_stuff_works { - assert_eq!(&check_bounds_stuff_works, &*buf); - } - let nblocks = buflen / PAGE_SZ; let nblocks32 = u32::try_from(nblocks).unwrap(); @@ -300,6 +285,6 @@ impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmi } self.nwritten_blocks = self.nwritten_blocks.checked_add(nblocks32).unwrap(); - Ok((buflen, buf.into_inner())) + Ok((buflen, buf)) } } diff --git a/pageserver/src/tenant/ephemeral_file/zero_padded_read_write/zero_padded.rs b/pageserver/src/tenant/ephemeral_file/zero_padded_read_write/zero_padded.rs index f90291bbf8..2dc0277638 100644 --- a/pageserver/src/tenant/ephemeral_file/zero_padded_read_write/zero_padded.rs +++ b/pageserver/src/tenant/ephemeral_file/zero_padded_read_write/zero_padded.rs @@ -5,6 +5,8 @@ use std::mem::MaybeUninit; +use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice; + /// See module-level comment. pub struct Buffer { allocation: Box<[u8; N]>, @@ -60,10 +62,10 @@ impl crate::virtual_file::owned_buffers_io::write::Buffer for Bu self.written } - fn flush(self) -> tokio_epoll_uring::Slice { + fn flush(self) -> FullSlice { self.invariants(); let written = self.written; - tokio_epoll_uring::BoundedBuf::slice(self, 0..written) + FullSlice::must_new(tokio_epoll_uring::BoundedBuf::slice(self, 0..written)) } fn reuse_after_flush(iobuf: Self::IoBuf) -> Self { diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index a17b32c983..8199218c3c 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -23,6 +23,7 @@ use crate::span::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path}; use crate::tenant::storage_layer::LayerName; use crate::tenant::Generation; +use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile}; use crate::TEMP_FILE_SUFFIX; use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode, RemotePath}; @@ -219,9 +220,7 @@ async fn download_object<'a>( Ok(chunk) => chunk, Err(e) => return Err(e), }; - buffered - .write_buffered(tokio_epoll_uring::BoundedBuf::slice_full(chunk), ctx) - .await?; + buffered.write_buffered(chunk.slice_len(), ctx).await?; } let size_tracking = buffered.flush_and_into_inner(ctx).await?; Ok(size_tracking.into_inner()) diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 0ed2f72c3f..6c2391d72d 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -42,6 +42,7 @@ use crate::tenant::vectored_blob_io::{ VectoredReadPlanner, }; use crate::tenant::PageReconstructError; +use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; use crate::virtual_file::{self, VirtualFile}; use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; @@ -63,6 +64,7 @@ use std::os::unix::fs::FileExt; use std::str::FromStr; use std::sync::Arc; use tokio::sync::OnceCell; +use tokio_epoll_uring::IoBufMut; use tracing::*; use utils::{ @@ -436,19 +438,28 @@ impl DeltaLayerWriterInner { ctx: &RequestContext, ) -> anyhow::Result<()> { let (_, res) = self - .put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init(), ctx) + .put_value_bytes( + key, + lsn, + Value::ser(&val)?.slice_len(), + val.will_init(), + ctx, + ) .await; res } - async fn put_value_bytes( + async fn put_value_bytes( &mut self, key: Key, lsn: Lsn, - val: Vec, + val: FullSlice, will_init: bool, ctx: &RequestContext, - ) -> (Vec, anyhow::Result<()>) { + ) -> (FullSlice, anyhow::Result<()>) + where + Buf: IoBufMut + Send, + { assert!( self.lsn_range.start <= lsn, "lsn_start={}, lsn={}", @@ -514,7 +525,7 @@ impl DeltaLayerWriterInner { file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64)) .await?; for buf in block_buf.blocks { - let (_buf, res) = file.write_all(buf, ctx).await; + let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; res?; } assert!(self.lsn_range.start < self.lsn_range.end); @@ -534,7 +545,7 @@ impl DeltaLayerWriterInner { // TODO: could use smallvec here but it's a pain with Slice Summary::ser_into(&summary, &mut buf)?; file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf, ctx).await; + let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; res?; let metadata = file @@ -646,14 +657,17 @@ impl DeltaLayerWriter { .await } - pub async fn put_value_bytes( + pub async fn put_value_bytes( &mut self, key: Key, lsn: Lsn, - val: Vec, + val: FullSlice, will_init: bool, ctx: &RequestContext, - ) -> (Vec, anyhow::Result<()>) { + ) -> (FullSlice, anyhow::Result<()>) + where + Buf: IoBufMut + Send, + { self.inner .as_mut() .unwrap() @@ -743,7 +757,7 @@ impl DeltaLayer { // TODO: could use smallvec here, but it's a pain with Slice Summary::ser_into(&new_summary, &mut buf).context("serialize")?; file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf, ctx).await; + let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; res?; Ok(()) } @@ -1291,12 +1305,12 @@ impl DeltaLayerInner { .put_value_bytes( key, lsn, - std::mem::take(&mut per_blob_copy), + std::mem::take(&mut per_blob_copy).slice_len(), will_init, ctx, ) .await; - per_blob_copy = tmp; + per_blob_copy = tmp.into_raw_slice().into_inner(); res?; @@ -1871,7 +1885,7 @@ pub(crate) mod test { for entry in entries { let (_, res) = writer - .put_value_bytes(entry.key, entry.lsn, entry.value, false, &ctx) + .put_value_bytes(entry.key, entry.lsn, entry.value.slice_len(), false, &ctx) .await; res?; } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index f9d3fdf186..9a19e4e2c7 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -38,6 +38,7 @@ use crate::tenant::vectored_blob_io::{ VectoredReadPlanner, }; use crate::tenant::{PageReconstructError, Timeline}; +use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::virtual_file::{self, VirtualFile}; use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; use anyhow::{anyhow, bail, ensure, Context, Result}; @@ -354,7 +355,7 @@ impl ImageLayer { // TODO: could use smallvec here but it's a pain with Slice Summary::ser_into(&new_summary, &mut buf).context("serialize")?; file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf, ctx).await; + let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; res?; Ok(()) } @@ -786,7 +787,7 @@ impl ImageLayerWriterInner { self.num_keys += 1; let (_img, res) = self .blob_writer - .write_blob_maybe_compressed(img, ctx, compression) + .write_blob_maybe_compressed(img.slice_len(), ctx, compression) .await; // TODO: re-use the buffer for `img` further upstack let (off, compression_info) = res?; @@ -838,7 +839,7 @@ impl ImageLayerWriterInner { .await?; let (index_root_blk, block_buf) = self.tree.finish()?; for buf in block_buf.blocks { - let (_buf, res) = file.write_all(buf, ctx).await; + let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; res?; } @@ -858,7 +859,7 @@ impl ImageLayerWriterInner { // TODO: could use smallvec here but it's a pain with Slice Summary::ser_into(&summary, &mut buf)?; file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf, ctx).await; + let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; res?; let metadata = file diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index fb15ddfba9..748d79c149 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -12,6 +12,7 @@ use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef}; use crate::tenant::ephemeral_file::EphemeralFile; use crate::tenant::timeline::GetVectoredError; use crate::tenant::PageReconstructError; +use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::{l0_flush, page_cache, walrecord}; use anyhow::{anyhow, Result}; use camino::Utf8PathBuf; @@ -581,11 +582,17 @@ impl InMemoryLayer { for (lsn, pos) in vec_map.as_slice() { cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?; let will_init = Value::des(&buf)?.will_init(); - let res; - (buf, res) = delta_layer_writer - .put_value_bytes(Key::from_compact(*key), *lsn, buf, will_init, &ctx) + let (tmp, res) = delta_layer_writer + .put_value_bytes( + Key::from_compact(*key), + *lsn, + buf.slice_len(), + will_init, + &ctx, + ) .await; res?; + buf = tmp.into_raw_slice().into_inner(); } } } @@ -620,11 +627,17 @@ impl InMemoryLayer { // => https://github.com/neondatabase/neon/issues/8183 cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?; let will_init = Value::des(&buf)?.will_init(); - let res; - (buf, res) = delta_layer_writer - .put_value_bytes(Key::from_compact(*key), *lsn, buf, will_init, ctx) + let (tmp, res) = delta_layer_writer + .put_value_bytes( + Key::from_compact(*key), + *lsn, + buf.slice_len(), + will_init, + ctx, + ) .await; res?; + buf = tmp.into_raw_slice().into_inner(); } } } diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 27f6fe90a4..b4695e5f40 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -17,6 +17,7 @@ use crate::page_cache::{PageWriteGuard, PAGE_SZ}; use crate::tenant::TENANTS_SEGMENT_NAME; use camino::{Utf8Path, Utf8PathBuf}; use once_cell::sync::OnceCell; +use owned_buffers_io::io_buf_ext::FullSlice; use pageserver_api::shard::TenantShardId; use std::fs::File; use std::io::{Error, ErrorKind, Seek, SeekFrom}; @@ -50,6 +51,7 @@ pub(crate) mod owned_buffers_io { //! but for the time being we're proving out the primitives in the neon.git repo //! for faster iteration. + pub(crate) mod io_buf_ext; pub(crate) mod slice; pub(crate) mod write; pub(crate) mod util { @@ -637,24 +639,24 @@ impl VirtualFile { } // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235 - pub async fn write_all_at, Buf: IoBuf + Send>( + pub async fn write_all_at( &self, - buf: B, + buf: FullSlice, mut offset: u64, ctx: &RequestContext, - ) -> (B::Buf, Result<(), Error>) { - let buf_len = buf.bytes_init(); - if buf_len == 0 { - return (Slice::into_inner(buf.slice_full()), Ok(())); - } - let mut buf = buf.slice(0..buf_len); + ) -> (FullSlice, Result<(), Error>) { + let buf = buf.into_raw_slice(); + let bounds = buf.bounds(); + let restore = + |buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds)); + let mut buf = buf; while !buf.is_empty() { - let res; - (buf, res) = self.write_at(buf, offset, ctx).await; + let (tmp, res) = self.write_at(FullSlice::must_new(buf), offset, ctx).await; + buf = tmp.into_raw_slice(); match res { Ok(0) => { return ( - Slice::into_inner(buf), + restore(buf), Err(Error::new( std::io::ErrorKind::WriteZero, "failed to write whole buffer", @@ -666,33 +668,33 @@ impl VirtualFile { offset += n as u64; } Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return (Slice::into_inner(buf), Err(e)), + Err(e) => return (restore(buf), Err(e)), } } - (Slice::into_inner(buf), Ok(())) + (restore(buf), Ok(())) } - /// Writes `buf.slice(0..buf.bytes_init())`. - /// Returns the IoBuf that is underlying the BoundedBuf `buf`. - /// I.e., the returned value's `bytes_init()` method returns something different than the `bytes_init()` that was passed in. - /// It's quite brittle and easy to mis-use, so, we return the size in the Ok() variant. - pub async fn write_all, Buf: IoBuf + Send>( + /// Writes `buf` to the file at the current offset. + /// + /// Panics if there is an uninitialized range in `buf`, as that is most likely a bug in the caller. + pub async fn write_all( &mut self, - buf: B, + buf: FullSlice, ctx: &RequestContext, - ) -> (B::Buf, Result) { - let nbytes = buf.bytes_init(); - if nbytes == 0 { - return (Slice::into_inner(buf.slice_full()), Ok(0)); - } - let mut buf = buf.slice(0..nbytes); + ) -> (FullSlice, Result) { + let buf = buf.into_raw_slice(); + let bounds = buf.bounds(); + let restore = + |buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds)); + let nbytes = buf.len(); + let mut buf = buf; while !buf.is_empty() { - let res; - (buf, res) = self.write(buf, ctx).await; + let (tmp, res) = self.write(FullSlice::must_new(buf), ctx).await; + buf = tmp.into_raw_slice(); match res { Ok(0) => { return ( - Slice::into_inner(buf), + restore(buf), Err(Error::new( std::io::ErrorKind::WriteZero, "failed to write whole buffer", @@ -703,17 +705,17 @@ impl VirtualFile { buf = buf.slice(n..); } Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return (Slice::into_inner(buf), Err(e)), + Err(e) => return (restore(buf), Err(e)), } } - (Slice::into_inner(buf), Ok(nbytes)) + (restore(buf), Ok(nbytes)) } async fn write( &mut self, - buf: Slice, + buf: FullSlice, ctx: &RequestContext, - ) -> (Slice, Result) { + ) -> (FullSlice, Result) { let pos = self.pos; let (buf, res) = self.write_at(buf, pos, ctx).await; let n = match res { @@ -756,10 +758,10 @@ impl VirtualFile { async fn write_at( &self, - buf: Slice, + buf: FullSlice, offset: u64, _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */ - ) -> (Slice, Result) { + ) -> (FullSlice, Result) { let file_guard = match self.lock_file().await { Ok(file_guard) => file_guard, Err(e) => return (buf, Err(e)), @@ -1093,11 +1095,11 @@ impl Drop for VirtualFile { impl OwnedAsyncWriter for VirtualFile { #[inline(always)] - async fn write_all, Buf: IoBuf + Send>( + async fn write_all( &mut self, - buf: B, + buf: FullSlice, ctx: &RequestContext, - ) -> std::io::Result<(usize, B::Buf)> { + ) -> std::io::Result<(usize, FullSlice)> { let (buf, res) = VirtualFile::write_all(self, buf, ctx).await; res.map(move |v| (v, buf)) } @@ -1159,7 +1161,8 @@ mod tests { use crate::task_mgr::TaskKind; use super::*; - use owned_buffers_io::slice::SliceExt; + use owned_buffers_io::io_buf_ext::IoBufExt; + use owned_buffers_io::slice::SliceMutExt; use rand::seq::SliceRandom; use rand::thread_rng; use rand::Rng; @@ -1193,9 +1196,9 @@ mod tests { } } } - async fn write_all_at, Buf: IoBuf + Send>( + async fn write_all_at( &self, - buf: B, + buf: FullSlice, offset: u64, ctx: &RequestContext, ) -> Result<(), Error> { @@ -1204,13 +1207,7 @@ mod tests { let (_buf, res) = file.write_all_at(buf, offset, ctx).await; res } - MaybeVirtualFile::File(file) => { - let buf_len = buf.bytes_init(); - if buf_len == 0 { - return Ok(()); - } - file.write_all_at(&buf.slice(0..buf_len), offset) - } + MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset), } } async fn seek(&mut self, pos: SeekFrom) -> Result { @@ -1219,9 +1216,9 @@ mod tests { MaybeVirtualFile::File(file) => file.seek(pos), } } - async fn write_all, Buf: IoBuf + Send>( + async fn write_all( &mut self, - buf: B, + buf: FullSlice, ctx: &RequestContext, ) -> Result<(), Error> { match self { @@ -1229,13 +1226,7 @@ mod tests { let (_buf, res) = file.write_all(buf, ctx).await; res.map(|_| ()) } - MaybeVirtualFile::File(file) => { - let buf_len = buf.bytes_init(); - if buf_len == 0 { - return Ok(()); - } - file.write_all(&buf.slice(0..buf_len)) - } + MaybeVirtualFile::File(file) => file.write_all(&buf[..]), } } @@ -1347,7 +1338,9 @@ mod tests { &ctx, ) .await?; - file_a.write_all(b"foobar".to_vec(), &ctx).await?; + file_a + .write_all(b"foobar".to_vec().slice_len(), &ctx) + .await?; // cannot read from a file opened in write-only mode let _ = file_a.read_string(&ctx).await.unwrap_err(); @@ -1356,7 +1349,10 @@ mod tests { let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?; // cannot write to a file opened in read-only mode - let _ = file_a.write_all(b"bar".to_vec(), &ctx).await.unwrap_err(); + let _ = file_a + .write_all(b"bar".to_vec().slice_len(), &ctx) + .await + .unwrap_err(); // Try simple read assert_eq!("foobar", file_a.read_string(&ctx).await?); @@ -1399,8 +1395,12 @@ mod tests { &ctx, ) .await?; - file_b.write_all_at(b"BAR".to_vec(), 3, &ctx).await?; - file_b.write_all_at(b"FOO".to_vec(), 0, &ctx).await?; + file_b + .write_all_at(b"BAR".to_vec().slice_len(), 3, &ctx) + .await?; + file_b + .write_all_at(b"FOO".to_vec().slice_len(), 0, &ctx) + .await?; assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA"); diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs index 0ffcd9fa05..faef1ba9ff 100644 --- a/pageserver/src/virtual_file/io_engine.rs +++ b/pageserver/src/virtual_file/io_engine.rs @@ -12,7 +12,7 @@ #[cfg(target_os = "linux")] pub(super) mod tokio_epoll_uring_ext; -use tokio_epoll_uring::{IoBuf, Slice}; +use tokio_epoll_uring::IoBuf; use tracing::Instrument; pub(crate) use super::api::IoEngineKind; @@ -107,7 +107,10 @@ use std::{ sync::atomic::{AtomicU8, Ordering}, }; -use super::{owned_buffers_io::slice::SliceExt, FileGuard, Metadata}; +use super::{ + owned_buffers_io::{io_buf_ext::FullSlice, slice::SliceMutExt}, + FileGuard, Metadata, +}; #[cfg(target_os = "linux")] fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error) -> std::io::Error { @@ -206,8 +209,8 @@ impl IoEngine { &self, file_guard: FileGuard, offset: u64, - buf: Slice, - ) -> ((FileGuard, Slice), std::io::Result) { + buf: FullSlice, + ) -> ((FileGuard, FullSlice), std::io::Result) { match self { IoEngine::NotSet => panic!("not initialized"), IoEngine::StdFs => { @@ -217,8 +220,12 @@ impl IoEngine { #[cfg(target_os = "linux")] IoEngine::TokioEpollUring => { let system = tokio_epoll_uring_ext::thread_local_system().await; - let (resources, res) = system.write(file_guard, offset, buf).await; - (resources, res.map_err(epoll_uring_error_to_std)) + let ((file_guard, slice), res) = + system.write(file_guard, offset, buf.into_raw_slice()).await; + ( + (file_guard, FullSlice::must_new(slice)), + res.map_err(epoll_uring_error_to_std), + ) } } } diff --git a/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs b/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs new file mode 100644 index 0000000000..7c773b6b21 --- /dev/null +++ b/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs @@ -0,0 +1,78 @@ +//! See [`FullSlice`]. + +use bytes::{Bytes, BytesMut}; +use std::ops::{Deref, Range}; +use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; + +/// The true owned equivalent for Rust [`slice`]. Use this for the write path. +/// +/// Unlike [`tokio_epoll_uring::Slice`], which we unfortunately inherited from `tokio-uring`, +/// [`FullSlice`] is guaranteed to have all its bytes initialized. This means that +/// [`>::len`] is equal to [`Slice::bytes_init`] and [`Slice::bytes_total`]. +/// +pub struct FullSlice { + slice: Slice, +} + +impl FullSlice +where + B: IoBuf, +{ + pub(crate) fn must_new(slice: Slice) -> Self { + assert_eq!(slice.bytes_init(), slice.bytes_total()); + FullSlice { slice } + } + pub(crate) fn into_raw_slice(self) -> Slice { + let FullSlice { slice: s } = self; + s + } +} + +impl Deref for FullSlice +where + B: IoBuf, +{ + type Target = [u8]; + + fn deref(&self) -> &[u8] { + let rust_slice = &self.slice[..]; + assert_eq!(rust_slice.len(), self.slice.bytes_init()); + assert_eq!(rust_slice.len(), self.slice.bytes_total()); + rust_slice + } +} + +pub(crate) trait IoBufExt { + /// Get a [`FullSlice`] for the entire buffer, i.e., `self[..]` or `self[0..self.len()]`. + fn slice_len(self) -> FullSlice + where + Self: Sized; +} + +macro_rules! impl_io_buf_ext { + ($T:ty) => { + impl IoBufExt for $T { + #[inline(always)] + fn slice_len(self) -> FullSlice { + let len = self.len(); + let s = if len == 0 { + // `BoundedBuf::slice(0..len)` or `BoundedBuf::slice(..)` has an incorrect assertion, + // causing a panic if len == 0. + // The Slice::from_buf_bounds has the correct assertion (<= instead of <). + // => https://github.com/neondatabase/tokio-epoll-uring/issues/46 + let slice = self.slice_full(); + let mut bounds: Range<_> = slice.bounds(); + bounds.end = bounds.start; + Slice::from_buf_bounds(slice.into_inner(), bounds) + } else { + self.slice(0..len) + }; + FullSlice::must_new(s) + } + } + }; +} + +impl_io_buf_ext!(Bytes); +impl_io_buf_ext!(BytesMut); +impl_io_buf_ext!(Vec); diff --git a/pageserver/src/virtual_file/owned_buffers_io/slice.rs b/pageserver/src/virtual_file/owned_buffers_io/slice.rs index d19e5ddffe..6100593663 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/slice.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/slice.rs @@ -3,14 +3,14 @@ use tokio_epoll_uring::BoundedBufMut; use tokio_epoll_uring::IoBufMut; use tokio_epoll_uring::Slice; -pub(crate) trait SliceExt { +pub(crate) trait SliceMutExt { /// Get a `&mut[0..self.bytes_total()`] slice, for when you need to do borrow-based IO. /// /// See the test case `test_slice_full_zeroed` for the difference to just doing `&slice[..]` fn as_mut_rust_slice_full_zeroed(&mut self) -> &mut [u8]; } -impl SliceExt for Slice +impl SliceMutExt for Slice where B: IoBufMut, { diff --git a/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs b/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs index 55b1d0b46b..efcb61ba65 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs @@ -1,5 +1,8 @@ -use crate::{context::RequestContext, virtual_file::owned_buffers_io::write::OwnedAsyncWriter}; -use tokio_epoll_uring::{BoundedBuf, IoBuf}; +use crate::{ + context::RequestContext, + virtual_file::owned_buffers_io::{io_buf_ext::FullSlice, write::OwnedAsyncWriter}, +}; +use tokio_epoll_uring::IoBuf; pub struct Writer { dst: W, @@ -35,11 +38,11 @@ where W: OwnedAsyncWriter, { #[inline(always)] - async fn write_all, Buf: IoBuf + Send>( + async fn write_all( &mut self, - buf: B, + buf: FullSlice, ctx: &RequestContext, - ) -> std::io::Result<(usize, B::Buf)> { + ) -> std::io::Result<(usize, FullSlice)> { let (nwritten, buf) = self.dst.write_all(buf, ctx).await?; self.bytes_amount += u64::try_from(nwritten).unwrap(); Ok((nwritten, buf)) diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index 8599d95cdf..f8f37b17e3 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -1,16 +1,18 @@ use bytes::BytesMut; -use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; +use tokio_epoll_uring::IoBuf; use crate::context::RequestContext; +use super::io_buf_ext::{FullSlice, IoBufExt}; + /// A trait for doing owned-buffer write IO. /// Think [`tokio::io::AsyncWrite`] but with owned buffers. pub trait OwnedAsyncWriter { - async fn write_all, Buf: IoBuf + Send>( + async fn write_all( &mut self, - buf: B, + buf: FullSlice, ctx: &RequestContext, - ) -> std::io::Result<(usize, B::Buf)>; + ) -> std::io::Result<(usize, FullSlice)>; } /// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch @@ -79,9 +81,11 @@ where #[cfg_attr(target_os = "macos", allow(dead_code))] pub async fn write_buffered( &mut self, - chunk: Slice, + chunk: FullSlice, ctx: &RequestContext, - ) -> std::io::Result<(usize, S)> { + ) -> std::io::Result<(usize, FullSlice)> { + let chunk = chunk.into_raw_slice(); + let chunk_len = chunk.len(); // avoid memcpy for the middle of the chunk if chunk.len() >= self.buf().cap() { @@ -94,7 +98,10 @@ where .pending(), 0 ); - let (nwritten, chunk) = self.writer.write_all(chunk, ctx).await?; + let (nwritten, chunk) = self + .writer + .write_all(FullSlice::must_new(chunk), ctx) + .await?; assert_eq!(nwritten, chunk_len); return Ok((nwritten, chunk)); } @@ -114,7 +121,7 @@ where } } assert!(slice.is_empty(), "by now we should have drained the chunk"); - Ok((chunk_len, chunk.into_inner())) + Ok((chunk_len, FullSlice::must_new(chunk))) } /// Strictly less performant variant of [`Self::write_buffered`] that allows writing borrowed data. @@ -150,9 +157,12 @@ where self.buf = Some(buf); return Ok(()); } - let (nwritten, io_buf) = self.writer.write_all(buf.flush(), ctx).await?; + let slice = buf.flush(); + let (nwritten, slice) = self.writer.write_all(slice, ctx).await?; assert_eq!(nwritten, buf_len); - self.buf = Some(Buffer::reuse_after_flush(io_buf)); + self.buf = Some(Buffer::reuse_after_flush( + slice.into_raw_slice().into_inner(), + )); Ok(()) } } @@ -172,9 +182,9 @@ pub trait Buffer { /// Number of bytes in the buffer. fn pending(&self) -> usize; - /// Turns `self` into a [`tokio_epoll_uring::Slice`] of the pending data + /// Turns `self` into a [`FullSlice`] of the pending data /// so we can use [`tokio_epoll_uring`] to write it to disk. - fn flush(self) -> Slice; + fn flush(self) -> FullSlice; /// After the write to disk is done and we have gotten back the slice, /// [`BufferedWriter`] uses this method to re-use the io buffer. @@ -198,12 +208,8 @@ impl Buffer for BytesMut { self.len() } - fn flush(self) -> Slice { - if self.is_empty() { - return self.slice_full(); - } - let len = self.len(); - self.slice(0..len) + fn flush(self) -> FullSlice { + self.slice_len() } fn reuse_after_flush(mut iobuf: BytesMut) -> Self { @@ -213,18 +219,13 @@ impl Buffer for BytesMut { } impl OwnedAsyncWriter for Vec { - async fn write_all, Buf: IoBuf + Send>( + async fn write_all( &mut self, - buf: B, + buf: FullSlice, _: &RequestContext, - ) -> std::io::Result<(usize, B::Buf)> { - let nbytes = buf.bytes_init(); - if nbytes == 0 { - return Ok((0, Slice::into_inner(buf.slice_full()))); - } - let buf = buf.slice(0..nbytes); + ) -> std::io::Result<(usize, FullSlice)> { self.extend_from_slice(&buf[..]); - Ok((buf.len(), Slice::into_inner(buf))) + Ok((buf.len(), buf)) } } @@ -241,19 +242,13 @@ mod tests { writes: Vec>, } impl OwnedAsyncWriter for RecorderWriter { - async fn write_all, Buf: IoBuf + Send>( + async fn write_all( &mut self, - buf: B, + buf: FullSlice, _: &RequestContext, - ) -> std::io::Result<(usize, B::Buf)> { - let nbytes = buf.bytes_init(); - if nbytes == 0 { - self.writes.push(vec![]); - return Ok((0, Slice::into_inner(buf.slice_full()))); - } - let buf = buf.slice(0..nbytes); + ) -> std::io::Result<(usize, FullSlice)> { self.writes.push(Vec::from(&buf[..])); - Ok((buf.len(), Slice::into_inner(buf))) + Ok((buf.len(), buf)) } } @@ -264,7 +259,7 @@ mod tests { macro_rules! write { ($writer:ident, $data:literal) => {{ $writer - .write_buffered(::bytes::Bytes::from_static($data).slice_full(), &test_ctx()) + .write_buffered(::bytes::Bytes::from_static($data).slice_len(), &test_ctx()) .await?; }}; }