From f25f71bc987b7b3f2835610d9690f7d6172e8b93 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 11 Apr 2025 16:57:19 +0200 Subject: [PATCH] undo all changes except gate,cancel,context propagation --- pageserver/src/metrics.rs | 2 - pageserver/src/tenant/blob_io.rs | 291 +++++++++++------- pageserver/src/tenant/block_io.rs | 10 +- pageserver/src/tenant/disk_btree.rs | 13 +- pageserver/src/tenant/ephemeral_file.rs | 1 - .../tenant/remote_timeline_client/download.rs | 37 +-- pageserver/src/tenant/secondary/downloader.rs | 3 +- .../src/tenant/storage_layer/delta_layer.rs | 110 ++----- .../src/tenant/storage_layer/image_layer.rs | 121 ++------ pageserver/src/tenant/vectored_blob_io.rs | 14 +- pageserver/src/virtual_file.rs | 281 ++++++++++++++--- pageserver/src/virtual_file/io_engine.rs | 16 - .../aligned_buffer/buffer_mut.rs | 11 - .../owned_buffers_io/io_buf_aligned.rs | 6 +- .../virtual_file/owned_buffers_io/write.rs | 50 +-- .../owned_buffers_io/write/flush.rs | 22 +- 16 files changed, 520 insertions(+), 468 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 2e81123384..1fe51021fd 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1289,7 +1289,6 @@ pub(crate) enum StorageIoOperation { Seek, Fsync, Metadata, - SetLen, } impl StorageIoOperation { @@ -1304,7 +1303,6 @@ impl StorageIoOperation { StorageIoOperation::Seek => "seek", StorageIoOperation::Fsync => "fsync", StorageIoOperation::Metadata => "metadata", - StorageIoOperation::SetLen => "set_len", } } } diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 0e0c5a99e0..abeaa166a4 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -15,23 +15,21 @@ //! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX //! use std::cmp::min; -use std::sync::Arc; +use std::io::Error; use async_compression::Level; use bytes::{BufMut, BytesMut}; use pageserver_api::models::ImageCompressionAlgorithm; use tokio::io::AsyncWriteExt; -use tokio_epoll_uring::IoBuf; +use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; use tokio_util::sync::CancellationToken; use tracing::warn; use crate::context::RequestContext; use crate::page_cache::PAGE_SZ; use crate::tenant::block_io::BlockCursor; -use crate::virtual_file::IoBufferMut; use crate::virtual_file::VirtualFile; use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; -use crate::virtual_file::owned_buffers_io::write::{BufferedWriter, FlushTaskError}; #[derive(Copy, Clone, Debug)] pub struct CompressionInfo { @@ -39,14 +37,6 @@ pub struct CompressionInfo { pub compressed_size: Option, } -#[derive(Debug, thiserror::Error)] -pub enum WriteBlobError { - #[error(transparent)] - Flush(FlushTaskError), - #[error("blob too large ({len} bytes)")] - BlobTooLarge { len: usize }, -} - impl BlockCursor<'_> { /// Read a blob into a new buffer. pub async fn read_blob( @@ -168,62 +158,141 @@ pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10; /// A wrapper of `VirtualFile` that allows users to write blobs. /// /// If a `BlobWriter` is dropped, the internal buffer will be -/// discarded. You need to call [`Self::into_inner`] +/// discarded. You need to call [`flush_buffer`](Self::flush_buffer) /// manually before dropping. -pub struct BlobWriter { +pub struct BlobWriter { + inner: VirtualFile, + offset: u64, + /// A buffer to save on write calls, only used if BUFFERED=true + buf: Vec, /// We do tiny writes for the length headers; they need to be in an owned buffer; io_buf: Option, - writer: BufferedWriter, - offset: u64, } -impl BlobWriter { +impl BlobWriter { pub fn new( - file: Arc, + inner: VirtualFile, start_offset: u64, - gate: &utils::sync::gate::Gate, - cancel: CancellationToken, - ctx: &RequestContext, - flush_task_span: tracing::Span, - ) -> anyhow::Result { - Ok(Self { - io_buf: Some(BytesMut::new()), - writer: BufferedWriter::new( - file, - start_offset, - || IoBufferMut::with_capacity(Self::CAPACITY), - gate.enter()?, - cancel, - ctx, - flush_task_span, - ), + _gate: &utils::sync::gate::Gate, + _cancel: CancellationToken, + _ctx: &RequestContext, + ) -> Self { + Self { + inner, offset: start_offset, - }) + buf: Vec::with_capacity(Self::CAPACITY), + io_buf: Some(BytesMut::new()), + } } pub fn size(&self) -> u64 { self.offset } - const CAPACITY: usize = 64 * 1024; + const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 }; - /// Writes `src_buf` to the file at the current offset. + /// Writes the given buffer directly to the underlying `VirtualFile`. + /// You need to make sure that the internal buffer is empty, otherwise + /// data will be written in wrong order. + #[inline(always)] + async fn write_all_unbuffered( + &mut self, + src_buf: FullSlice, + ctx: &RequestContext, + ) -> (FullSlice, Result<(), Error>) { + let (src_buf, res) = self.inner.write_all(src_buf, ctx).await; + let nbytes = match res { + Ok(nbytes) => nbytes, + Err(e) => return (src_buf, Err(e)), + }; + self.offset += nbytes as u64; + (src_buf, Ok(())) + } + + #[inline(always)] + /// Flushes the internal buffer to the underlying `VirtualFile`. + pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> { + let buf = std::mem::take(&mut self.buf); + let (slice, res) = self.inner.write_all(buf.slice_len(), ctx).await; + res?; + let mut buf = slice.into_raw_slice().into_inner(); + buf.clear(); + self.buf = buf; + Ok(()) + } + + #[inline(always)] + /// Writes as much of `src_buf` into the internal buffer as it fits + fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize { + let remaining = Self::CAPACITY - self.buf.len(); + let to_copy = src_buf.len().min(remaining); + self.buf.extend_from_slice(&src_buf[..to_copy]); + self.offset += to_copy as u64; + to_copy + } + + /// Internal, possibly buffered, write function async fn write_all( &mut self, src_buf: FullSlice, ctx: &RequestContext, - ) -> (FullSlice, Result<(), FlushTaskError>) { - let res = self - .writer - // TODO: why are we taking a FullSlice if we're going to pass a borrow downstack? - // Can remove all the complexity around owned buffers upstack - .write_buffered_borrowed(&src_buf, ctx) - .await - .map(|len| { - self.offset += len as u64; - }); + ) -> (FullSlice, Result<(), Error>) { + let src_buf = src_buf.into_raw_slice(); + let src_buf_bounds = src_buf.bounds(); + let restore = move |src_buf_slice: Slice<_>| { + FullSlice::must_new(Slice::from_buf_bounds( + src_buf_slice.into_inner(), + src_buf_bounds, + )) + }; - (src_buf, res) + if !BUFFERED { + assert!(self.buf.is_empty()); + return self + .write_all_unbuffered(FullSlice::must_new(src_buf), ctx) + .await; + } + let remaining = Self::CAPACITY - self.buf.len(); + let src_buf_len = src_buf.bytes_init(); + if src_buf_len == 0 { + return (restore(src_buf), Ok(())); + } + let mut src_buf = src_buf.slice(0..src_buf_len); + // First try to copy as much as we can into the buffer + if remaining > 0 { + let copied = self.write_into_buffer(&src_buf); + src_buf = src_buf.slice(copied..); + } + // Then, if the buffer is full, flush it out + if self.buf.len() == Self::CAPACITY { + if let Err(e) = self.flush_buffer(ctx).await { + return (restore(src_buf), Err(e)); + } + } + // Finally, write the tail of src_buf: + // If it wholly fits into the buffer without + // completely filling it, then put it there. + // If not, write it out directly. + let src_buf = if !src_buf.is_empty() { + assert_eq!(self.buf.len(), 0); + if src_buf.len() < Self::CAPACITY { + let copied = self.write_into_buffer(&src_buf); + // We just verified above that src_buf fits into our internal buffer. + assert_eq!(copied, src_buf.len()); + restore(src_buf) + } else { + let (src_buf, res) = self + .write_all_unbuffered(FullSlice::must_new(src_buf), ctx) + .await; + if let Err(e) = res { + return (src_buf, Err(e)); + } + src_buf + } + } else { + restore(src_buf) + }; + (src_buf, Ok(())) } /// Write a blob of data. Returns the offset that it was written to, @@ -232,7 +301,7 @@ impl BlobWriter { &mut self, srcbuf: FullSlice, ctx: &RequestContext, - ) -> (FullSlice, Result) { + ) -> (FullSlice, Result) { let (buf, res) = self .write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled) .await; @@ -246,10 +315,7 @@ impl BlobWriter { srcbuf: FullSlice, ctx: &RequestContext, algorithm: ImageCompressionAlgorithm, - ) -> ( - FullSlice, - Result<(u64, CompressionInfo), WriteBlobError>, - ) { + ) -> (FullSlice, Result<(u64, CompressionInfo), Error>) { let offset = self.offset; let mut compression_info = CompressionInfo { written_compressed: false, @@ -265,16 +331,14 @@ impl BlobWriter { if len < 128 { // Short blob. Write a 1-byte length header io_buf.put_u8(len as u8); - let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await; - let res = res.map_err(WriteBlobError::Flush); - ((slice, res), srcbuf) + (self.write_all(io_buf.slice_len(), ctx).await, srcbuf) } else { // Write a 4-byte length header if len > MAX_SUPPORTED_BLOB_LEN { return ( ( io_buf.slice_len(), - Err(WriteBlobError::BlobTooLarge { len }), + Err(Error::other(format!("blob too large ({len} bytes)"))), ), srcbuf, ); @@ -308,9 +372,7 @@ impl BlobWriter { assert_eq!(len_buf[0] & 0xf0, 0); len_buf[0] |= high_bit_mask; io_buf.extend_from_slice(&len_buf[..]); - let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await; - let res = res.map_err(WriteBlobError::Flush); - ((slice, res), srcbuf) + (self.write_all(io_buf.slice_len(), ctx).await, srcbuf) } } .await; @@ -325,23 +387,33 @@ impl BlobWriter { } else { self.write_all(srcbuf, ctx).await }; - let res = res.map_err(WriteBlobError::Flush); (srcbuf, res.map(|_| (offset, compression_info))) } +} +impl BlobWriter { /// Access the underlying `VirtualFile`. /// /// This function flushes the internal buffer before giving access /// to the underlying `VirtualFile`. + pub async fn into_inner(mut self, ctx: &RequestContext) -> Result { + self.flush_buffer(ctx).await?; + Ok(self.inner) + } + + /// Access the underlying `VirtualFile`. /// - /// The caller can use the `handle_tail` function to change the tail of the buffer before flushing it to disk. - /// The buffer will not be flushed to disk if handle_tail returns `None`. - pub async fn into_inner( - self, - handle_tail: impl FnMut(IoBufferMut) -> Option, - ) -> Result { - let (_, file) = self.writer.shutdown(handle_tail).await?; - Ok(file) + /// Unlike [`into_inner`](Self::into_inner), this doesn't flush + /// the internal buffer before giving access. + pub fn into_inner_no_flush(self) -> VirtualFile { + self.inner + } +} + +impl BlobWriter { + /// Access the underlying `VirtualFile`. + pub fn into_inner(self) -> VirtualFile { + self.inner } } @@ -350,22 +422,21 @@ pub(crate) mod tests { use camino::Utf8PathBuf; use camino_tempfile::Utf8TempDir; use rand::{Rng, SeedableRng}; - use tracing::info_span; use super::*; use crate::context::DownloadBehavior; use crate::task_mgr::TaskKind; use crate::tenant::block_io::BlockReaderRef; - async fn round_trip_test(blobs: &[Vec]) -> anyhow::Result<()> { - round_trip_test_compressed(blobs, false).await + async fn round_trip_test(blobs: &[Vec]) -> Result<(), Error> { + round_trip_test_compressed::(blobs, false).await } - pub(crate) async fn write_maybe_compressed( + pub(crate) async fn write_maybe_compressed( blobs: &[Vec], compression: bool, ctx: &RequestContext, - ) -> anyhow::Result<(Utf8TempDir, Utf8PathBuf, Vec)> { + ) -> Result<(Utf8TempDir, Utf8PathBuf, Vec), Error> { let temp_dir = camino_tempfile::tempdir()?; let pathbuf = temp_dir.path().join("file"); let gate = utils::sync::gate::Gate::default(); @@ -374,9 +445,8 @@ pub(crate) mod tests { // Write part (in block to drop the file) let mut offsets = Vec::new(); { - let file = Arc::new(VirtualFile::create_v2(pathbuf.as_path(), ctx).await?); - let mut wtr = - BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test")).unwrap(); + let file = VirtualFile::create(pathbuf.as_path(), ctx).await?; + let mut wtr = BlobWriter::::new(file, 0, &gate, cancel.clone(), ctx); for blob in blobs.iter() { let (_, res) = if compression { let res = wtr @@ -393,37 +463,26 @@ pub(crate) mod tests { let offs = res?; offsets.push(offs); } - wtr.into_inner(|mut buf| { - use crate::virtual_file::owned_buffers_io::write::Buffer; - - let len = buf.pending(); - let cap = buf.cap(); - - // pad zeros to the next io alignment requirement. - // TODO: this is actually padding to next PAGE_SZ multiple, but only if the buffer capacity is larger than that. - // We can't let the fact that we do direct IO, or the buffer capacity, dictate the on-disk format we write here. - // Need to find a better API that allows writing the format we intend to. - let count = len.next_multiple_of(PAGE_SZ).min(cap) - len; - buf.extend_with(0, count); - - Some(buf) - }) - .await?; // TODO: this here is the problem with the tests: we're dropping the tail end + // Write out one page worth of zeros so that we can + // read again with read_blk + let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await; + let offs = res?; + println!("Writing final blob at offs={offs}"); + wtr.flush_buffer(ctx).await?; } Ok((temp_dir, pathbuf, offsets)) } - async fn round_trip_test_compressed( + async fn round_trip_test_compressed( blobs: &[Vec], compression: bool, - ) -> anyhow::Result<()> { + ) -> Result<(), Error> { let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test(); let (_temp_dir, pathbuf, offsets) = - write_maybe_compressed(blobs, compression, &ctx).await?; + write_maybe_compressed::(blobs, compression, &ctx).await?; - println!("Done writing!"); - let file = VirtualFile::open_v2(pathbuf, &ctx).await?; + let file = VirtualFile::open(pathbuf, &ctx).await?; let rdr = BlockReaderRef::VirtualFile(&file); let rdr = BlockCursor::new_with_compression(rdr, compression); for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() { @@ -442,27 +501,30 @@ pub(crate) mod tests { } #[tokio::test] - async fn test_one() -> anyhow::Result<()> { + async fn test_one() -> Result<(), Error> { let blobs = &[vec![12, 21, 22]]; - round_trip_test(blobs).await?; + round_trip_test::(blobs).await?; + round_trip_test::(blobs).await?; Ok(()) } #[tokio::test] - async fn test_hello_simple() -> anyhow::Result<()> { + async fn test_hello_simple() -> Result<(), Error> { let blobs = &[ vec![0, 1, 2, 3], b"Hello, World!".to_vec(), Vec::new(), b"foobar".to_vec(), ]; - round_trip_test(blobs).await?; - round_trip_test_compressed(blobs, true).await?; + round_trip_test::(blobs).await?; + round_trip_test::(blobs).await?; + round_trip_test_compressed::(blobs, true).await?; + round_trip_test_compressed::(blobs, true).await?; Ok(()) } #[tokio::test] - async fn test_really_big_array() -> anyhow::Result<()> { + async fn test_really_big_array() -> Result<(), Error> { let blobs = &[ b"test".to_vec(), random_array(10 * PAGE_SZ), @@ -471,22 +533,25 @@ pub(crate) mod tests { vec![0xf3; 24 * PAGE_SZ], b"foobar".to_vec(), ]; - round_trip_test(blobs).await?; - round_trip_test_compressed(blobs, true).await?; + round_trip_test::(blobs).await?; + round_trip_test::(blobs).await?; + round_trip_test_compressed::(blobs, true).await?; + round_trip_test_compressed::(blobs, true).await?; Ok(()) } #[tokio::test] - async fn test_arrays_inc() -> anyhow::Result<()> { + async fn test_arrays_inc() -> Result<(), Error> { let blobs = (0..PAGE_SZ / 8) .map(|v| random_array(v * 16)) .collect::>(); - round_trip_test(&blobs).await?; + round_trip_test::(&blobs).await?; + round_trip_test::(&blobs).await?; Ok(()) } #[tokio::test] - async fn test_arrays_random_size() -> anyhow::Result<()> { + async fn test_arrays_random_size() -> Result<(), Error> { let mut rng = rand::rngs::StdRng::seed_from_u64(42); let blobs = (0..1024) .map(|_| { @@ -498,18 +563,20 @@ pub(crate) mod tests { random_array(sz.into()) }) .collect::>(); - round_trip_test(&blobs).await?; + round_trip_test::(&blobs).await?; + round_trip_test::(&blobs).await?; Ok(()) } #[tokio::test] - async fn test_arrays_page_boundary() -> anyhow::Result<()> { + async fn test_arrays_page_boundary() -> Result<(), Error> { let blobs = &[ random_array(PAGE_SZ - 4), random_array(PAGE_SZ - 4), random_array(PAGE_SZ - 4), ]; - round_trip_test(blobs).await?; + round_trip_test::(blobs).await?; + round_trip_test::(blobs).await?; Ok(()) } } diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 686cc94126..6723155626 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -4,12 +4,14 @@ use std::ops::Deref; +use bytes::Bytes; + use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner}; use crate::context::RequestContext; use crate::page_cache::{self, FileId, PAGE_SZ, PageReadGuard, PageWriteGuard, ReadBufResult}; #[cfg(test)] use crate::virtual_file::IoBufferMut; -use crate::virtual_file::{IoBuffer, VirtualFile}; +use crate::virtual_file::VirtualFile; /// This is implemented by anything that can read 8 kB (PAGE_SZ) /// blocks, using the page cache @@ -245,17 +247,17 @@ pub trait BlockWriter { /// 'buf' must be of size PAGE_SZ. Returns the block number the page was /// written to. /// - fn write_blk(&mut self, buf: IoBuffer) -> Result; + fn write_blk(&mut self, buf: Bytes) -> Result; } /// /// A simple in-memory buffer of blocks. /// pub struct BlockBuf { - pub blocks: Vec, + pub blocks: Vec, } impl BlockWriter for BlockBuf { - fn write_blk(&mut self, buf: IoBuffer) -> Result { + fn write_blk(&mut self, buf: Bytes) -> Result { assert!(buf.len() == PAGE_SZ); let blknum = self.blocks.len(); self.blocks.push(buf); diff --git a/pageserver/src/tenant/disk_btree.rs b/pageserver/src/tenant/disk_btree.rs index 419befa41b..1791e5996c 100644 --- a/pageserver/src/tenant/disk_btree.rs +++ b/pageserver/src/tenant/disk_btree.rs @@ -25,7 +25,7 @@ use std::{io, result}; use async_stream::try_stream; use byteorder::{BE, ReadBytesExt}; -use bytes::BufMut; +use bytes::{BufMut, Bytes, BytesMut}; use either::Either; use futures::{Stream, StreamExt}; use hex; @@ -34,7 +34,6 @@ use tracing::error; use crate::context::RequestContext; use crate::tenant::block_io::{BlockReader, BlockWriter}; -use crate::virtual_file::{IoBuffer, IoBufferMut, owned_buffers_io::write::Buffer}; // The maximum size of a value stored in the B-tree. 5 bytes is enough currently. pub const VALUE_SZ: usize = 5; @@ -788,12 +787,12 @@ impl BuildNode { /// /// Serialize the node to on-disk format. /// - fn pack(&self) -> IoBuffer { + fn pack(&self) -> Bytes { assert!(self.keys.len() == self.num_children as usize * self.suffix_len); assert!(self.values.len() == self.num_children as usize * VALUE_SZ); assert!(self.num_children > 0); - let mut buf = IoBufferMut::with_capacity(PAGE_SZ); + let mut buf = BytesMut::new(); buf.put_u16(self.num_children); buf.put_u8(self.level); @@ -806,7 +805,7 @@ impl BuildNode { assert!(buf.len() == self.size); assert!(buf.len() <= PAGE_SZ); - buf.extend_with(0, PAGE_SZ - buf.len()); + buf.resize(PAGE_SZ, 0); buf.freeze() } @@ -840,7 +839,7 @@ pub(crate) mod tests { #[derive(Clone, Default)] pub(crate) struct TestDisk { - blocks: Vec, + blocks: Vec, } impl TestDisk { fn new() -> Self { @@ -858,7 +857,7 @@ pub(crate) mod tests { } } impl BlockWriter for &mut TestDisk { - fn write_blk(&mut self, buf: IoBuffer) -> io::Result { + fn write_blk(&mut self, buf: Bytes) -> io::Result { let blknum = self.blocks.len(); self.blocks.push(buf); Ok(blknum as u32) diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 19b5165d0f..19215bb918 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -75,7 +75,6 @@ impl EphemeralFile { bytes_written: 0, buffered_writer: owned_buffers_io::write::BufferedWriter::new( file, - 0, || IoBufferMut::with_capacity(TAIL_SZ), gate.enter()?, cancel.child_token(), diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 32fce8cae3..8b399996d5 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -32,14 +32,12 @@ use super::{ remote_tenant_manifest_prefix, remote_tenant_path, }; use crate::TEMP_FILE_SUFFIX; -use crate::assert_u64_eq_usize::UsizeIsU64; use crate::config::PageServerConf; use crate::context::RequestContext; use crate::span::{ debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_id, }; use crate::tenant::Generation; -use crate::tenant::disk_btree::PAGE_SZ; use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path}; use crate::tenant::storage_layer::LayerName; use crate::virtual_file::{MaybeFatalIo, VirtualFile, on_fatal_io_error}; @@ -229,7 +227,6 @@ async fn download_object( let mut buffered = owned_buffers_io::write::BufferedWriter::::new( destination_file, - 0, || IoBufferMut::with_capacity(super::BUFFER_SIZE), gate.enter().map_err(|_| DownloadError::Cancelled)?, cancel.child_token(), @@ -254,41 +251,13 @@ async fn download_object( FlushTaskError::Cancelled => DownloadError::Cancelled, })?; } - let mut pad_amount = None; - let (bytes_amount, destination_file) = buffered - .shutdown(|mut buf| { - use crate::virtual_file::owned_buffers_io::write::Buffer; - - let len = buf.pending(); - let cap = buf.cap(); - - // pad zeros to the next io alignment requirement. - // TODO: this is actually padding to next PAGE_SZ multiple, but only if the buffer capacity is larger than that. - // We can't let the fact that we do direct IO, or the buffer capacity, dictate the on-disk format we write here. - // Need to find a better API that allows writing the format we intend to. - let count = len.next_multiple_of(PAGE_SZ).min(cap) - len; - pad_amount = Some(count); - buf.extend_with(0, count); - - Some(buf) - }) + let inner = buffered + .flush_and_into_inner(ctx) .await .map_err(|e| match e { FlushTaskError::Cancelled => DownloadError::Cancelled, })?; - - let pad_amount = pad_amount.expect("shutdown always invokes the closure").into_u64(); - let set_len_arg = bytes_amount - pad_amount; - destination_file - .set_len(set_len_arg) - .await - .maybe_fatal_err("download_object set_len") - .with_context(|| { - format!("set len for file at {dst_path}: 0x{set_len_arg:x} = 0x{bytes_amount:x} - 0x{pad_amount:x}") - }) - .map_err(DownloadError::Other)?; - - Ok((set_len_arg, destination_file)) + Ok(inner) } .await?; diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index f653dbe84c..60cf7ac79e 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -1521,11 +1521,12 @@ async fn load_heatmap( path: &Utf8PathBuf, ctx: &RequestContext, ) -> Result, anyhow::Error> { - let st = match VirtualFile::read_to_string(path, ctx).await { + let mut file = match VirtualFile::open(path, ctx).await { Ok(file) => file, Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None), Err(e) => Err(e)?, }; + let st = file.read_to_string(ctx).await?; let htm = serde_json::from_str(&st)?; Ok(Some(htm)) } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 33f5ebfd7b..693d61d8fd 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -29,6 +29,7 @@ //! use std::collections::{HashMap, VecDeque}; use std::fs::File; +use std::io::SeekFrom; use std::ops::Range; use std::os::unix::fs::FileExt; use std::str::FromStr; @@ -51,7 +52,6 @@ use tokio_epoll_uring::IoBuf; use tokio_util::sync::CancellationToken; use tracing::*; use utils::bin_ser::BeSer; -use utils::bin_ser::SerializeError; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; @@ -74,8 +74,7 @@ use crate::tenant::vectored_blob_io::{ VectoredReadPlanner, }; use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; -use crate::virtual_file::owned_buffers_io::write::Buffer; -use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile}; +use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; /// @@ -113,15 +112,6 @@ impl From<&DeltaLayer> for Summary { } impl Summary { - /// Serializes the summary header into an aligned buffer of lenth `PAGE_SZ`. - pub fn ser_into_page(&self) -> Result { - let mut buf = IoBufferMut::with_capacity(PAGE_SZ); - Self::ser_into(self, &mut buf)?; - // Pad zeroes to the buffer so the length is a multiple of the alignment. - buf.extend_with(0, buf.capacity() - buf.len()); - Ok(buf.freeze()) - } - pub(super) fn expected( tenant_id: TenantId, timeline_id: TimelineId, @@ -402,12 +392,10 @@ struct DeltaLayerWriterInner { tree: DiskBtreeBuilder, - blob_writer: BlobWriter, + blob_writer: BlobWriter, // Number of key-lsns in the layer. num_keys: usize, - - _gate_guard: utils::sync::gate::GateGuard, } impl DeltaLayerWriterInner { @@ -434,17 +422,10 @@ impl DeltaLayerWriterInner { let path = DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range); - let file = Arc::new(VirtualFile::create_v2(&path, ctx).await?); - - // Start at PAGE_SZ, make room for the header block - let blob_writer = BlobWriter::new( - file, - PAGE_SZ as u64, - gate, - cancel, - ctx, - info_span!(parent: None, "delta_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path), - )?; + let mut file = VirtualFile::create(&path, ctx).await?; + // make room for the header block + file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; + let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx); // Initialize the b-tree index builder let block_buf = BlockBuf::new(); @@ -459,7 +440,6 @@ impl DeltaLayerWriterInner { tree: tree_builder, blob_writer, num_keys: 0, - _gate_guard: gate.enter()?, }) } @@ -555,33 +535,15 @@ impl DeltaLayerWriterInner { ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32; - let file = self - .blob_writer - .into_inner(|mut buf| { - let len = buf.pending(); - let cap = buf.cap(); - - // pad zeros to the next io alignment requirement. - // TODO: this is actually padding to next PAGE_SZ multiple, but only if the buffer capacity is larger than that. - // We can't let the fact that we do direct IO, or the buffer capacity, dictate the on-disk format we write here. - // Need to find a better API that allows writing the format we intend to. - let count = len.next_multiple_of(PAGE_SZ).min(cap) - len; - buf.extend_with(0, count); - - Some(buf) - }) - .await?; + let mut file = self.blob_writer.into_inner(ctx).await?; // Write out the index let (index_root_blk, block_buf) = self.tree.finish()?; - let mut offset = index_start_blk as u64 * PAGE_SZ as u64; - - // TODO(yuchen): https://github.com/neondatabase/neon/issues/10092 - // Should we just replace BlockBuf::blocks with one big buffer + file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64)) + .await?; for buf in block_buf.blocks { - let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await; + let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; res?; - offset += PAGE_SZ as u64; } assert!(self.lsn_range.start < self.lsn_range.end); // Fill in the summary on blk 0 @@ -596,9 +558,11 @@ impl DeltaLayerWriterInner { index_root_blk, }; - // Writes summary at the first block (offset 0). - let buf = summary.ser_into_page()?; - let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await; + let mut buf = Vec::with_capacity(PAGE_SZ); + // TODO: could use smallvec here but it's a pain with Slice + Summary::ser_into(&summary, &mut buf)?; + file.seek(SeekFrom::Start(0)).await?; + let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; res?; let metadata = file @@ -765,33 +729,12 @@ impl DeltaLayerWriter { impl Drop for DeltaLayerWriter { fn drop(&mut self) { - let Some(inner) = self.inner.take() else { - return; - }; - - tokio::spawn(async move { - let DeltaLayerWriterInner { - blob_writer, - _gate_guard, - .. - } = inner; - - let vfile = match blob_writer.into_inner(|_| None).await { - Ok(vfile) => vfile, - Err(e) => { - error!(err=%e, "failed to remove delta layer writer file"); - drop(_gate_guard); - return; - } - }; - - if let Err(e) = std::fs::remove_file(vfile.path()) - .maybe_fatal_err("failed to remove the virtual file") - { - error!(err=%e, path=%vfile.path(), "failed to remove delta layer writer file"); - } - drop(_gate_guard); - }); + if let Some(inner) = self.inner.take() { + // We want to remove the virtual file here, so it's fine to not + // having completely flushed unwritten data. + let vfile = inner.blob_writer.into_inner_no_flush(); + vfile.remove(); + } } } @@ -818,7 +761,7 @@ impl DeltaLayer { where F: Fn(Summary) -> Summary, { - let file = VirtualFile::open_with_options_v2( + let mut file = VirtualFile::open_with_options( path, virtual_file::OpenOptions::new().read(true).write(true), ctx, @@ -835,8 +778,11 @@ impl DeltaLayer { let new_summary = rewrite(actual_summary); - let buf = new_summary.ser_into_page().context("serialize")?; - let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await; + let mut buf = Vec::with_capacity(PAGE_SZ); + // TODO: could use smallvec here, but it's a pain with Slice + Summary::ser_into(&new_summary, &mut buf).context("serialize")?; + file.seek(SeekFrom::Start(0)).await?; + let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; res?; Ok(()) } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 490b1ce664..7566e8c9fe 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -27,6 +27,7 @@ //! actual page images are stored in the "values" part. use std::collections::{HashMap, VecDeque}; use std::fs::File; +use std::io::SeekFrom; use std::ops::Range; use std::os::unix::prelude::FileExt; use std::str::FromStr; @@ -49,7 +50,6 @@ use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; use tracing::*; use utils::bin_ser::BeSer; -use utils::bin_ser::SerializeError; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; @@ -72,8 +72,7 @@ use crate::tenant::vectored_blob_io::{ VectoredReadPlanner, }; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; -use crate::virtual_file::owned_buffers_io::write::Buffer; -use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile}; +use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile}; use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; /// @@ -112,15 +111,6 @@ impl From<&ImageLayer> for Summary { } impl Summary { - /// Serializes the summary header into an aligned buffer of lenth `PAGE_SZ`. - pub fn ser_into_page(&self) -> Result { - let mut buf = IoBufferMut::with_capacity(PAGE_SZ); - Self::ser_into(self, &mut buf)?; - // Pad zeroes to the buffer so the length is a multiple of the alignment. - buf.extend_with(0, buf.capacity() - buf.len()); - Ok(buf.freeze()) - } - pub(super) fn expected( tenant_id: TenantId, timeline_id: TimelineId, @@ -363,7 +353,7 @@ impl ImageLayer { where F: Fn(Summary) -> Summary, { - let file = VirtualFile::open_with_options_v2( + let mut file = VirtualFile::open_with_options( path, virtual_file::OpenOptions::new().read(true).write(true), ctx, @@ -380,8 +370,11 @@ impl ImageLayer { let new_summary = rewrite(actual_summary); - let buf = new_summary.ser_into_page().context("serialize")?; - let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await; + let mut buf = Vec::with_capacity(PAGE_SZ); + // TODO: could use smallvec here but it's a pain with Slice + Summary::ser_into(&new_summary, &mut buf).context("serialize")?; + file.seek(SeekFrom::Start(0)).await?; + let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; res?; Ok(()) } @@ -749,11 +742,9 @@ struct ImageLayerWriterInner { // Number of keys in the layer. num_keys: usize, - blob_writer: BlobWriter, + blob_writer: BlobWriter, tree: DiskBtreeBuilder, - _gate_guard: utils::sync::gate::GateGuard, - #[cfg(feature = "testing")] last_written_key: Key, } @@ -785,28 +776,19 @@ impl ImageLayerWriterInner { }, ); trace!("creating image layer {}", path); - let file = { - Arc::new( - VirtualFile::open_with_options_v2( - &path, - virtual_file::OpenOptions::new() - .write(true) - .create_new(true), - ctx, - ) - .await?, + let mut file = { + VirtualFile::open_with_options( + &path, + virtual_file::OpenOptions::new() + .write(true) + .create_new(true), + ctx, ) + .await? }; - - // Start at `PAGE_SZ` to make room for the header block. - let blob_writer = BlobWriter::new( - file, - PAGE_SZ as u64, - gate, - cancel, - ctx, - info_span!(parent: None, "image_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path), - )?; + // make room for the header block + file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; + let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx); // Initialize the b-tree index builder let block_buf = BlockBuf::new(); @@ -827,8 +809,6 @@ impl ImageLayerWriterInner { num_keys: 0, #[cfg(feature = "testing")] last_written_key: Key::MIN, - - _gate_guard: gate.enter()?, }; Ok(writer) @@ -914,30 +894,15 @@ impl ImageLayerWriterInner { crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen); crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size); - let file = self - .blob_writer - .into_inner(|mut buf| { - let len = buf.pending(); - let cap = buf.cap(); - - // pad zeros to the next io alignment requirement. - let count = len.next_multiple_of(PAGE_SZ).min(cap) - len; - buf.extend_with(0, count); - - Some(buf) - }) - .await?; + let mut file = self.blob_writer.into_inner(); // Write out the index - let mut offset = index_start_blk as u64 * PAGE_SZ as u64; + file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64)) + .await?; let (index_root_blk, block_buf) = self.tree.finish()?; - - // TODO(yuchen): https://github.com/neondatabase/neon/issues/10092 - // Should we just replace BlockBuf::blocks with one big buffer? for buf in block_buf.blocks { - let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await; + let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; res?; - offset += PAGE_SZ as u64; } let final_key_range = if let Some(end_key) = end_key { @@ -958,9 +923,11 @@ impl ImageLayerWriterInner { index_root_blk, }; - // Writes summary at the first block (offset 0). - let buf = summary.ser_into_page()?; - let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await; + let mut buf = Vec::with_capacity(PAGE_SZ); + // TODO: could use smallvec here but it's a pain with Slice + Summary::ser_into(&summary, &mut buf)?; + file.seek(SeekFrom::Start(0)).await?; + let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; res?; let metadata = file @@ -1103,33 +1070,9 @@ impl ImageLayerWriter { impl Drop for ImageLayerWriter { fn drop(&mut self) { - let Some(inner) = self.inner.take() else { - return; - }; - - tokio::spawn(async move { - let ImageLayerWriterInner { - blob_writer, - _gate_guard, - .. - } = inner; - - let vfile = match blob_writer.into_inner(|_| None).await { - Ok(vfile) => vfile, - Err(e) => { - error!(err=%e, "failed to remove image layer writer file"); - drop(_gate_guard); - return; - } - }; - - if let Err(e) = std::fs::remove_file(vfile.path()) - .maybe_fatal_err("failed to remove the virtual file") - { - error!(err=%e, path=%vfile.path(), "failed to remove image layer writer file"); - } - drop(_gate_guard); - }); + if let Some(inner) = self.inner.take() { + inner.blob_writer.into_inner().remove(); + } } } diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index 8b30e70e41..166917d674 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -677,6 +677,7 @@ impl StreamingVectoredReadPlanner { #[cfg(test)] mod tests { + use anyhow::Error; use super::super::blob_io::tests::{random_array, write_maybe_compressed}; use super::*; @@ -959,16 +960,13 @@ mod tests { } } - async fn round_trip_test_compressed( - blobs: &[Vec], - compression: bool, - ) -> anyhow::Result<()> { + async fn round_trip_test_compressed(blobs: &[Vec], compression: bool) -> Result<(), Error> { let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test(); let (_temp_dir, pathbuf, offsets) = - write_maybe_compressed(blobs, compression, &ctx).await?; + write_maybe_compressed::(blobs, compression, &ctx).await?; - let file = VirtualFile::open_v2(&pathbuf, &ctx).await?; + let file = VirtualFile::open(&pathbuf, &ctx).await?; let file_len = std::fs::metadata(&pathbuf)?.len(); // Multiply by two (compressed data might need more space), and add a few bytes for the header @@ -1005,7 +1003,7 @@ mod tests { } #[tokio::test] - async fn test_really_big_array() -> anyhow::Result<()> { + async fn test_really_big_array() -> Result<(), Error> { let blobs = &[ b"test".to_vec(), random_array(10 * PAGE_SZ), @@ -1020,7 +1018,7 @@ mod tests { } #[tokio::test] - async fn test_arrays_inc() -> anyhow::Result<()> { + async fn test_arrays_inc() -> Result<(), Error> { let blobs = (0..PAGE_SZ / 8) .map(|v| random_array(v * 16)) .collect::>(); diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 63c1ccd4c8..cd3d897423 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -12,7 +12,7 @@ //! src/backend/storage/file/fd.c //! use std::fs::File; -use std::io::{Error, ErrorKind}; +use std::io::{Error, ErrorKind, Seek, SeekFrom}; use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; #[cfg(target_os = "linux")] use std::os::unix::fs::OpenOptionsExt; @@ -185,14 +185,18 @@ impl VirtualFile { self.inner.sync_data().await } - pub async fn set_len(&self, len: u64) -> Result<(), Error> { - self.inner.set_len(len).await - } - pub async fn metadata(&self) -> Result { self.inner.metadata().await } + pub fn remove(self) { + self.inner.remove(); + } + + pub async fn seek(&mut self, pos: SeekFrom) -> Result { + self.inner.seek(pos).await + } + pub async fn read_exact_at( &self, slice: Slice, @@ -223,31 +227,25 @@ impl VirtualFile { self.inner.write_all_at(buf, offset, ctx).await } - pub(crate) async fn read_to_string>( - path: P, + pub async fn write_all( + &mut self, + buf: FullSlice, ctx: &RequestContext, - ) -> std::io::Result { - let file = VirtualFile::open(path, ctx).await?; // TODO: open_v2 + ) -> (FullSlice, Result) { + self.inner.write_all(buf, ctx).await + } + + async fn read_to_end(&mut self, buf: &mut Vec, ctx: &RequestContext) -> Result<(), Error> { + self.inner.read_to_end(buf, ctx).await + } + + pub(crate) async fn read_to_string( + &mut self, + ctx: &RequestContext, + ) -> Result { let mut buf = Vec::new(); - let mut tmp = vec![0; 128]; - let mut pos: u64 = 0; - loop { - let slice = tmp.slice(..128); - let (slice, res) = file.inner.read_at(slice, pos, ctx).await; - match res { - Ok(0) => break, - Ok(n) => { - pos += n as u64; - buf.extend_from_slice(&slice[..n]); - } - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return Err(e), - } - tmp = slice.into_inner(); - } - String::from_utf8(buf).map_err(|_| { - std::io::Error::new(ErrorKind::InvalidData, "file contents are not valid UTF-8") - }) + self.read_to_end(&mut buf, ctx).await?; + Ok(String::from_utf8(buf)?) } } @@ -294,6 +292,9 @@ pub struct VirtualFileInner { /// belongs to a different VirtualFile. handle: RwLock, + /// Current file position + pos: u64, + /// File path and options to use to open it. /// /// Note: this only contains the options needed to re-open it. For example, @@ -607,6 +608,7 @@ impl VirtualFileInner { let vfile = VirtualFileInner { handle: RwLock::new(handle), + pos: 0, path: path.to_owned(), open_options: reopen_options, }; @@ -673,13 +675,6 @@ impl VirtualFileInner { }) } - pub async fn set_len(&self, len: u64) -> Result<(), Error> { - with_file!(self, StorageIoOperation::SetLen, |file_guard| { - let (_file_guard, res) = io_engine::get().set_len(file_guard, len).await; - res.maybe_fatal_err("set_len") - }) - } - /// Helper function internal to `VirtualFile` that looks up the underlying File, /// opens it and evicts some other File if necessary. The passed parameter is /// assumed to be a function available for the physical `File`. @@ -747,6 +742,38 @@ impl VirtualFileInner { }) } + pub fn remove(self) { + let path = self.path.clone(); + drop(self); + std::fs::remove_file(path).expect("failed to remove the virtual file"); + } + + pub async fn seek(&mut self, pos: SeekFrom) -> Result { + match pos { + SeekFrom::Start(offset) => { + self.pos = offset; + } + SeekFrom::End(offset) => { + self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard + .with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))? + } + SeekFrom::Current(offset) => { + let pos = self.pos as i128 + offset as i128; + if pos < 0 { + return Err(Error::new( + ErrorKind::InvalidInput, + "offset would be negative", + )); + } + if pos > u64::MAX as i128 { + return Err(Error::new(ErrorKind::InvalidInput, "offset overflow")); + } + self.pos = pos as u64; + } + } + Ok(self.pos) + } + /// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`. /// /// The returned `Slice` is equivalent to the input `slice`, i.e., it's the same view into the same buffer. @@ -830,7 +857,59 @@ impl VirtualFileInner { (restore(buf), Ok(())) } - pub(super) async fn read_at( + /// Writes `buf` to the file at the current offset. + /// + /// Panics if there is an uninitialized range in `buf`, as that is most likely a bug in the caller. + pub async fn write_all( + &mut self, + buf: FullSlice, + ctx: &RequestContext, + ) -> (FullSlice, Result) { + let buf = buf.into_raw_slice(); + let bounds = buf.bounds(); + let restore = + |buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds)); + let nbytes = buf.len(); + let mut buf = buf; + while !buf.is_empty() { + let (tmp, res) = self.write(FullSlice::must_new(buf), ctx).await; + buf = tmp.into_raw_slice(); + match res { + Ok(0) => { + return ( + restore(buf), + Err(Error::new( + std::io::ErrorKind::WriteZero, + "failed to write whole buffer", + )), + ); + } + Ok(n) => { + buf = buf.slice(n..); + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return (restore(buf), Err(e)), + } + } + (restore(buf), Ok(nbytes)) + } + + async fn write( + &mut self, + buf: FullSlice, + ctx: &RequestContext, + ) -> (FullSlice, Result) { + let pos = self.pos; + let (buf, res) = self.write_at(buf, pos, ctx).await; + let n = match res { + Ok(n) => n, + Err(e) => return (buf, Err(e)), + }; + self.pos += n as u64; + (buf, Ok(n)) + } + + pub(crate) async fn read_at( &self, buf: tokio_epoll_uring::Slice, offset: u64, @@ -858,11 +937,23 @@ impl VirtualFileInner { }) } + /// The function aborts the process if the error is fatal. async fn write_at( &self, buf: FullSlice, offset: u64, ctx: &RequestContext, + ) -> (FullSlice, Result) { + let (slice, result) = self.write_at_inner(buf, offset, ctx).await; + let result = result.maybe_fatal_err("write_at"); + (slice, result) + } + + async fn write_at_inner( + &self, + buf: FullSlice, + offset: u64, + ctx: &RequestContext, ) -> (FullSlice, Result) { let file_guard = match self.lock_file().await { Ok(file_guard) => file_guard, @@ -871,13 +962,30 @@ impl VirtualFileInner { observe_duration!(StorageIoOperation::Write, { let ((_file_guard, buf), result) = io_engine::get().write_at(file_guard, offset, buf).await; - let result = result.maybe_fatal_err("write_at"); if let Ok(size) = result { ctx.io_size_metrics().write.add(size.into_u64()); } (buf, result) }) } + + async fn read_to_end(&mut self, buf: &mut Vec, ctx: &RequestContext) -> Result<(), Error> { + let mut tmp = vec![0; 128]; + loop { + let slice = tmp.slice(..128); + let (slice, res) = self.read_at(slice, self.pos, ctx).await; + match res { + Ok(0) => return Ok(()), + Ok(n) => { + self.pos += n as u64; + buf.extend_from_slice(&slice[..n]); + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + tmp = slice.into_inner(); + } + } } // Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 @@ -1092,6 +1200,19 @@ impl FileGuard { let _ = file.into_raw_fd(); res } + /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually. + fn with_std_file_mut(&mut self, with: F) -> R + where + F: FnOnce(&mut File) -> R, + { + // SAFETY: + // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`. + // - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd + let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) }; + let res = with(&mut file); + let _ = file.into_raw_fd(); + res + } } impl tokio_epoll_uring::IoFd for FileGuard { @@ -1259,6 +1380,7 @@ static SYNC_MODE: AtomicU8 = AtomicU8::new(SyncMode::Sync as u8); #[cfg(test)] mod tests { + use std::io::Write; use std::os::unix::fs::FileExt; use std::sync::Arc; @@ -1311,6 +1433,43 @@ mod tests { MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset), } } + async fn seek(&mut self, pos: SeekFrom) -> Result { + match self { + MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await, + MaybeVirtualFile::File(file) => file.seek(pos), + } + } + async fn write_all( + &mut self, + buf: FullSlice, + ctx: &RequestContext, + ) -> Result<(), Error> { + match self { + MaybeVirtualFile::VirtualFile(file) => { + let (_buf, res) = file.write_all(buf, ctx).await; + res.map(|_| ()) + } + MaybeVirtualFile::File(file) => file.write_all(&buf[..]), + } + } + + // Helper function to slurp contents of a file, starting at the current position, + // into a string + async fn read_string(&mut self, ctx: &RequestContext) -> Result { + use std::io::Read; + let mut buf = String::new(); + match self { + MaybeVirtualFile::VirtualFile(file) => { + let mut buf = Vec::new(); + file.read_to_end(&mut buf, ctx).await?; + return Ok(String::from_utf8(buf).unwrap()); + } + MaybeVirtualFile::File(file) => { + file.read_to_string(&mut buf)?; + } + } + Ok(buf) + } // Helper function to slurp a portion of a file into a string async fn read_string_at( @@ -1406,23 +1565,48 @@ mod tests { .await?; file_a - .write_all_at(IoBuffer::from(b"foobar").slice_len(), 0, &ctx) + .write_all(b"foobar".to_vec().slice_len(), &ctx) .await?; // cannot read from a file opened in write-only mode - let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err(); + let _ = file_a.read_string(&ctx).await.unwrap_err(); // Close the file and re-open for reading let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?; // cannot write to a file opened in read-only mode let _ = file_a - .write_all_at(IoBuffer::from(b"bar").slice_len(), 0, &ctx) + .write_all(b"bar".to_vec().slice_len(), &ctx) .await .unwrap_err(); // Try simple read - assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?); + assert_eq!("foobar", file_a.read_string(&ctx).await?); + + // It's positioned at the EOF now. + assert_eq!("", file_a.read_string(&ctx).await?); + + // Test seeks. + assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1); + assert_eq!("oobar", file_a.read_string(&ctx).await?); + + assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4); + assert_eq!("ar", file_a.read_string(&ctx).await?); + + assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1); + assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3); + assert_eq!("bar", file_a.read_string(&ctx).await?); + + assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1); + assert_eq!("oobar", file_a.read_string(&ctx).await?); + + // Test erroneous seeks to before byte 0 + file_a.seek(SeekFrom::End(-7)).await.unwrap_err(); + assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1); + file_a.seek(SeekFrom::Current(-2)).await.unwrap_err(); + + // the erroneous seek should have left the position unchanged + assert_eq!("oobar", file_a.read_string(&ctx).await?); // Create another test file, and try FileExt functions on it. let path_b = testdir.join("file_b"); @@ -1448,6 +1632,9 @@ mod tests { // Open a lot of files, enough to cause some evictions. (Or to be precise, // open the same file many times. The effect is the same.) + // + // leave file_a positioned at offset 1 before we start + assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1); let mut vfiles = Vec::new(); for _ in 0..100 { @@ -1457,7 +1644,7 @@ mod tests { &ctx, ) .await?; - assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?); + assert_eq!("FOOBAR", vfile.read_string(&ctx).await?); vfiles.push(vfile); } @@ -1465,8 +1652,8 @@ mod tests { assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2); // The underlying file descriptor for 'file_a' should be closed now. Try to read - // from it again. - assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?); + // from it again. We left the file positioned at offset 1 above. + assert_eq!("oobar", file_a.read_string(&ctx).await?); // Check that all the other FDs still work too. Use them in random order for // good measure. @@ -1560,7 +1747,7 @@ mod tests { .await .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap()); - let post = file.read_string_at(0, 3, &ctx).await.unwrap(); + let post = file.read_string(&ctx).await.unwrap(); assert_eq!(post, "foo"); assert!(!tmp_path.exists()); drop(file); @@ -1569,7 +1756,7 @@ mod tests { .await .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap()); - let post = file.read_string_at(0, 3, &ctx).await.unwrap(); + let post = file.read_string(&ctx).await.unwrap(); assert_eq!(post, "bar"); assert!(!tmp_path.exists()); drop(file); @@ -1594,7 +1781,7 @@ mod tests { .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap()); - let post = file.read_string_at(0, 3, &ctx).await.unwrap(); + let post = file.read_string(&ctx).await.unwrap(); assert_eq!(post, "foo"); assert!(!tmp_path.exists()); drop(file); diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs index b7be243357..758dd6e377 100644 --- a/pageserver/src/virtual_file/io_engine.rs +++ b/pageserver/src/virtual_file/io_engine.rs @@ -209,22 +209,6 @@ impl IoEngine { } } } - - pub(super) async fn set_len( - &self, - file_guard: FileGuard, - len: u64, - ) -> (FileGuard, std::io::Result<()>) { - match self { - IoEngine::NotSet => panic!("not initialized"), - // TODO: ftruncate op for tokio-epoll-uring - IoEngine::StdFs | IoEngine::TokioEpollUring => { - let res = file_guard.with_std_file(|std_file| std_file.set_len(len)); - (file_guard, res) - } - } - } - pub(super) async fn write_at( &self, file_guard: FileGuard, diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs index 07f949b89e..3ee1a3c162 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs @@ -282,17 +282,6 @@ unsafe impl tokio_epoll_uring::IoBufMut for AlignedBufferMut { } } -impl std::io::Write for AlignedBufferMut { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.extend_from_slice(buf); - Ok(buf.len()) - } - - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } -} - #[cfg(test)] mod tests { diff --git a/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs b/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs index 1aa2c3027d..4ea6b17744 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs @@ -1,11 +1,9 @@ use tokio_epoll_uring::{IoBuf, IoBufMut}; -use crate::virtual_file::{self, IoBuffer, IoBufferMut, PageWriteGuardBuf}; +use crate::virtual_file::{IoBuffer, IoBufferMut, PageWriteGuardBuf}; /// A marker trait for a mutable aligned buffer type. -pub trait IoBufAlignedMut: IoBufMut { - const ALIGN: usize = virtual_file::get_io_buffer_alignment(); -} +pub trait IoBufAlignedMut: IoBufMut {} /// A marker trait for an aligned buffer type. pub trait IoBufAligned: IoBuf {} diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index a87b9d175b..f3ab2c285a 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -1,7 +1,6 @@ mod flush; use std::sync::Arc; -use bytes::BufMut; pub(crate) use flush::FlushControl; use flush::FlushHandle; pub(crate) use flush::FlushTaskError; @@ -9,7 +8,6 @@ use tokio_epoll_uring::IoBuf; use tokio_util::sync::CancellationToken; use super::io_buf_aligned::IoBufAligned; -use super::io_buf_aligned::IoBufAlignedMut; use super::io_buf_ext::{FullSlice, IoBufExt}; use crate::context::RequestContext; use crate::virtual_file::{IoBuffer, IoBufferMut}; @@ -66,7 +64,7 @@ pub struct BufferedWriter { impl BufferedWriter where - B: IoBufAlignedMut + Buffer + Send + 'static, + B: Buffer + Send + 'static, Buf: IoBufAligned + Send + Sync + CheapCloneForRead, W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug, { @@ -75,7 +73,6 @@ where /// The `buf_new` function provides a way to initialize the owned buffers used by this writer. pub fn new( writer: Arc, - start_offset: u64, buf_new: impl Fn() -> B, gate_guard: utils::sync::gate::GateGuard, cancel: CancellationToken, @@ -94,7 +91,7 @@ where ctx.attached_child(), flush_task_span, ), - bytes_submitted: start_offset, + bytes_submitted: 0, } } @@ -119,29 +116,21 @@ where } #[cfg_attr(target_os = "macos", allow(dead_code))] - pub async fn shutdown( - self, - mut handle_tail: impl FnMut(B) -> Option, - ) -> Result<(u64, W), FlushTaskError> { + pub async fn flush_and_into_inner( + mut self, + ctx: &RequestContext, + ) -> Result<(u64, Arc), FlushTaskError> { + self.flush(ctx).await?; + let Self { - mutable: tail, + mutable: buf, maybe_flushed: _, writer, mut flush_handle, - bytes_submitted: submit_offset, + bytes_submitted: bytes_amount, } = self; - - let ctx = flush_handle.shutdown().await?; - let buf = tail.expect("must not use after an error"); - let writer = Arc::into_inner(writer).expect("writer is the only strong reference"); - let mut bytes_amount = submit_offset; - if let Some(buf) = handle_tail(buf) { - bytes_amount += buf.pending() as u64; - // TODO: infinite retries + maybe_fatal_err like we do in the flush loop; can we just send this - // as work into the flush loop, as part of flush_handle.shutdown() - let (_, res) = writer.write_all_at(buf.flush(), submit_offset, &ctx).await; - let _: () = res.unwrap(); // DO NOT MERGE, THIS CAN FAIL E.G. on ENOSPC - } + flush_handle.shutdown().await?; + assert!(buf.is_some()); Ok((bytes_amount, writer)) } @@ -246,10 +235,6 @@ pub trait Buffer { /// panics if `other.len() > self.cap() - self.pending()`. fn extend_from_slice(&mut self, other: &[u8]); - /// Add `count` bytes `val` into `self`. - /// Panics if `count > self.cap() - self.pending()`. - fn extend_with(&mut self, val: u8, count: usize); - /// Number of bytes in the buffer. fn pending(&self) -> usize; @@ -277,14 +262,6 @@ impl Buffer for IoBufferMut { IoBufferMut::extend_from_slice(self, other); } - fn extend_with(&mut self, val: u8, count: usize) { - if self.len() + count > self.cap() { - panic!("Buffer capacity exceeded"); - } - - IoBufferMut::put_bytes(self, val, count); - } - fn pending(&self) -> usize { self.len() } @@ -357,7 +334,6 @@ mod tests { let cancel = CancellationToken::new(); let mut writer = BufferedWriter::<_, RecorderWriter>::new( recorder, - 0, || IoBufferMut::with_capacity(2), gate.enter()?, cancel, @@ -374,7 +350,7 @@ mod tests { writer.write_buffered_borrowed(b"j", ctx).await?; writer.write_buffered_borrowed(b"klmno", ctx).await?; - let (_, recorder) = writer.shutdown(Some).await?; + let (_, recorder) = writer.flush_and_into_inner(ctx).await?; assert_eq!( recorder.get_writes(), { diff --git a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs index c417776139..c076ba0eca 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs @@ -1,5 +1,5 @@ use std::ops::ControlFlow; -use std::{marker::PhantomData, sync::Arc}; +use std::sync::Arc; use tokio_util::sync::CancellationToken; use tracing::{Instrument, info, info_span, warn}; @@ -21,10 +21,7 @@ pub struct FlushHandleInner { /// and receives recyled buffer. channel: duplex::mpsc::Duplex, FullSlice>, /// Join handle for the background flush task. - join_handle: tokio::task::JoinHandle>, - - // TODO: get rit of the type parameter? - _phantom: PhantomData, + join_handle: tokio::task::JoinHandle, FlushTaskError>>, } struct FlushRequest { @@ -147,7 +144,6 @@ where inner: Some(FlushHandleInner { channel: front, join_handle, - _phantom: PhantomData, }), } } @@ -183,13 +179,12 @@ where Err(self .shutdown() .await - .err() - .expect("flush task only disconnects duplex if it exits with an error")) + .expect_err("flush task only disconnects duplex if it exits with an error")) } /// Cleans up the channel, join the flush task. - pub async fn shutdown(&mut self) -> Result { - let handle: FlushHandleInner = self + pub async fn shutdown(&mut self) -> Result, FlushTaskError> { + let handle = self .inner .take() .expect("must not use after we returned an error"); @@ -248,7 +243,7 @@ where } /// Runs the background flush task. - async fn run(mut self) -> Result { + async fn run(mut self) -> Result, FlushTaskError> { // Exit condition: channel is closed and there is no remaining buffer to be flushed while let Some(request) = self.channel.recv().await { #[cfg(test)] @@ -318,7 +313,8 @@ where continue; } } - Ok(self.ctx) + + Ok(self.writer) } } @@ -353,7 +349,7 @@ impl FlushNotStarted { impl FlushInProgress { /// Waits until background flush is done. pub async fn wait_until_flush_is_done(self) -> FlushDone { - let _ = self.done_flush_rx.await; + self.done_flush_rx.await.unwrap(); FlushDone } }