From c6209b4a39f7af96d0eb9f87b64a88a282996be4 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 11 Apr 2025 16:57:36 +0200 Subject: [PATCH] Revert "undo all changes except gate,cancel,context propagation" This reverts commit f25f71bc987b7b3f2835610d9690f7d6172e8b93. --- pageserver/src/metrics.rs | 2 + pageserver/src/tenant/blob_io.rs | 291 +++++++----------- pageserver/src/tenant/block_io.rs | 10 +- pageserver/src/tenant/disk_btree.rs | 13 +- pageserver/src/tenant/ephemeral_file.rs | 1 + .../tenant/remote_timeline_client/download.rs | 37 ++- pageserver/src/tenant/secondary/downloader.rs | 3 +- .../src/tenant/storage_layer/delta_layer.rs | 110 +++++-- .../src/tenant/storage_layer/image_layer.rs | 121 ++++++-- pageserver/src/tenant/vectored_blob_io.rs | 14 +- pageserver/src/virtual_file.rs | 281 +++-------------- pageserver/src/virtual_file/io_engine.rs | 16 + .../aligned_buffer/buffer_mut.rs | 11 + .../owned_buffers_io/io_buf_aligned.rs | 6 +- .../virtual_file/owned_buffers_io/write.rs | 50 ++- .../owned_buffers_io/write/flush.rs | 22 +- 16 files changed, 468 insertions(+), 520 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 1fe51021fd..2e81123384 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1289,6 +1289,7 @@ pub(crate) enum StorageIoOperation { Seek, Fsync, Metadata, + SetLen, } impl StorageIoOperation { @@ -1303,6 +1304,7 @@ impl StorageIoOperation { StorageIoOperation::Seek => "seek", StorageIoOperation::Fsync => "fsync", StorageIoOperation::Metadata => "metadata", + StorageIoOperation::SetLen => "set_len", } } } diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index abeaa166a4..0e0c5a99e0 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -15,21 +15,23 @@ //! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX //! use std::cmp::min; -use std::io::Error; +use std::sync::Arc; use async_compression::Level; use bytes::{BufMut, BytesMut}; use pageserver_api::models::ImageCompressionAlgorithm; use tokio::io::AsyncWriteExt; -use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; +use tokio_epoll_uring::IoBuf; use tokio_util::sync::CancellationToken; use tracing::warn; use crate::context::RequestContext; use crate::page_cache::PAGE_SZ; use crate::tenant::block_io::BlockCursor; +use crate::virtual_file::IoBufferMut; use crate::virtual_file::VirtualFile; use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; +use crate::virtual_file::owned_buffers_io::write::{BufferedWriter, FlushTaskError}; #[derive(Copy, Clone, Debug)] pub struct CompressionInfo { @@ -37,6 +39,14 @@ pub struct CompressionInfo { pub compressed_size: Option, } +#[derive(Debug, thiserror::Error)] +pub enum WriteBlobError { + #[error(transparent)] + Flush(FlushTaskError), + #[error("blob too large ({len} bytes)")] + BlobTooLarge { len: usize }, +} + impl BlockCursor<'_> { /// Read a blob into a new buffer. pub async fn read_blob( @@ -158,141 +168,62 @@ pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10; /// A wrapper of `VirtualFile` that allows users to write blobs. /// /// If a `BlobWriter` is dropped, the internal buffer will be -/// discarded. You need to call [`flush_buffer`](Self::flush_buffer) +/// discarded. You need to call [`Self::into_inner`] /// manually before dropping. -pub struct BlobWriter { - inner: VirtualFile, - offset: u64, - /// A buffer to save on write calls, only used if BUFFERED=true - buf: Vec, +pub struct BlobWriter { /// We do tiny writes for the length headers; they need to be in an owned buffer; io_buf: Option, + writer: BufferedWriter, + offset: u64, } -impl BlobWriter { +impl BlobWriter { pub fn new( - inner: VirtualFile, + file: Arc, start_offset: u64, - _gate: &utils::sync::gate::Gate, - _cancel: CancellationToken, - _ctx: &RequestContext, - ) -> Self { - Self { - inner, - offset: start_offset, - buf: Vec::with_capacity(Self::CAPACITY), + gate: &utils::sync::gate::Gate, + cancel: CancellationToken, + ctx: &RequestContext, + flush_task_span: tracing::Span, + ) -> anyhow::Result { + Ok(Self { io_buf: Some(BytesMut::new()), - } + writer: BufferedWriter::new( + file, + start_offset, + || IoBufferMut::with_capacity(Self::CAPACITY), + gate.enter()?, + cancel, + ctx, + flush_task_span, + ), + offset: start_offset, + }) } pub fn size(&self) -> u64 { self.offset } - const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 }; + const CAPACITY: usize = 64 * 1024; - /// Writes the given buffer directly to the underlying `VirtualFile`. - /// You need to make sure that the internal buffer is empty, otherwise - /// data will be written in wrong order. - #[inline(always)] - async fn write_all_unbuffered( - &mut self, - src_buf: FullSlice, - ctx: &RequestContext, - ) -> (FullSlice, Result<(), Error>) { - let (src_buf, res) = self.inner.write_all(src_buf, ctx).await; - let nbytes = match res { - Ok(nbytes) => nbytes, - Err(e) => return (src_buf, Err(e)), - }; - self.offset += nbytes as u64; - (src_buf, Ok(())) - } - - #[inline(always)] - /// Flushes the internal buffer to the underlying `VirtualFile`. - pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> { - let buf = std::mem::take(&mut self.buf); - let (slice, res) = self.inner.write_all(buf.slice_len(), ctx).await; - res?; - let mut buf = slice.into_raw_slice().into_inner(); - buf.clear(); - self.buf = buf; - Ok(()) - } - - #[inline(always)] - /// Writes as much of `src_buf` into the internal buffer as it fits - fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize { - let remaining = Self::CAPACITY - self.buf.len(); - let to_copy = src_buf.len().min(remaining); - self.buf.extend_from_slice(&src_buf[..to_copy]); - self.offset += to_copy as u64; - to_copy - } - - /// Internal, possibly buffered, write function + /// Writes `src_buf` to the file at the current offset. async fn write_all( &mut self, src_buf: FullSlice, ctx: &RequestContext, - ) -> (FullSlice, Result<(), Error>) { - let src_buf = src_buf.into_raw_slice(); - let src_buf_bounds = src_buf.bounds(); - let restore = move |src_buf_slice: Slice<_>| { - FullSlice::must_new(Slice::from_buf_bounds( - src_buf_slice.into_inner(), - src_buf_bounds, - )) - }; + ) -> (FullSlice, Result<(), FlushTaskError>) { + let res = self + .writer + // TODO: why are we taking a FullSlice if we're going to pass a borrow downstack? + // Can remove all the complexity around owned buffers upstack + .write_buffered_borrowed(&src_buf, ctx) + .await + .map(|len| { + self.offset += len as u64; + }); - if !BUFFERED { - assert!(self.buf.is_empty()); - return self - .write_all_unbuffered(FullSlice::must_new(src_buf), ctx) - .await; - } - let remaining = Self::CAPACITY - self.buf.len(); - let src_buf_len = src_buf.bytes_init(); - if src_buf_len == 0 { - return (restore(src_buf), Ok(())); - } - let mut src_buf = src_buf.slice(0..src_buf_len); - // First try to copy as much as we can into the buffer - if remaining > 0 { - let copied = self.write_into_buffer(&src_buf); - src_buf = src_buf.slice(copied..); - } - // Then, if the buffer is full, flush it out - if self.buf.len() == Self::CAPACITY { - if let Err(e) = self.flush_buffer(ctx).await { - return (restore(src_buf), Err(e)); - } - } - // Finally, write the tail of src_buf: - // If it wholly fits into the buffer without - // completely filling it, then put it there. - // If not, write it out directly. - let src_buf = if !src_buf.is_empty() { - assert_eq!(self.buf.len(), 0); - if src_buf.len() < Self::CAPACITY { - let copied = self.write_into_buffer(&src_buf); - // We just verified above that src_buf fits into our internal buffer. - assert_eq!(copied, src_buf.len()); - restore(src_buf) - } else { - let (src_buf, res) = self - .write_all_unbuffered(FullSlice::must_new(src_buf), ctx) - .await; - if let Err(e) = res { - return (src_buf, Err(e)); - } - src_buf - } - } else { - restore(src_buf) - }; - (src_buf, Ok(())) + (src_buf, res) } /// Write a blob of data. Returns the offset that it was written to, @@ -301,7 +232,7 @@ impl BlobWriter { &mut self, srcbuf: FullSlice, ctx: &RequestContext, - ) -> (FullSlice, Result) { + ) -> (FullSlice, Result) { let (buf, res) = self .write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled) .await; @@ -315,7 +246,10 @@ impl BlobWriter { srcbuf: FullSlice, ctx: &RequestContext, algorithm: ImageCompressionAlgorithm, - ) -> (FullSlice, Result<(u64, CompressionInfo), Error>) { + ) -> ( + FullSlice, + Result<(u64, CompressionInfo), WriteBlobError>, + ) { let offset = self.offset; let mut compression_info = CompressionInfo { written_compressed: false, @@ -331,14 +265,16 @@ impl BlobWriter { if len < 128 { // Short blob. Write a 1-byte length header io_buf.put_u8(len as u8); - (self.write_all(io_buf.slice_len(), ctx).await, srcbuf) + let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await; + let res = res.map_err(WriteBlobError::Flush); + ((slice, res), srcbuf) } else { // Write a 4-byte length header if len > MAX_SUPPORTED_BLOB_LEN { return ( ( io_buf.slice_len(), - Err(Error::other(format!("blob too large ({len} bytes)"))), + Err(WriteBlobError::BlobTooLarge { len }), ), srcbuf, ); @@ -372,7 +308,9 @@ impl BlobWriter { assert_eq!(len_buf[0] & 0xf0, 0); len_buf[0] |= high_bit_mask; io_buf.extend_from_slice(&len_buf[..]); - (self.write_all(io_buf.slice_len(), ctx).await, srcbuf) + let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await; + let res = res.map_err(WriteBlobError::Flush); + ((slice, res), srcbuf) } } .await; @@ -387,33 +325,23 @@ impl BlobWriter { } else { self.write_all(srcbuf, ctx).await }; + let res = res.map_err(WriteBlobError::Flush); (srcbuf, res.map(|_| (offset, compression_info))) } -} -impl BlobWriter { /// Access the underlying `VirtualFile`. /// /// This function flushes the internal buffer before giving access /// to the underlying `VirtualFile`. - pub async fn into_inner(mut self, ctx: &RequestContext) -> Result { - self.flush_buffer(ctx).await?; - Ok(self.inner) - } - - /// Access the underlying `VirtualFile`. /// - /// Unlike [`into_inner`](Self::into_inner), this doesn't flush - /// the internal buffer before giving access. - pub fn into_inner_no_flush(self) -> VirtualFile { - self.inner - } -} - -impl BlobWriter { - /// Access the underlying `VirtualFile`. - pub fn into_inner(self) -> VirtualFile { - self.inner + /// The caller can use the `handle_tail` function to change the tail of the buffer before flushing it to disk. + /// The buffer will not be flushed to disk if handle_tail returns `None`. + pub async fn into_inner( + self, + handle_tail: impl FnMut(IoBufferMut) -> Option, + ) -> Result { + let (_, file) = self.writer.shutdown(handle_tail).await?; + Ok(file) } } @@ -422,21 +350,22 @@ pub(crate) mod tests { use camino::Utf8PathBuf; use camino_tempfile::Utf8TempDir; use rand::{Rng, SeedableRng}; + use tracing::info_span; use super::*; use crate::context::DownloadBehavior; use crate::task_mgr::TaskKind; use crate::tenant::block_io::BlockReaderRef; - async fn round_trip_test(blobs: &[Vec]) -> Result<(), Error> { - round_trip_test_compressed::(blobs, false).await + async fn round_trip_test(blobs: &[Vec]) -> anyhow::Result<()> { + round_trip_test_compressed(blobs, false).await } - pub(crate) async fn write_maybe_compressed( + pub(crate) async fn write_maybe_compressed( blobs: &[Vec], compression: bool, ctx: &RequestContext, - ) -> Result<(Utf8TempDir, Utf8PathBuf, Vec), Error> { + ) -> anyhow::Result<(Utf8TempDir, Utf8PathBuf, Vec)> { let temp_dir = camino_tempfile::tempdir()?; let pathbuf = temp_dir.path().join("file"); let gate = utils::sync::gate::Gate::default(); @@ -445,8 +374,9 @@ pub(crate) mod tests { // Write part (in block to drop the file) let mut offsets = Vec::new(); { - let file = VirtualFile::create(pathbuf.as_path(), ctx).await?; - let mut wtr = BlobWriter::::new(file, 0, &gate, cancel.clone(), ctx); + let file = Arc::new(VirtualFile::create_v2(pathbuf.as_path(), ctx).await?); + let mut wtr = + BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test")).unwrap(); for blob in blobs.iter() { let (_, res) = if compression { let res = wtr @@ -463,26 +393,37 @@ pub(crate) mod tests { let offs = res?; offsets.push(offs); } - // Write out one page worth of zeros so that we can - // read again with read_blk - let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await; - let offs = res?; - println!("Writing final blob at offs={offs}"); - wtr.flush_buffer(ctx).await?; + wtr.into_inner(|mut buf| { + use crate::virtual_file::owned_buffers_io::write::Buffer; + + let len = buf.pending(); + let cap = buf.cap(); + + // pad zeros to the next io alignment requirement. + // TODO: this is actually padding to next PAGE_SZ multiple, but only if the buffer capacity is larger than that. + // We can't let the fact that we do direct IO, or the buffer capacity, dictate the on-disk format we write here. + // Need to find a better API that allows writing the format we intend to. + let count = len.next_multiple_of(PAGE_SZ).min(cap) - len; + buf.extend_with(0, count); + + Some(buf) + }) + .await?; // TODO: this here is the problem with the tests: we're dropping the tail end } Ok((temp_dir, pathbuf, offsets)) } - async fn round_trip_test_compressed( + async fn round_trip_test_compressed( blobs: &[Vec], compression: bool, - ) -> Result<(), Error> { + ) -> anyhow::Result<()> { let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test(); let (_temp_dir, pathbuf, offsets) = - write_maybe_compressed::(blobs, compression, &ctx).await?; + write_maybe_compressed(blobs, compression, &ctx).await?; - let file = VirtualFile::open(pathbuf, &ctx).await?; + println!("Done writing!"); + let file = VirtualFile::open_v2(pathbuf, &ctx).await?; let rdr = BlockReaderRef::VirtualFile(&file); let rdr = BlockCursor::new_with_compression(rdr, compression); for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() { @@ -501,30 +442,27 @@ pub(crate) mod tests { } #[tokio::test] - async fn test_one() -> Result<(), Error> { + async fn test_one() -> anyhow::Result<()> { let blobs = &[vec![12, 21, 22]]; - round_trip_test::(blobs).await?; - round_trip_test::(blobs).await?; + round_trip_test(blobs).await?; Ok(()) } #[tokio::test] - async fn test_hello_simple() -> Result<(), Error> { + async fn test_hello_simple() -> anyhow::Result<()> { let blobs = &[ vec![0, 1, 2, 3], b"Hello, World!".to_vec(), Vec::new(), b"foobar".to_vec(), ]; - round_trip_test::(blobs).await?; - round_trip_test::(blobs).await?; - round_trip_test_compressed::(blobs, true).await?; - round_trip_test_compressed::(blobs, true).await?; + round_trip_test(blobs).await?; + round_trip_test_compressed(blobs, true).await?; Ok(()) } #[tokio::test] - async fn test_really_big_array() -> Result<(), Error> { + async fn test_really_big_array() -> anyhow::Result<()> { let blobs = &[ b"test".to_vec(), random_array(10 * PAGE_SZ), @@ -533,25 +471,22 @@ pub(crate) mod tests { vec![0xf3; 24 * PAGE_SZ], b"foobar".to_vec(), ]; - round_trip_test::(blobs).await?; - round_trip_test::(blobs).await?; - round_trip_test_compressed::(blobs, true).await?; - round_trip_test_compressed::(blobs, true).await?; + round_trip_test(blobs).await?; + round_trip_test_compressed(blobs, true).await?; Ok(()) } #[tokio::test] - async fn test_arrays_inc() -> Result<(), Error> { + async fn test_arrays_inc() -> anyhow::Result<()> { let blobs = (0..PAGE_SZ / 8) .map(|v| random_array(v * 16)) .collect::>(); - round_trip_test::(&blobs).await?; - round_trip_test::(&blobs).await?; + round_trip_test(&blobs).await?; Ok(()) } #[tokio::test] - async fn test_arrays_random_size() -> Result<(), Error> { + async fn test_arrays_random_size() -> anyhow::Result<()> { let mut rng = rand::rngs::StdRng::seed_from_u64(42); let blobs = (0..1024) .map(|_| { @@ -563,20 +498,18 @@ pub(crate) mod tests { random_array(sz.into()) }) .collect::>(); - round_trip_test::(&blobs).await?; - round_trip_test::(&blobs).await?; + round_trip_test(&blobs).await?; Ok(()) } #[tokio::test] - async fn test_arrays_page_boundary() -> Result<(), Error> { + async fn test_arrays_page_boundary() -> anyhow::Result<()> { let blobs = &[ random_array(PAGE_SZ - 4), random_array(PAGE_SZ - 4), random_array(PAGE_SZ - 4), ]; - round_trip_test::(blobs).await?; - round_trip_test::(blobs).await?; + round_trip_test(blobs).await?; Ok(()) } } diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 6723155626..686cc94126 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -4,14 +4,12 @@ use std::ops::Deref; -use bytes::Bytes; - use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner}; use crate::context::RequestContext; use crate::page_cache::{self, FileId, PAGE_SZ, PageReadGuard, PageWriteGuard, ReadBufResult}; #[cfg(test)] use crate::virtual_file::IoBufferMut; -use crate::virtual_file::VirtualFile; +use crate::virtual_file::{IoBuffer, VirtualFile}; /// This is implemented by anything that can read 8 kB (PAGE_SZ) /// blocks, using the page cache @@ -247,17 +245,17 @@ pub trait BlockWriter { /// 'buf' must be of size PAGE_SZ. Returns the block number the page was /// written to. /// - fn write_blk(&mut self, buf: Bytes) -> Result; + fn write_blk(&mut self, buf: IoBuffer) -> Result; } /// /// A simple in-memory buffer of blocks. /// pub struct BlockBuf { - pub blocks: Vec, + pub blocks: Vec, } impl BlockWriter for BlockBuf { - fn write_blk(&mut self, buf: Bytes) -> Result { + fn write_blk(&mut self, buf: IoBuffer) -> Result { assert!(buf.len() == PAGE_SZ); let blknum = self.blocks.len(); self.blocks.push(buf); diff --git a/pageserver/src/tenant/disk_btree.rs b/pageserver/src/tenant/disk_btree.rs index 1791e5996c..419befa41b 100644 --- a/pageserver/src/tenant/disk_btree.rs +++ b/pageserver/src/tenant/disk_btree.rs @@ -25,7 +25,7 @@ use std::{io, result}; use async_stream::try_stream; use byteorder::{BE, ReadBytesExt}; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::BufMut; use either::Either; use futures::{Stream, StreamExt}; use hex; @@ -34,6 +34,7 @@ use tracing::error; use crate::context::RequestContext; use crate::tenant::block_io::{BlockReader, BlockWriter}; +use crate::virtual_file::{IoBuffer, IoBufferMut, owned_buffers_io::write::Buffer}; // The maximum size of a value stored in the B-tree. 5 bytes is enough currently. pub const VALUE_SZ: usize = 5; @@ -787,12 +788,12 @@ impl BuildNode { /// /// Serialize the node to on-disk format. /// - fn pack(&self) -> Bytes { + fn pack(&self) -> IoBuffer { assert!(self.keys.len() == self.num_children as usize * self.suffix_len); assert!(self.values.len() == self.num_children as usize * VALUE_SZ); assert!(self.num_children > 0); - let mut buf = BytesMut::new(); + let mut buf = IoBufferMut::with_capacity(PAGE_SZ); buf.put_u16(self.num_children); buf.put_u8(self.level); @@ -805,7 +806,7 @@ impl BuildNode { assert!(buf.len() == self.size); assert!(buf.len() <= PAGE_SZ); - buf.resize(PAGE_SZ, 0); + buf.extend_with(0, PAGE_SZ - buf.len()); buf.freeze() } @@ -839,7 +840,7 @@ pub(crate) mod tests { #[derive(Clone, Default)] pub(crate) struct TestDisk { - blocks: Vec, + blocks: Vec, } impl TestDisk { fn new() -> Self { @@ -857,7 +858,7 @@ pub(crate) mod tests { } } impl BlockWriter for &mut TestDisk { - fn write_blk(&mut self, buf: Bytes) -> io::Result { + fn write_blk(&mut self, buf: IoBuffer) -> io::Result { let blknum = self.blocks.len(); self.blocks.push(buf); Ok(blknum as u32) diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 19215bb918..19b5165d0f 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -75,6 +75,7 @@ impl EphemeralFile { bytes_written: 0, buffered_writer: owned_buffers_io::write::BufferedWriter::new( file, + 0, || IoBufferMut::with_capacity(TAIL_SZ), gate.enter()?, cancel.child_token(), diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 8b399996d5..32fce8cae3 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -32,12 +32,14 @@ use super::{ remote_tenant_manifest_prefix, remote_tenant_path, }; use crate::TEMP_FILE_SUFFIX; +use crate::assert_u64_eq_usize::UsizeIsU64; use crate::config::PageServerConf; use crate::context::RequestContext; use crate::span::{ debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_id, }; use crate::tenant::Generation; +use crate::tenant::disk_btree::PAGE_SZ; use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path}; use crate::tenant::storage_layer::LayerName; use crate::virtual_file::{MaybeFatalIo, VirtualFile, on_fatal_io_error}; @@ -227,6 +229,7 @@ async fn download_object( let mut buffered = owned_buffers_io::write::BufferedWriter::::new( destination_file, + 0, || IoBufferMut::with_capacity(super::BUFFER_SIZE), gate.enter().map_err(|_| DownloadError::Cancelled)?, cancel.child_token(), @@ -251,13 +254,41 @@ async fn download_object( FlushTaskError::Cancelled => DownloadError::Cancelled, })?; } - let inner = buffered - .flush_and_into_inner(ctx) + let mut pad_amount = None; + let (bytes_amount, destination_file) = buffered + .shutdown(|mut buf| { + use crate::virtual_file::owned_buffers_io::write::Buffer; + + let len = buf.pending(); + let cap = buf.cap(); + + // pad zeros to the next io alignment requirement. + // TODO: this is actually padding to next PAGE_SZ multiple, but only if the buffer capacity is larger than that. + // We can't let the fact that we do direct IO, or the buffer capacity, dictate the on-disk format we write here. + // Need to find a better API that allows writing the format we intend to. + let count = len.next_multiple_of(PAGE_SZ).min(cap) - len; + pad_amount = Some(count); + buf.extend_with(0, count); + + Some(buf) + }) .await .map_err(|e| match e { FlushTaskError::Cancelled => DownloadError::Cancelled, })?; - Ok(inner) + + let pad_amount = pad_amount.expect("shutdown always invokes the closure").into_u64(); + let set_len_arg = bytes_amount - pad_amount; + destination_file + .set_len(set_len_arg) + .await + .maybe_fatal_err("download_object set_len") + .with_context(|| { + format!("set len for file at {dst_path}: 0x{set_len_arg:x} = 0x{bytes_amount:x} - 0x{pad_amount:x}") + }) + .map_err(DownloadError::Other)?; + + Ok((set_len_arg, destination_file)) } .await?; diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 60cf7ac79e..f653dbe84c 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -1521,12 +1521,11 @@ async fn load_heatmap( path: &Utf8PathBuf, ctx: &RequestContext, ) -> Result, anyhow::Error> { - let mut file = match VirtualFile::open(path, ctx).await { + let st = match VirtualFile::read_to_string(path, ctx).await { Ok(file) => file, Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None), Err(e) => Err(e)?, }; - let st = file.read_to_string(ctx).await?; let htm = serde_json::from_str(&st)?; Ok(Some(htm)) } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 693d61d8fd..33f5ebfd7b 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -29,7 +29,6 @@ //! use std::collections::{HashMap, VecDeque}; use std::fs::File; -use std::io::SeekFrom; use std::ops::Range; use std::os::unix::fs::FileExt; use std::str::FromStr; @@ -52,6 +51,7 @@ use tokio_epoll_uring::IoBuf; use tokio_util::sync::CancellationToken; use tracing::*; use utils::bin_ser::BeSer; +use utils::bin_ser::SerializeError; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; @@ -74,7 +74,8 @@ use crate::tenant::vectored_blob_io::{ VectoredReadPlanner, }; use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; -use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile}; +use crate::virtual_file::owned_buffers_io::write::Buffer; +use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; /// @@ -112,6 +113,15 @@ impl From<&DeltaLayer> for Summary { } impl Summary { + /// Serializes the summary header into an aligned buffer of lenth `PAGE_SZ`. + pub fn ser_into_page(&self) -> Result { + let mut buf = IoBufferMut::with_capacity(PAGE_SZ); + Self::ser_into(self, &mut buf)?; + // Pad zeroes to the buffer so the length is a multiple of the alignment. + buf.extend_with(0, buf.capacity() - buf.len()); + Ok(buf.freeze()) + } + pub(super) fn expected( tenant_id: TenantId, timeline_id: TimelineId, @@ -392,10 +402,12 @@ struct DeltaLayerWriterInner { tree: DiskBtreeBuilder, - blob_writer: BlobWriter, + blob_writer: BlobWriter, // Number of key-lsns in the layer. num_keys: usize, + + _gate_guard: utils::sync::gate::GateGuard, } impl DeltaLayerWriterInner { @@ -422,10 +434,17 @@ impl DeltaLayerWriterInner { let path = DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range); - let mut file = VirtualFile::create(&path, ctx).await?; - // make room for the header block - file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; - let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx); + let file = Arc::new(VirtualFile::create_v2(&path, ctx).await?); + + // Start at PAGE_SZ, make room for the header block + let blob_writer = BlobWriter::new( + file, + PAGE_SZ as u64, + gate, + cancel, + ctx, + info_span!(parent: None, "delta_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path), + )?; // Initialize the b-tree index builder let block_buf = BlockBuf::new(); @@ -440,6 +459,7 @@ impl DeltaLayerWriterInner { tree: tree_builder, blob_writer, num_keys: 0, + _gate_guard: gate.enter()?, }) } @@ -535,15 +555,33 @@ impl DeltaLayerWriterInner { ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32; - let mut file = self.blob_writer.into_inner(ctx).await?; + let file = self + .blob_writer + .into_inner(|mut buf| { + let len = buf.pending(); + let cap = buf.cap(); + + // pad zeros to the next io alignment requirement. + // TODO: this is actually padding to next PAGE_SZ multiple, but only if the buffer capacity is larger than that. + // We can't let the fact that we do direct IO, or the buffer capacity, dictate the on-disk format we write here. + // Need to find a better API that allows writing the format we intend to. + let count = len.next_multiple_of(PAGE_SZ).min(cap) - len; + buf.extend_with(0, count); + + Some(buf) + }) + .await?; // Write out the index let (index_root_blk, block_buf) = self.tree.finish()?; - file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64)) - .await?; + let mut offset = index_start_blk as u64 * PAGE_SZ as u64; + + // TODO(yuchen): https://github.com/neondatabase/neon/issues/10092 + // Should we just replace BlockBuf::blocks with one big buffer for buf in block_buf.blocks { - let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; + let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await; res?; + offset += PAGE_SZ as u64; } assert!(self.lsn_range.start < self.lsn_range.end); // Fill in the summary on blk 0 @@ -558,11 +596,9 @@ impl DeltaLayerWriterInner { index_root_blk, }; - let mut buf = Vec::with_capacity(PAGE_SZ); - // TODO: could use smallvec here but it's a pain with Slice - Summary::ser_into(&summary, &mut buf)?; - file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; + // Writes summary at the first block (offset 0). + let buf = summary.ser_into_page()?; + let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await; res?; let metadata = file @@ -729,12 +765,33 @@ impl DeltaLayerWriter { impl Drop for DeltaLayerWriter { fn drop(&mut self) { - if let Some(inner) = self.inner.take() { - // We want to remove the virtual file here, so it's fine to not - // having completely flushed unwritten data. - let vfile = inner.blob_writer.into_inner_no_flush(); - vfile.remove(); - } + let Some(inner) = self.inner.take() else { + return; + }; + + tokio::spawn(async move { + let DeltaLayerWriterInner { + blob_writer, + _gate_guard, + .. + } = inner; + + let vfile = match blob_writer.into_inner(|_| None).await { + Ok(vfile) => vfile, + Err(e) => { + error!(err=%e, "failed to remove delta layer writer file"); + drop(_gate_guard); + return; + } + }; + + if let Err(e) = std::fs::remove_file(vfile.path()) + .maybe_fatal_err("failed to remove the virtual file") + { + error!(err=%e, path=%vfile.path(), "failed to remove delta layer writer file"); + } + drop(_gate_guard); + }); } } @@ -761,7 +818,7 @@ impl DeltaLayer { where F: Fn(Summary) -> Summary, { - let mut file = VirtualFile::open_with_options( + let file = VirtualFile::open_with_options_v2( path, virtual_file::OpenOptions::new().read(true).write(true), ctx, @@ -778,11 +835,8 @@ impl DeltaLayer { let new_summary = rewrite(actual_summary); - let mut buf = Vec::with_capacity(PAGE_SZ); - // TODO: could use smallvec here, but it's a pain with Slice - Summary::ser_into(&new_summary, &mut buf).context("serialize")?; - file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; + let buf = new_summary.ser_into_page().context("serialize")?; + let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await; res?; Ok(()) } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 7566e8c9fe..490b1ce664 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -27,7 +27,6 @@ //! actual page images are stored in the "values" part. use std::collections::{HashMap, VecDeque}; use std::fs::File; -use std::io::SeekFrom; use std::ops::Range; use std::os::unix::prelude::FileExt; use std::str::FromStr; @@ -50,6 +49,7 @@ use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; use tracing::*; use utils::bin_ser::BeSer; +use utils::bin_ser::SerializeError; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; @@ -72,7 +72,8 @@ use crate::tenant::vectored_blob_io::{ VectoredReadPlanner, }; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; -use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile}; +use crate::virtual_file::owned_buffers_io::write::Buffer; +use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile}; use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; /// @@ -111,6 +112,15 @@ impl From<&ImageLayer> for Summary { } impl Summary { + /// Serializes the summary header into an aligned buffer of lenth `PAGE_SZ`. + pub fn ser_into_page(&self) -> Result { + let mut buf = IoBufferMut::with_capacity(PAGE_SZ); + Self::ser_into(self, &mut buf)?; + // Pad zeroes to the buffer so the length is a multiple of the alignment. + buf.extend_with(0, buf.capacity() - buf.len()); + Ok(buf.freeze()) + } + pub(super) fn expected( tenant_id: TenantId, timeline_id: TimelineId, @@ -353,7 +363,7 @@ impl ImageLayer { where F: Fn(Summary) -> Summary, { - let mut file = VirtualFile::open_with_options( + let file = VirtualFile::open_with_options_v2( path, virtual_file::OpenOptions::new().read(true).write(true), ctx, @@ -370,11 +380,8 @@ impl ImageLayer { let new_summary = rewrite(actual_summary); - let mut buf = Vec::with_capacity(PAGE_SZ); - // TODO: could use smallvec here but it's a pain with Slice - Summary::ser_into(&new_summary, &mut buf).context("serialize")?; - file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; + let buf = new_summary.ser_into_page().context("serialize")?; + let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await; res?; Ok(()) } @@ -742,9 +749,11 @@ struct ImageLayerWriterInner { // Number of keys in the layer. num_keys: usize, - blob_writer: BlobWriter, + blob_writer: BlobWriter, tree: DiskBtreeBuilder, + _gate_guard: utils::sync::gate::GateGuard, + #[cfg(feature = "testing")] last_written_key: Key, } @@ -776,19 +785,28 @@ impl ImageLayerWriterInner { }, ); trace!("creating image layer {}", path); - let mut file = { - VirtualFile::open_with_options( - &path, - virtual_file::OpenOptions::new() - .write(true) - .create_new(true), - ctx, + let file = { + Arc::new( + VirtualFile::open_with_options_v2( + &path, + virtual_file::OpenOptions::new() + .write(true) + .create_new(true), + ctx, + ) + .await?, ) - .await? }; - // make room for the header block - file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; - let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx); + + // Start at `PAGE_SZ` to make room for the header block. + let blob_writer = BlobWriter::new( + file, + PAGE_SZ as u64, + gate, + cancel, + ctx, + info_span!(parent: None, "image_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path), + )?; // Initialize the b-tree index builder let block_buf = BlockBuf::new(); @@ -809,6 +827,8 @@ impl ImageLayerWriterInner { num_keys: 0, #[cfg(feature = "testing")] last_written_key: Key::MIN, + + _gate_guard: gate.enter()?, }; Ok(writer) @@ -894,15 +914,30 @@ impl ImageLayerWriterInner { crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen); crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size); - let mut file = self.blob_writer.into_inner(); + let file = self + .blob_writer + .into_inner(|mut buf| { + let len = buf.pending(); + let cap = buf.cap(); + + // pad zeros to the next io alignment requirement. + let count = len.next_multiple_of(PAGE_SZ).min(cap) - len; + buf.extend_with(0, count); + + Some(buf) + }) + .await?; // Write out the index - file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64)) - .await?; + let mut offset = index_start_blk as u64 * PAGE_SZ as u64; let (index_root_blk, block_buf) = self.tree.finish()?; + + // TODO(yuchen): https://github.com/neondatabase/neon/issues/10092 + // Should we just replace BlockBuf::blocks with one big buffer? for buf in block_buf.blocks { - let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; + let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await; res?; + offset += PAGE_SZ as u64; } let final_key_range = if let Some(end_key) = end_key { @@ -923,11 +958,9 @@ impl ImageLayerWriterInner { index_root_blk, }; - let mut buf = Vec::with_capacity(PAGE_SZ); - // TODO: could use smallvec here but it's a pain with Slice - Summary::ser_into(&summary, &mut buf)?; - file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; + // Writes summary at the first block (offset 0). + let buf = summary.ser_into_page()?; + let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await; res?; let metadata = file @@ -1070,9 +1103,33 @@ impl ImageLayerWriter { impl Drop for ImageLayerWriter { fn drop(&mut self) { - if let Some(inner) = self.inner.take() { - inner.blob_writer.into_inner().remove(); - } + let Some(inner) = self.inner.take() else { + return; + }; + + tokio::spawn(async move { + let ImageLayerWriterInner { + blob_writer, + _gate_guard, + .. + } = inner; + + let vfile = match blob_writer.into_inner(|_| None).await { + Ok(vfile) => vfile, + Err(e) => { + error!(err=%e, "failed to remove image layer writer file"); + drop(_gate_guard); + return; + } + }; + + if let Err(e) = std::fs::remove_file(vfile.path()) + .maybe_fatal_err("failed to remove the virtual file") + { + error!(err=%e, path=%vfile.path(), "failed to remove image layer writer file"); + } + drop(_gate_guard); + }); } } diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index 166917d674..8b30e70e41 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -677,7 +677,6 @@ impl StreamingVectoredReadPlanner { #[cfg(test)] mod tests { - use anyhow::Error; use super::super::blob_io::tests::{random_array, write_maybe_compressed}; use super::*; @@ -960,13 +959,16 @@ mod tests { } } - async fn round_trip_test_compressed(blobs: &[Vec], compression: bool) -> Result<(), Error> { + async fn round_trip_test_compressed( + blobs: &[Vec], + compression: bool, + ) -> anyhow::Result<()> { let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test(); let (_temp_dir, pathbuf, offsets) = - write_maybe_compressed::(blobs, compression, &ctx).await?; + write_maybe_compressed(blobs, compression, &ctx).await?; - let file = VirtualFile::open(&pathbuf, &ctx).await?; + let file = VirtualFile::open_v2(&pathbuf, &ctx).await?; let file_len = std::fs::metadata(&pathbuf)?.len(); // Multiply by two (compressed data might need more space), and add a few bytes for the header @@ -1003,7 +1005,7 @@ mod tests { } #[tokio::test] - async fn test_really_big_array() -> Result<(), Error> { + async fn test_really_big_array() -> anyhow::Result<()> { let blobs = &[ b"test".to_vec(), random_array(10 * PAGE_SZ), @@ -1018,7 +1020,7 @@ mod tests { } #[tokio::test] - async fn test_arrays_inc() -> Result<(), Error> { + async fn test_arrays_inc() -> anyhow::Result<()> { let blobs = (0..PAGE_SZ / 8) .map(|v| random_array(v * 16)) .collect::>(); diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index cd3d897423..63c1ccd4c8 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -12,7 +12,7 @@ //! src/backend/storage/file/fd.c //! use std::fs::File; -use std::io::{Error, ErrorKind, Seek, SeekFrom}; +use std::io::{Error, ErrorKind}; use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; #[cfg(target_os = "linux")] use std::os::unix::fs::OpenOptionsExt; @@ -185,18 +185,14 @@ impl VirtualFile { self.inner.sync_data().await } + pub async fn set_len(&self, len: u64) -> Result<(), Error> { + self.inner.set_len(len).await + } + pub async fn metadata(&self) -> Result { self.inner.metadata().await } - pub fn remove(self) { - self.inner.remove(); - } - - pub async fn seek(&mut self, pos: SeekFrom) -> Result { - self.inner.seek(pos).await - } - pub async fn read_exact_at( &self, slice: Slice, @@ -227,25 +223,31 @@ impl VirtualFile { self.inner.write_all_at(buf, offset, ctx).await } - pub async fn write_all( - &mut self, - buf: FullSlice, + pub(crate) async fn read_to_string>( + path: P, ctx: &RequestContext, - ) -> (FullSlice, Result) { - self.inner.write_all(buf, ctx).await - } - - async fn read_to_end(&mut self, buf: &mut Vec, ctx: &RequestContext) -> Result<(), Error> { - self.inner.read_to_end(buf, ctx).await - } - - pub(crate) async fn read_to_string( - &mut self, - ctx: &RequestContext, - ) -> Result { + ) -> std::io::Result { + let file = VirtualFile::open(path, ctx).await?; // TODO: open_v2 let mut buf = Vec::new(); - self.read_to_end(&mut buf, ctx).await?; - Ok(String::from_utf8(buf)?) + let mut tmp = vec![0; 128]; + let mut pos: u64 = 0; + loop { + let slice = tmp.slice(..128); + let (slice, res) = file.inner.read_at(slice, pos, ctx).await; + match res { + Ok(0) => break, + Ok(n) => { + pos += n as u64; + buf.extend_from_slice(&slice[..n]); + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + tmp = slice.into_inner(); + } + String::from_utf8(buf).map_err(|_| { + std::io::Error::new(ErrorKind::InvalidData, "file contents are not valid UTF-8") + }) } } @@ -292,9 +294,6 @@ pub struct VirtualFileInner { /// belongs to a different VirtualFile. handle: RwLock, - /// Current file position - pos: u64, - /// File path and options to use to open it. /// /// Note: this only contains the options needed to re-open it. For example, @@ -608,7 +607,6 @@ impl VirtualFileInner { let vfile = VirtualFileInner { handle: RwLock::new(handle), - pos: 0, path: path.to_owned(), open_options: reopen_options, }; @@ -675,6 +673,13 @@ impl VirtualFileInner { }) } + pub async fn set_len(&self, len: u64) -> Result<(), Error> { + with_file!(self, StorageIoOperation::SetLen, |file_guard| { + let (_file_guard, res) = io_engine::get().set_len(file_guard, len).await; + res.maybe_fatal_err("set_len") + }) + } + /// Helper function internal to `VirtualFile` that looks up the underlying File, /// opens it and evicts some other File if necessary. The passed parameter is /// assumed to be a function available for the physical `File`. @@ -742,38 +747,6 @@ impl VirtualFileInner { }) } - pub fn remove(self) { - let path = self.path.clone(); - drop(self); - std::fs::remove_file(path).expect("failed to remove the virtual file"); - } - - pub async fn seek(&mut self, pos: SeekFrom) -> Result { - match pos { - SeekFrom::Start(offset) => { - self.pos = offset; - } - SeekFrom::End(offset) => { - self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard - .with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))? - } - SeekFrom::Current(offset) => { - let pos = self.pos as i128 + offset as i128; - if pos < 0 { - return Err(Error::new( - ErrorKind::InvalidInput, - "offset would be negative", - )); - } - if pos > u64::MAX as i128 { - return Err(Error::new(ErrorKind::InvalidInput, "offset overflow")); - } - self.pos = pos as u64; - } - } - Ok(self.pos) - } - /// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`. /// /// The returned `Slice` is equivalent to the input `slice`, i.e., it's the same view into the same buffer. @@ -857,59 +830,7 @@ impl VirtualFileInner { (restore(buf), Ok(())) } - /// Writes `buf` to the file at the current offset. - /// - /// Panics if there is an uninitialized range in `buf`, as that is most likely a bug in the caller. - pub async fn write_all( - &mut self, - buf: FullSlice, - ctx: &RequestContext, - ) -> (FullSlice, Result) { - let buf = buf.into_raw_slice(); - let bounds = buf.bounds(); - let restore = - |buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds)); - let nbytes = buf.len(); - let mut buf = buf; - while !buf.is_empty() { - let (tmp, res) = self.write(FullSlice::must_new(buf), ctx).await; - buf = tmp.into_raw_slice(); - match res { - Ok(0) => { - return ( - restore(buf), - Err(Error::new( - std::io::ErrorKind::WriteZero, - "failed to write whole buffer", - )), - ); - } - Ok(n) => { - buf = buf.slice(n..); - } - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return (restore(buf), Err(e)), - } - } - (restore(buf), Ok(nbytes)) - } - - async fn write( - &mut self, - buf: FullSlice, - ctx: &RequestContext, - ) -> (FullSlice, Result) { - let pos = self.pos; - let (buf, res) = self.write_at(buf, pos, ctx).await; - let n = match res { - Ok(n) => n, - Err(e) => return (buf, Err(e)), - }; - self.pos += n as u64; - (buf, Ok(n)) - } - - pub(crate) async fn read_at( + pub(super) async fn read_at( &self, buf: tokio_epoll_uring::Slice, offset: u64, @@ -937,23 +858,11 @@ impl VirtualFileInner { }) } - /// The function aborts the process if the error is fatal. async fn write_at( &self, buf: FullSlice, offset: u64, ctx: &RequestContext, - ) -> (FullSlice, Result) { - let (slice, result) = self.write_at_inner(buf, offset, ctx).await; - let result = result.maybe_fatal_err("write_at"); - (slice, result) - } - - async fn write_at_inner( - &self, - buf: FullSlice, - offset: u64, - ctx: &RequestContext, ) -> (FullSlice, Result) { let file_guard = match self.lock_file().await { Ok(file_guard) => file_guard, @@ -962,30 +871,13 @@ impl VirtualFileInner { observe_duration!(StorageIoOperation::Write, { let ((_file_guard, buf), result) = io_engine::get().write_at(file_guard, offset, buf).await; + let result = result.maybe_fatal_err("write_at"); if let Ok(size) = result { ctx.io_size_metrics().write.add(size.into_u64()); } (buf, result) }) } - - async fn read_to_end(&mut self, buf: &mut Vec, ctx: &RequestContext) -> Result<(), Error> { - let mut tmp = vec![0; 128]; - loop { - let slice = tmp.slice(..128); - let (slice, res) = self.read_at(slice, self.pos, ctx).await; - match res { - Ok(0) => return Ok(()), - Ok(n) => { - self.pos += n as u64; - buf.extend_from_slice(&slice[..n]); - } - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return Err(e), - } - tmp = slice.into_inner(); - } - } } // Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 @@ -1200,19 +1092,6 @@ impl FileGuard { let _ = file.into_raw_fd(); res } - /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually. - fn with_std_file_mut(&mut self, with: F) -> R - where - F: FnOnce(&mut File) -> R, - { - // SAFETY: - // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`. - // - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd - let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) }; - let res = with(&mut file); - let _ = file.into_raw_fd(); - res - } } impl tokio_epoll_uring::IoFd for FileGuard { @@ -1380,7 +1259,6 @@ static SYNC_MODE: AtomicU8 = AtomicU8::new(SyncMode::Sync as u8); #[cfg(test)] mod tests { - use std::io::Write; use std::os::unix::fs::FileExt; use std::sync::Arc; @@ -1433,43 +1311,6 @@ mod tests { MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset), } } - async fn seek(&mut self, pos: SeekFrom) -> Result { - match self { - MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await, - MaybeVirtualFile::File(file) => file.seek(pos), - } - } - async fn write_all( - &mut self, - buf: FullSlice, - ctx: &RequestContext, - ) -> Result<(), Error> { - match self { - MaybeVirtualFile::VirtualFile(file) => { - let (_buf, res) = file.write_all(buf, ctx).await; - res.map(|_| ()) - } - MaybeVirtualFile::File(file) => file.write_all(&buf[..]), - } - } - - // Helper function to slurp contents of a file, starting at the current position, - // into a string - async fn read_string(&mut self, ctx: &RequestContext) -> Result { - use std::io::Read; - let mut buf = String::new(); - match self { - MaybeVirtualFile::VirtualFile(file) => { - let mut buf = Vec::new(); - file.read_to_end(&mut buf, ctx).await?; - return Ok(String::from_utf8(buf).unwrap()); - } - MaybeVirtualFile::File(file) => { - file.read_to_string(&mut buf)?; - } - } - Ok(buf) - } // Helper function to slurp a portion of a file into a string async fn read_string_at( @@ -1565,48 +1406,23 @@ mod tests { .await?; file_a - .write_all(b"foobar".to_vec().slice_len(), &ctx) + .write_all_at(IoBuffer::from(b"foobar").slice_len(), 0, &ctx) .await?; // cannot read from a file opened in write-only mode - let _ = file_a.read_string(&ctx).await.unwrap_err(); + let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err(); // Close the file and re-open for reading let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?; // cannot write to a file opened in read-only mode let _ = file_a - .write_all(b"bar".to_vec().slice_len(), &ctx) + .write_all_at(IoBuffer::from(b"bar").slice_len(), 0, &ctx) .await .unwrap_err(); // Try simple read - assert_eq!("foobar", file_a.read_string(&ctx).await?); - - // It's positioned at the EOF now. - assert_eq!("", file_a.read_string(&ctx).await?); - - // Test seeks. - assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1); - assert_eq!("oobar", file_a.read_string(&ctx).await?); - - assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4); - assert_eq!("ar", file_a.read_string(&ctx).await?); - - assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1); - assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3); - assert_eq!("bar", file_a.read_string(&ctx).await?); - - assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1); - assert_eq!("oobar", file_a.read_string(&ctx).await?); - - // Test erroneous seeks to before byte 0 - file_a.seek(SeekFrom::End(-7)).await.unwrap_err(); - assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1); - file_a.seek(SeekFrom::Current(-2)).await.unwrap_err(); - - // the erroneous seek should have left the position unchanged - assert_eq!("oobar", file_a.read_string(&ctx).await?); + assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?); // Create another test file, and try FileExt functions on it. let path_b = testdir.join("file_b"); @@ -1632,9 +1448,6 @@ mod tests { // Open a lot of files, enough to cause some evictions. (Or to be precise, // open the same file many times. The effect is the same.) - // - // leave file_a positioned at offset 1 before we start - assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1); let mut vfiles = Vec::new(); for _ in 0..100 { @@ -1644,7 +1457,7 @@ mod tests { &ctx, ) .await?; - assert_eq!("FOOBAR", vfile.read_string(&ctx).await?); + assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?); vfiles.push(vfile); } @@ -1652,8 +1465,8 @@ mod tests { assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2); // The underlying file descriptor for 'file_a' should be closed now. Try to read - // from it again. We left the file positioned at offset 1 above. - assert_eq!("oobar", file_a.read_string(&ctx).await?); + // from it again. + assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?); // Check that all the other FDs still work too. Use them in random order for // good measure. @@ -1747,7 +1560,7 @@ mod tests { .await .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap()); - let post = file.read_string(&ctx).await.unwrap(); + let post = file.read_string_at(0, 3, &ctx).await.unwrap(); assert_eq!(post, "foo"); assert!(!tmp_path.exists()); drop(file); @@ -1756,7 +1569,7 @@ mod tests { .await .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap()); - let post = file.read_string(&ctx).await.unwrap(); + let post = file.read_string_at(0, 3, &ctx).await.unwrap(); assert_eq!(post, "bar"); assert!(!tmp_path.exists()); drop(file); @@ -1781,7 +1594,7 @@ mod tests { .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap()); - let post = file.read_string(&ctx).await.unwrap(); + let post = file.read_string_at(0, 3, &ctx).await.unwrap(); assert_eq!(post, "foo"); assert!(!tmp_path.exists()); drop(file); diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs index 758dd6e377..b7be243357 100644 --- a/pageserver/src/virtual_file/io_engine.rs +++ b/pageserver/src/virtual_file/io_engine.rs @@ -209,6 +209,22 @@ impl IoEngine { } } } + + pub(super) async fn set_len( + &self, + file_guard: FileGuard, + len: u64, + ) -> (FileGuard, std::io::Result<()>) { + match self { + IoEngine::NotSet => panic!("not initialized"), + // TODO: ftruncate op for tokio-epoll-uring + IoEngine::StdFs | IoEngine::TokioEpollUring => { + let res = file_guard.with_std_file(|std_file| std_file.set_len(len)); + (file_guard, res) + } + } + } + pub(super) async fn write_at( &self, file_guard: FileGuard, diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs index 3ee1a3c162..07f949b89e 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs @@ -282,6 +282,17 @@ unsafe impl tokio_epoll_uring::IoBufMut for AlignedBufferMut { } } +impl std::io::Write for AlignedBufferMut { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.extend_from_slice(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + #[cfg(test)] mod tests { diff --git a/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs b/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs index 4ea6b17744..1aa2c3027d 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs @@ -1,9 +1,11 @@ use tokio_epoll_uring::{IoBuf, IoBufMut}; -use crate::virtual_file::{IoBuffer, IoBufferMut, PageWriteGuardBuf}; +use crate::virtual_file::{self, IoBuffer, IoBufferMut, PageWriteGuardBuf}; /// A marker trait for a mutable aligned buffer type. -pub trait IoBufAlignedMut: IoBufMut {} +pub trait IoBufAlignedMut: IoBufMut { + const ALIGN: usize = virtual_file::get_io_buffer_alignment(); +} /// A marker trait for an aligned buffer type. pub trait IoBufAligned: IoBuf {} diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index f3ab2c285a..a87b9d175b 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -1,6 +1,7 @@ mod flush; use std::sync::Arc; +use bytes::BufMut; pub(crate) use flush::FlushControl; use flush::FlushHandle; pub(crate) use flush::FlushTaskError; @@ -8,6 +9,7 @@ use tokio_epoll_uring::IoBuf; use tokio_util::sync::CancellationToken; use super::io_buf_aligned::IoBufAligned; +use super::io_buf_aligned::IoBufAlignedMut; use super::io_buf_ext::{FullSlice, IoBufExt}; use crate::context::RequestContext; use crate::virtual_file::{IoBuffer, IoBufferMut}; @@ -64,7 +66,7 @@ pub struct BufferedWriter { impl BufferedWriter where - B: Buffer + Send + 'static, + B: IoBufAlignedMut + Buffer + Send + 'static, Buf: IoBufAligned + Send + Sync + CheapCloneForRead, W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug, { @@ -73,6 +75,7 @@ where /// The `buf_new` function provides a way to initialize the owned buffers used by this writer. pub fn new( writer: Arc, + start_offset: u64, buf_new: impl Fn() -> B, gate_guard: utils::sync::gate::GateGuard, cancel: CancellationToken, @@ -91,7 +94,7 @@ where ctx.attached_child(), flush_task_span, ), - bytes_submitted: 0, + bytes_submitted: start_offset, } } @@ -116,21 +119,29 @@ where } #[cfg_attr(target_os = "macos", allow(dead_code))] - pub async fn flush_and_into_inner( - mut self, - ctx: &RequestContext, - ) -> Result<(u64, Arc), FlushTaskError> { - self.flush(ctx).await?; - + pub async fn shutdown( + self, + mut handle_tail: impl FnMut(B) -> Option, + ) -> Result<(u64, W), FlushTaskError> { let Self { - mutable: buf, + mutable: tail, maybe_flushed: _, writer, mut flush_handle, - bytes_submitted: bytes_amount, + bytes_submitted: submit_offset, } = self; - flush_handle.shutdown().await?; - assert!(buf.is_some()); + + let ctx = flush_handle.shutdown().await?; + let buf = tail.expect("must not use after an error"); + let writer = Arc::into_inner(writer).expect("writer is the only strong reference"); + let mut bytes_amount = submit_offset; + if let Some(buf) = handle_tail(buf) { + bytes_amount += buf.pending() as u64; + // TODO: infinite retries + maybe_fatal_err like we do in the flush loop; can we just send this + // as work into the flush loop, as part of flush_handle.shutdown() + let (_, res) = writer.write_all_at(buf.flush(), submit_offset, &ctx).await; + let _: () = res.unwrap(); // DO NOT MERGE, THIS CAN FAIL E.G. on ENOSPC + } Ok((bytes_amount, writer)) } @@ -235,6 +246,10 @@ pub trait Buffer { /// panics if `other.len() > self.cap() - self.pending()`. fn extend_from_slice(&mut self, other: &[u8]); + /// Add `count` bytes `val` into `self`. + /// Panics if `count > self.cap() - self.pending()`. + fn extend_with(&mut self, val: u8, count: usize); + /// Number of bytes in the buffer. fn pending(&self) -> usize; @@ -262,6 +277,14 @@ impl Buffer for IoBufferMut { IoBufferMut::extend_from_slice(self, other); } + fn extend_with(&mut self, val: u8, count: usize) { + if self.len() + count > self.cap() { + panic!("Buffer capacity exceeded"); + } + + IoBufferMut::put_bytes(self, val, count); + } + fn pending(&self) -> usize { self.len() } @@ -334,6 +357,7 @@ mod tests { let cancel = CancellationToken::new(); let mut writer = BufferedWriter::<_, RecorderWriter>::new( recorder, + 0, || IoBufferMut::with_capacity(2), gate.enter()?, cancel, @@ -350,7 +374,7 @@ mod tests { writer.write_buffered_borrowed(b"j", ctx).await?; writer.write_buffered_borrowed(b"klmno", ctx).await?; - let (_, recorder) = writer.flush_and_into_inner(ctx).await?; + let (_, recorder) = writer.shutdown(Some).await?; assert_eq!( recorder.get_writes(), { diff --git a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs index c076ba0eca..c417776139 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs @@ -1,5 +1,5 @@ use std::ops::ControlFlow; -use std::sync::Arc; +use std::{marker::PhantomData, sync::Arc}; use tokio_util::sync::CancellationToken; use tracing::{Instrument, info, info_span, warn}; @@ -21,7 +21,10 @@ pub struct FlushHandleInner { /// and receives recyled buffer. channel: duplex::mpsc::Duplex, FullSlice>, /// Join handle for the background flush task. - join_handle: tokio::task::JoinHandle, FlushTaskError>>, + join_handle: tokio::task::JoinHandle>, + + // TODO: get rit of the type parameter? + _phantom: PhantomData, } struct FlushRequest { @@ -144,6 +147,7 @@ where inner: Some(FlushHandleInner { channel: front, join_handle, + _phantom: PhantomData, }), } } @@ -179,12 +183,13 @@ where Err(self .shutdown() .await - .expect_err("flush task only disconnects duplex if it exits with an error")) + .err() + .expect("flush task only disconnects duplex if it exits with an error")) } /// Cleans up the channel, join the flush task. - pub async fn shutdown(&mut self) -> Result, FlushTaskError> { - let handle = self + pub async fn shutdown(&mut self) -> Result { + let handle: FlushHandleInner = self .inner .take() .expect("must not use after we returned an error"); @@ -243,7 +248,7 @@ where } /// Runs the background flush task. - async fn run(mut self) -> Result, FlushTaskError> { + async fn run(mut self) -> Result { // Exit condition: channel is closed and there is no remaining buffer to be flushed while let Some(request) = self.channel.recv().await { #[cfg(test)] @@ -313,8 +318,7 @@ where continue; } } - - Ok(self.writer) + Ok(self.ctx) } } @@ -349,7 +353,7 @@ impl FlushNotStarted { impl FlushInProgress { /// Waits until background flush is done. pub async fn wait_until_flush_is_done(self) -> FlushDone { - self.done_flush_rx.await.unwrap(); + let _ = self.done_flush_rx.await; FlushDone } }