diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 2be8816cef..c0da90d521 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -127,11 +127,11 @@ impl BlobWriter { /// You need to make sure that the internal buffer is empty, otherwise /// data will be written in wrong order. #[inline(always)] - async fn write_all_unbuffered, Buf: IoBuf + Send>( + async fn write_all_unbuffered( &mut self, - src_buf: B, + src_buf: Slice, ctx: &RequestContext, - ) -> (B::Buf, Result<(), Error>) { + ) -> (Slice, Result<(), Error>) { let (src_buf, res) = self.inner.write_all(src_buf, ctx).await; let nbytes = match res { Ok(nbytes) => nbytes, @@ -145,8 +145,9 @@ impl BlobWriter { /// Flushes the internal buffer to the underlying `VirtualFile`. pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> { let buf = std::mem::take(&mut self.buf); - let (mut buf, res) = self.inner.write_all(buf, ctx).await; + let (buf, res) = self.inner.write_all(buf.slice_full(), ctx).await; res?; + let mut buf = Slice::into_inner(buf); buf.clear(); self.buf = buf; Ok(()) @@ -163,11 +164,11 @@ impl BlobWriter { } /// Internal, possibly buffered, write function - async fn write_all, Buf: IoBuf + Send>( + async fn write_all( &mut self, - src_buf: B, + src_buf: Slice, ctx: &RequestContext, - ) -> (B::Buf, Result<(), Error>) { + ) -> (Slice, Result<(), Error>) { if !BUFFERED { assert!(self.buf.is_empty()); return self.write_all_unbuffered(src_buf, ctx).await; @@ -175,7 +176,7 @@ impl BlobWriter { let remaining = Self::CAPACITY - self.buf.len(); let src_buf_len = src_buf.bytes_init(); if src_buf_len == 0 { - return (Slice::into_inner(src_buf.slice_full()), Ok(())); + return (src_buf, Ok(())); } let mut src_buf = src_buf.slice(0..src_buf_len); // First try to copy as much as we can into the buffer @@ -186,7 +187,7 @@ impl BlobWriter { // Then, if the buffer is full, flush it out if self.buf.len() == Self::CAPACITY { if let Err(e) = self.flush_buffer(ctx).await { - return (Slice::into_inner(src_buf), Err(e)); + return (src_buf, Err(e)); } } // Finally, write the tail of src_buf: @@ -199,7 +200,7 @@ impl BlobWriter { let copied = self.write_into_buffer(&src_buf); // We just verified above that src_buf fits into our internal buffer. assert_eq!(copied, src_buf.len()); - Slice::into_inner(src_buf) + src_buf } else { let (src_buf, res) = self.write_all_unbuffered(src_buf, ctx).await; if let Err(e) = res { @@ -208,18 +209,18 @@ impl BlobWriter { src_buf } } else { - Slice::into_inner(src_buf) + src_buf }; (src_buf, Ok(())) } /// Write a blob of data. Returns the offset that it was written to, /// which can be used to retrieve the data later. - pub async fn write_blob, Buf: IoBuf + Send>( + pub async fn write_blob( &mut self, - srcbuf: B, + srcbuf: Slice, ctx: &RequestContext, - ) -> (B::Buf, Result) { + ) -> (Slice, Result) { let offset = self.offset; let len = srcbuf.bytes_init(); @@ -230,12 +231,12 @@ impl BlobWriter { if len < 128 { // Short blob. Write a 1-byte length header io_buf.put_u8(len as u8); - self.write_all(io_buf, ctx).await + self.write_all(io_buf.slice_full(), ctx).await } else { // Write a 4-byte length header if len > 0x7fff_ffff { return ( - io_buf, + io_buf.slice_full(), Err(Error::new( ErrorKind::Other, format!("blob too large ({len} bytes)"), @@ -248,14 +249,14 @@ impl BlobWriter { let mut len_buf = (len as u32).to_be_bytes(); len_buf[0] |= 0x80; io_buf.extend_from_slice(&len_buf[..]); - self.write_all(io_buf, ctx).await + self.write_all(io_buf.slice_full(), ctx).await } } .await; - self.io_buf = Some(io_buf); + self.io_buf = Some(io_buf.into_inner()); match hdr_res { Ok(_) => (), - Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)), + Err(e) => return (srcbuf, Err(e)), } let (srcbuf, res) = self.write_all(srcbuf, ctx).await; (srcbuf, res.map(|_| offset)) diff --git a/pageserver/src/tenant/ephemeral_file/page_caching.rs b/pageserver/src/tenant/ephemeral_file/page_caching.rs index 276ac87064..bd86e6ec33 100644 --- a/pageserver/src/tenant/ephemeral_file/page_caching.rs +++ b/pageserver/src/tenant/ephemeral_file/page_caching.rs @@ -158,7 +158,7 @@ impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmi let iobuf = match self.file.write_all(buf, ctx).await { (iobuf, Ok(nwritten)) => { assert_eq!(nwritten, buflen); - iobuf + iobuf.into_inner() } (_, Err(e)) => { return Err(std::io::Error::new( diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index c2d4a2776b..1385437996 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -60,6 +60,7 @@ use std::os::unix::fs::FileExt; use std::str::FromStr; use std::sync::Arc; use tokio::sync::OnceCell; +use tokio_epoll_uring::{BoundedBuf as _, Slice}; use tracing::*; use utils::{ @@ -452,7 +453,8 @@ impl DeltaLayerWriterInner { ctx: &RequestContext, ) -> (Vec, anyhow::Result<()>) { assert!(self.lsn_range.start <= lsn); - let (val, res) = self.blob_writer.write_blob(val, ctx).await; + let (val, res) = self.blob_writer.write_blob(val.slice_full(), ctx).await; + let val = Slice::into_inner(val); let off = match res { Ok(off) => off, Err(e) => return (val, Err(anyhow::anyhow!(e))), @@ -505,7 +507,7 @@ impl DeltaLayerWriterInner { file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64)) .await?; for buf in block_buf.blocks { - let (_buf, res) = file.write_all(buf, ctx).await; + let (_buf, res) = file.write_all(buf.slice_full(), ctx).await; res?; } assert!(self.lsn_range.start < self.lsn_range.end); @@ -525,7 +527,7 @@ impl DeltaLayerWriterInner { // TODO: could use smallvec here but it's a pain with Slice Summary::ser_into(&summary, &mut buf)?; file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf, ctx).await; + let (_buf, res) = file.write_all(buf.slice_full(), ctx).await; res?; let metadata = file @@ -730,7 +732,7 @@ impl DeltaLayer { // TODO: could use smallvec here, but it's a pain with Slice Summary::ser_into(&new_summary, &mut buf).context("serialize")?; file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf, ctx).await; + let (_buf, res) = file.write_all(buf.slice_full(), ctx).await; res?; Ok(()) } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 50aacbd9ad..08a7ce2a97 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -341,6 +341,7 @@ impl ImageLayer { where F: Fn(Summary) -> Summary, { + use tokio_epoll_uring::BoundedBuf as _; let mut file = VirtualFile::open_with_options( path, virtual_file::OpenOptions::new().read(true).write(true), @@ -362,7 +363,7 @@ impl ImageLayer { // TODO: could use smallvec here but it's a pain with Slice Summary::ser_into(&new_summary, &mut buf).context("serialize")?; file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf, ctx).await; + let (_buf, res) = file.write_all(buf.slice_full(), ctx).await; res?; Ok(()) } @@ -798,8 +799,9 @@ impl ImageLayerWriterInner { img: Bytes, ctx: &RequestContext, ) -> anyhow::Result<()> { + use tokio_epoll_uring::BoundedBuf as _; ensure!(self.key_range.contains(&key)); - let (_img, res) = self.blob_writer.write_blob(img, ctx).await; + let (_img, res) = self.blob_writer.write_blob(img.slice_full(), ctx).await; // TODO: re-use the buffer for `img` further upstack let off = res?; @@ -818,6 +820,7 @@ impl ImageLayerWriterInner { timeline: &Arc, ctx: &RequestContext, ) -> anyhow::Result { + use tokio_epoll_uring::BoundedBuf as _; let index_start_blk = ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; @@ -828,7 +831,7 @@ impl ImageLayerWriterInner { .await?; let (index_root_blk, block_buf) = self.tree.finish()?; for buf in block_buf.blocks { - let (_buf, res) = file.write_all(buf, ctx).await; + let (_slice, res) = file.write_all(buf.slice_full(), ctx).await; res?; } @@ -848,7 +851,7 @@ impl ImageLayerWriterInner { // TODO: could use smallvec here but it's a pain with Slice Summary::ser_into(&summary, &mut buf)?; file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf, ctx).await; + let (_slice, res) = file.write_all(buf.slice_full(), ctx).await; res?; let metadata = file diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 51b0c420c3..e9e7d2d6ed 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -674,14 +674,14 @@ impl VirtualFile { /// Returns the IoBuf that is underlying the BoundedBuf `buf`. /// I.e., the returned value's `bytes_init()` method returns something different than the `bytes_init()` that was passed in. /// It's quite brittle and easy to mis-use, so, we return the size in the Ok() variant. - pub async fn write_all, Buf: IoBuf + Send>( + pub async fn write_all( &mut self, - buf: B, + buf: Slice, ctx: &RequestContext, - ) -> (B::Buf, Result) { + ) -> (Slice, Result) { let nbytes = buf.bytes_init(); if nbytes == 0 { - return (Slice::into_inner(buf.slice_full()), Ok(0)); + return (buf, Ok(0)); } let mut buf = buf.slice(0..nbytes); while !buf.is_empty() { @@ -690,7 +690,7 @@ impl VirtualFile { match res { Ok(0) => { return ( - Slice::into_inner(buf), + buf, Err(Error::new( std::io::ErrorKind::WriteZero, "failed to write whole buffer", @@ -701,10 +701,10 @@ impl VirtualFile { buf = buf.slice(n..); } Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return (Slice::into_inner(buf), Err(e)), + Err(e) => return (buf, Err(e)), } } - (Slice::into_inner(buf), Ok(nbytes)) + (buf, Ok(nbytes)) } async fn write( @@ -1096,8 +1096,8 @@ impl OwnedAsyncWriter for VirtualFile { buf: B, ctx: &RequestContext, ) -> std::io::Result<(usize, B::Buf)> { - let (buf, res) = VirtualFile::write_all(self, buf, ctx).await; - res.map(move |v| (v, buf)) + let (buf, res) = VirtualFile::write_all(self, buf.slice_full(), ctx).await; + res.map(move |v| (v, Slice::into_inner(buf))) } }