diff --git a/.github/workflows/_build-and-test-locally.yml b/.github/workflows/_build-and-test-locally.yml index 3a88bc844a..e31d3dec5b 100644 --- a/.github/workflows/_build-and-test-locally.yml +++ b/.github/workflows/_build-and-test-locally.yml @@ -275,7 +275,7 @@ jobs: for io_mode in buffered direct direct-rw ; do NEON_PAGESERVER_UNIT_TEST_GET_VECTORED_CONCURRENT_IO=$get_vectored_concurrent_io \ NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine \ - NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOMODE=$io_mode \ + NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IO_MODE=$io_mode \ ${cov_prefix} \ cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(pageserver)' done @@ -395,7 +395,7 @@ jobs: BUILD_TAG: ${{ inputs.build-tag }} PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring PAGESERVER_GET_VECTORED_CONCURRENT_IO: sidecar-task - PAGESERVER_VIRTUAL_FILE_IO_MODE: direct + PAGESERVER_VIRTUAL_FILE_IO_MODE: direct-rw USE_LFC: ${{ matrix.lfc_state == 'with-lfc' && 'true' || 'false' }} # Temporarily disable this step until we figure out why it's so flaky diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 5b292d88e2..f14222bb4e 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -324,7 +324,7 @@ jobs: TEST_RESULT_CONNSTR: "${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}" PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring PAGESERVER_GET_VECTORED_CONCURRENT_IO: sidecar-task - PAGESERVER_VIRTUAL_FILE_IO_MODE: direct + PAGESERVER_VIRTUAL_FILE_IO_MODE: direct-rw SYNC_BETWEEN_TESTS: true # XXX: no coverage data handling here, since benchmarks are run on release builds, # while coverage is currently collected for the debug ones diff --git a/Cargo.lock b/Cargo.lock index fdd08141fa..4573629964 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4302,6 +4302,7 @@ dependencies = [ "remote_storage", "reqwest", "rpds", + "rstest", "rustls 0.23.18", "scopeguard", "send-future", diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 120c508f7a..ff911499ab 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1803,6 +1803,8 @@ pub struct TopTenantShardsResponse { } pub mod virtual_file { + use std::sync::LazyLock; + #[derive( Copy, Clone, @@ -1840,35 +1842,33 @@ pub enum IoMode { /// Uses buffered IO. Buffered, - /// Uses direct IO, error out if the operation fails. + /// Uses direct IO for reads only. #[cfg(target_os = "linux")] Direct, + /// Uses direct IO for reads and writes. + #[cfg(target_os = "linux")] + DirectRw, } impl IoMode { pub fn preferred() -> Self { // The default behavior when running Rust unit tests without any further - // flags is to use the newest behavior if available on the platform (Direct). + // flags is to use the newest behavior (DirectRw). // The CI uses the following environment variable to run unit tests for all // different modes. // NB: the Python regression & perf tests have their own defaults management // that writes pageserver.toml; they do not use this variable.
if cfg!(test) { - use once_cell::sync::Lazy; - static CACHED: Lazy<IoMode> = Lazy::new(|| { + static CACHED: LazyLock<IoMode> = LazyLock::new(|| { utils::env::var_serde_json_string( "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IO_MODE", ) - .unwrap_or({ + .unwrap_or( #[cfg(target_os = "linux")] - { - IoMode::Direct - } + IoMode::DirectRw, #[cfg(not(target_os = "linux"))] - { - IoMode::Buffered - } - }) + IoMode::Buffered, + ) }); *CACHED } else { @@ -1885,6 +1885,8 @@ pub mod virtual_file { v if v == (IoMode::Buffered as u8) => IoMode::Buffered, #[cfg(target_os = "linux")] v if v == (IoMode::Direct as u8) => IoMode::Direct, + #[cfg(target_os = "linux")] + v if v == (IoMode::DirectRw as u8) => IoMode::DirectRw, x => return Err(x), }) } diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index fee78aa94d..8abd504922 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -106,6 +106,7 @@ hex-literal.workspace = true tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "test-util"] } indoc.workspace = true uuid.workspace = true +rstest.workspace = true [[bench]] name = "bench_layer_map" diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs index 5c16548de8..2836450a0e 100644 --- a/pageserver/benches/bench_ingest.rs +++ b/pageserver/benches/bench_ingest.rs @@ -248,6 +248,8 @@ fn criterion_benchmark(c: &mut Criterion) { IoMode::Buffered, #[cfg(target_os = "linux")] IoMode::Direct, + #[cfg(target_os = "linux")] + IoMode::DirectRw, ] { for param in expect.clone() { let HandPickedParameters { @@ -309,78 +311,114 @@ cargo bench --bench bench_ingest im4gn.2xlarge: ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes - time: [1.8491 s 1.8540 s 1.8592 s] - thrpt: [68.847 MiB/s 69.039 MiB/s 69.222 MiB/s] + time: [1.2901 s 1.2943 s 1.2991 s] + thrpt: [98.533 MiB/s 98.892 MiB/s 99.220 MiB/s] ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes - time: [2.6976 s 2.7123 s 2.7286 s] - thrpt: [46.911 MiB/s 47.193 MiB/s 47.450 MiB/s] + time: [2.1387 s 2.1623 s 2.1845 s] + thrpt: [58.595 MiB/s 59.197 MiB/s 59.851 MiB/s] ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y...
- time: [1.7433 s 1.7510 s 1.7600 s] - thrpt: [72.729 MiB/s 73.099 MiB/s 73.423 MiB/s] + time: [1.2036 s 1.2074 s 1.2122 s] + thrpt: [105.60 MiB/s 106.01 MiB/s 106.35 MiB/s] ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No - time: [499.63 ms 500.07 ms 500.46 ms] - thrpt: [255.77 MiB/s 255.96 MiB/s 256.19 MiB/s] + time: [520.55 ms 521.46 ms 522.57 ms] + thrpt: [244.94 MiB/s 245.47 MiB/s 245.89 MiB/s] ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes - time: [456.97 ms 459.61 ms 461.92 ms] - thrpt: [277.11 MiB/s 278.50 MiB/s 280.11 MiB/s] + time: [440.33 ms 442.24 ms 444.10 ms] + thrpt: [288.22 MiB/s 289.43 MiB/s 290.69 MiB/s] ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No - time: [158.82 ms 159.16 ms 159.56 ms] - thrpt: [802.22 MiB/s 804.24 MiB/s 805.93 MiB/s] + time: [168.78 ms 169.42 ms 170.18 ms] + thrpt: [752.16 MiB/s 755.52 MiB/s 758.40 MiB/s] ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes - time: [1.8856 s 1.8997 s 1.9179 s] - thrpt: [66.740 MiB/s 67.380 MiB/s 67.882 MiB/s] + time: [1.2978 s 1.3094 s 1.3227 s] + thrpt: [96.775 MiB/s 97.758 MiB/s 98.632 MiB/s] ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes - time: [2.7468 s 2.7625 s 2.7785 s] - thrpt: [46.068 MiB/s 46.335 MiB/s 46.600 MiB/s] + time: [2.1976 s 2.2067 s 2.2154 s] + thrpt: [57.777 MiB/s 58.006 MiB/s 58.245 MiB/s] ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Yes - time: [1.7689 s 1.7726 s 1.7767 s] - thrpt: [72.045 MiB/s 72.208 MiB/s 72.363 MiB/s] + time: [1.2103 s 1.2160 s 1.2233 s] + thrpt: [104.64 MiB/s 105.26 MiB/s 105.76 MiB/s] ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No - time: [497.64 ms 498.60 ms 499.67 ms] - thrpt: [256.17 MiB/s 256.72 MiB/s 257.21 MiB/s] + time: [525.05 ms 526.37 ms 527.79 ms] + thrpt: [242.52 MiB/s 243.17 MiB/s 243.79 MiB/s] ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes - time: [493.72 ms 505.07 ms 518.03 ms] - thrpt: [247.09 MiB/s 253.43 MiB/s 259.26 MiB/s] + time: [443.06 ms 444.88 ms 447.15 ms] + thrpt: [286.26 MiB/s 287.72 MiB/s 288.90 MiB/s] ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No - time: [267.76 ms 267.85 ms 267.96 ms] - thrpt: [477.69 MiB/s 477.88 MiB/s 478.03 MiB/s] + time: [169.40 ms 169.80 ms 170.17 ms] + thrpt: [752.21 MiB/s 753.81 MiB/s 755.60 MiB/s] +ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes + time: [1.2844 s 1.2915 s 1.2990 s] + thrpt: [98.536 MiB/s 99.112 MiB/s 99.657 MiB/s] +ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes + time: [2.1431 s 2.1663 s 2.1900 s] + thrpt: [58.446 MiB/s 59.087 MiB/s 59.726 MiB/s] +ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y... 
+ time: [1.1906 s 1.1926 s 1.1947 s] + thrpt: [107.14 MiB/s 107.33 MiB/s 107.51 MiB/s] +ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No + time: [516.86 ms 518.25 ms 519.47 ms] + thrpt: [246.40 MiB/s 246.98 MiB/s 247.65 MiB/s] +ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes + time: [536.50 ms 536.53 ms 536.60 ms] + thrpt: [238.54 MiB/s 238.57 MiB/s 238.59 MiB/s] +ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No + time: [267.77 ms 267.90 ms 268.04 ms] + thrpt: [477.53 MiB/s 477.79 MiB/s 478.02 MiB/s] Hetzner AX102: ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes - time: [1.0683 s 1.1006 s 1.1386 s] - thrpt: [112.42 MiB/s 116.30 MiB/s 119.82 MiB/s] + time: [836.58 ms 861.93 ms 886.57 ms] + thrpt: [144.38 MiB/s 148.50 MiB/s 153.00 MiB/s] ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes - time: [1.5719 s 1.6012 s 1.6228 s] - thrpt: [78.877 MiB/s 79.938 MiB/s 81.430 MiB/s] + time: [1.2782 s 1.3191 s 1.3665 s] + thrpt: [93.668 MiB/s 97.037 MiB/s 100.14 MiB/s] ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y... - time: [1.1095 s 1.1331 s 1.1580 s] - thrpt: [110.53 MiB/s 112.97 MiB/s 115.37 MiB/s] + time: [791.27 ms 807.08 ms 822.95 ms] + thrpt: [155.54 MiB/s 158.60 MiB/s 161.77 MiB/s] ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No - time: [303.20 ms 307.83 ms 311.90 ms] - thrpt: [410.39 MiB/s 415.81 MiB/s 422.16 MiB/s] + time: [310.78 ms 314.66 ms 318.47 ms] + thrpt: [401.92 MiB/s 406.79 MiB/s 411.87 MiB/s] ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes - time: [406.34 ms 429.37 ms 451.63 ms] - thrpt: [283.42 MiB/s 298.11 MiB/s 315.00 MiB/s] + time: [377.11 ms 387.77 ms 399.21 ms] + thrpt: [320.63 MiB/s 330.10 MiB/s 339.42 MiB/s] ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No - time: [134.01 ms 135.78 ms 137.48 ms] - thrpt: [931.03 MiB/s 942.68 MiB/s 955.12 MiB/s] + time: [128.37 ms 132.96 ms 138.55 ms] + thrpt: [923.83 MiB/s 962.69 MiB/s 997.11 MiB/s] ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes - time: [1.0406 s 1.0580 s 1.0772 s] - thrpt: [118.83 MiB/s 120.98 MiB/s 123.00 MiB/s] + time: [900.38 ms 914.88 ms 928.86 ms] + thrpt: [137.80 MiB/s 139.91 MiB/s 142.16 MiB/s] ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes - time: [1.5059 s 1.5339 s 1.5625 s] - thrpt: [81.920 MiB/s 83.448 MiB/s 84.999 MiB/s] + time: [1.2538 s 1.2936 s 1.3313 s] + thrpt: [96.149 MiB/s 98.946 MiB/s 102.09 MiB/s] ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Yes - time: [1.0714 s 1.0934 s 1.1161 s] - thrpt: [114.69 MiB/s 117.06 MiB/s 119.47 MiB/s] + time: [787.17 ms 803.89 ms 820.63 ms] + thrpt: [155.98 MiB/s 159.23 MiB/s 162.61 MiB/s] ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No - time: [262.68 ms 265.14 ms 267.71 ms] - thrpt: [478.13 MiB/s 482.76 MiB/s 487.29 MiB/s] + time: [318.78 ms 321.89 ms 324.74 ms] + thrpt: [394.16 MiB/s 397.65 MiB/s 401.53 MiB/s] ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes - time: [375.19 ms 393.80 ms 411.40 ms] - thrpt: [311.14 MiB/s 
325.04 MiB/s 341.16 MiB/s] + time: [374.01 ms 383.45 ms 393.20 ms] + thrpt: [325.53 MiB/s 333.81 MiB/s 342.24 MiB/s] ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No - time: [123.02 ms 123.85 ms 124.66 ms] - thrpt: [1.0027 GiB/s 1.0093 GiB/s 1.0161 GiB/s] + time: [137.98 ms 141.31 ms 143.57 ms] + thrpt: [891.58 MiB/s 905.79 MiB/s 927.66 MiB/s] +ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes + time: [613.69 ms 622.48 ms 630.97 ms] + thrpt: [202.86 MiB/s 205.63 MiB/s 208.57 MiB/s] +ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes + time: [1.0299 s 1.0766 s 1.1273 s] + thrpt: [113.55 MiB/s 118.90 MiB/s 124.29 MiB/s] +ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y... + time: [637.80 ms 647.78 ms 658.01 ms] + thrpt: [194.53 MiB/s 197.60 MiB/s 200.69 MiB/s] +ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No + time: [266.09 ms 267.20 ms 268.31 ms] + thrpt: [477.06 MiB/s 479.04 MiB/s 481.04 MiB/s] +ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes + time: [269.34 ms 273.27 ms 277.69 ms] + thrpt: [460.95 MiB/s 468.40 MiB/s 475.24 MiB/s] +ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No + time: [123.18 ms 124.24 ms 125.15 ms] + thrpt: [1022.8 MiB/s 1.0061 GiB/s 1.0148 GiB/s] */ diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index ce229bbbec..b16970c911 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1289,6 +1289,7 @@ pub(crate) enum StorageIoOperation { Seek, Fsync, Metadata, + SetLen, } impl StorageIoOperation { @@ -1303,6 +1304,7 @@ impl StorageIoOperation { StorageIoOperation::Seek => "seek", StorageIoOperation::Fsync => "fsync", StorageIoOperation::Metadata => "metadata", + StorageIoOperation::SetLen => "set_len", } } } diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index a147fe4468..8cf3c548c9 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -15,21 +15,23 @@ //! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX //! use std::cmp::min; -use std::io::Error; +use anyhow::Context; use async_compression::Level; use bytes::{BufMut, BytesMut}; use pageserver_api::models::ImageCompressionAlgorithm; use tokio::io::AsyncWriteExt; -use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; +use tokio_epoll_uring::IoBuf; use tokio_util::sync::CancellationToken; use tracing::warn; use crate::context::RequestContext; use crate::page_cache::PAGE_SZ; use crate::tenant::block_io::BlockCursor; -use crate::virtual_file::TempVirtualFile; +use crate::virtual_file::IoBufferMut; use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; +use crate::virtual_file::owned_buffers_io::write::{BufferedWriter, FlushTaskError}; +use crate::virtual_file::owned_buffers_io::write::{BufferedWriterShutdownMode, OwnedAsyncWriter}; #[derive(Copy, Clone, Debug)] pub struct CompressionInfo { @@ -50,12 +52,9 @@ pub struct Header { impl Header { /// Decodes a header from a byte slice. 
- pub fn decode(bytes: &[u8]) -> Result<Self, Error> { + pub fn decode(bytes: &[u8]) -> anyhow::Result<Self> { let Some(&first_header_byte) = bytes.first() else { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "zero-length blob header", - )); + anyhow::bail!("zero-length blob header"); }; // If the first bit is 0, this is just a 1-byte length prefix up to 128 bytes. @@ -69,12 +68,9 @@ impl Header { // Otherwise, this is a 4-byte header containing compression information and length. const HEADER_LEN: usize = 4; - let mut header_buf: [u8; HEADER_LEN] = bytes[0..HEADER_LEN].try_into().map_err(|_| { - std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("blob header too short: {bytes:?}"), - ) - })?; + let mut header_buf: [u8; HEADER_LEN] = bytes[0..HEADER_LEN] + .try_into() + .map_err(|_| anyhow::anyhow!("blob header too short: {bytes:?}"))?; // TODO: verify the compression bits and convert to an enum. let compression_bits = header_buf[0] & LEN_COMPRESSION_BIT_MASK; @@ -94,6 +90,16 @@ impl Header { } } +#[derive(Debug, thiserror::Error)] +pub enum WriteBlobError { + #[error(transparent)] + Flush(FlushTaskError), + #[error("blob too large ({len} bytes)")] + BlobTooLarge { len: usize }, + #[error(transparent)] + WriteBlobRaw(anyhow::Error), +} + impl BlockCursor<'_> { /// Read a blob into a new buffer. pub async fn read_blob( @@ -213,143 +219,64 @@ pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80; pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10; /// A wrapper of `VirtualFile` that allows users to write blobs. -/// -/// If a `BlobWriter` is dropped, the internal buffer will be -/// discarded. You need to call [`flush_buffer`](Self::flush_buffer) -/// manually before dropping. -pub struct BlobWriter<const BUFFERED: bool> { - inner: TempVirtualFile, - offset: u64, - /// A buffer to save on write calls, only used if BUFFERED=true - buf: Vec<u8>, +pub struct BlobWriter<W> { /// We do tiny writes for the length headers; they need to be in an owned buffer; io_buf: Option<BytesMut>, + writer: BufferedWriter<IoBufferMut, W>, + offset: u64, } -impl<const BUFFERED: bool> BlobWriter<BUFFERED> { +impl<W> BlobWriter<W> +where + W: OwnedAsyncWriter + std::fmt::Debug + Send + Sync + 'static, +{ + /// See [`BufferedWriter`] struct-level doc comment for semantics of `start_offset`. pub fn new( - inner: TempVirtualFile, + file: W, start_offset: u64, - _gate: &utils::sync::gate::Gate, - _cancel: CancellationToken, - _ctx: &RequestContext, - ) -> Self { - Self { - inner, - offset: start_offset, - buf: Vec::with_capacity(Self::CAPACITY), + gate: &utils::sync::gate::Gate, + cancel: CancellationToken, + ctx: &RequestContext, + flush_task_span: tracing::Span, + ) -> anyhow::Result<Self> { + Ok(Self { io_buf: Some(BytesMut::new()), - } + writer: BufferedWriter::new( + file, + start_offset, + || IoBufferMut::with_capacity(Self::CAPACITY), + gate.enter()?, + cancel, + ctx, + flush_task_span, + ), + offset: start_offset, + }) } pub fn size(&self) -> u64 { self.offset } - const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 }; + const CAPACITY: usize = 64 * 1024; - /// Writes the given buffer directly to the underlying `VirtualFile`. - /// You need to make sure that the internal buffer is empty, otherwise - /// data will be written in wrong order.
- #[inline(always)] - async fn write_all_unbuffered<Buf: IoBuf + Send>( - &mut self, - src_buf: FullSlice<Buf>, - ctx: &RequestContext, - ) -> (FullSlice<Buf>, Result<(), Error>) { - let (src_buf, res) = self.inner.write_all(src_buf, ctx).await; - let nbytes = match res { - Ok(nbytes) => nbytes, - Err(e) => return (src_buf, Err(e)), - }; - self.offset += nbytes as u64; - (src_buf, Ok(())) - } - - #[inline(always)] - /// Flushes the internal buffer to the underlying `VirtualFile`. - pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> { - let buf = std::mem::take(&mut self.buf); - let (slice, res) = self.inner.write_all(buf.slice_len(), ctx).await; - res?; - let mut buf = slice.into_raw_slice().into_inner(); - buf.clear(); - self.buf = buf; - Ok(()) - } - - #[inline(always)] - /// Writes as much of `src_buf` into the internal buffer as it fits - fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize { - let remaining = Self::CAPACITY - self.buf.len(); - let to_copy = src_buf.len().min(remaining); - self.buf.extend_from_slice(&src_buf[..to_copy]); - self.offset += to_copy as u64; - to_copy - } - - /// Internal, possibly buffered, write function + /// Writes `src_buf` to the file at the current offset. async fn write_all<Buf: IoBuf + Send>( &mut self, src_buf: FullSlice<Buf>, ctx: &RequestContext, - ) -> (FullSlice<Buf>, Result<(), Error>) { - let src_buf = src_buf.into_raw_slice(); - let src_buf_bounds = src_buf.bounds(); - let restore = move |src_buf_slice: Slice<_>| { - FullSlice::must_new(Slice::from_buf_bounds( - src_buf_slice.into_inner(), - src_buf_bounds, - )) - }; + ) -> (FullSlice<Buf>, Result<(), FlushTaskError>) { + let res = self + .writer + // TODO: why are we taking a FullSlice if we're going to pass a borrow downstack? + // Can remove all the complexity around owned buffers upstack + .write_buffered_borrowed(&src_buf, ctx) + .await + .map(|len| { + self.offset += len as u64; + }); - if !BUFFERED { - assert!(self.buf.is_empty()); - return self - .write_all_unbuffered(FullSlice::must_new(src_buf), ctx) - .await; - } - let remaining = Self::CAPACITY - self.buf.len(); - let src_buf_len = src_buf.bytes_init(); - if src_buf_len == 0 { - return (restore(src_buf), Ok(())); - } - let mut src_buf = src_buf.slice(0..src_buf_len); - // First try to copy as much as we can into the buffer - if remaining > 0 { - let copied = self.write_into_buffer(&src_buf); - src_buf = src_buf.slice(copied..); - } - // Then, if the buffer is full, flush it out - if self.buf.len() == Self::CAPACITY { - if let Err(e) = self.flush_buffer(ctx).await { - return (restore(src_buf), Err(e)); - } - } - // Finally, write the tail of src_buf: - // If it wholly fits into the buffer without - // completely filling it, then put it there. - // If not, write it out directly. - let src_buf = if !src_buf.is_empty() { - assert_eq!(self.buf.len(), 0); - if src_buf.len() < Self::CAPACITY { - let copied = self.write_into_buffer(&src_buf); - // We just verified above that src_buf fits into our internal buffer. - assert_eq!(copied, src_buf.len()); - restore(src_buf) - } else { - let (src_buf, res) = self - .write_all_unbuffered(FullSlice::must_new(src_buf), ctx) - .await; - if let Err(e) = res { - return (src_buf, Err(e)); - } - src_buf - } - } else { - restore(src_buf) - }; - (src_buf, Ok(())) + (src_buf, res) } /// Write a blob of data.
Returns the offset that it was written to, @@ -358,7 +285,7 @@ &mut self, srcbuf: FullSlice<Buf>, ctx: &RequestContext, - ) -> (FullSlice<Buf>, Result<u64, Error>) { + ) -> (FullSlice<Buf>, Result<u64, WriteBlobError>) { let (buf, res) = self .write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled) .await; @@ -372,7 +299,10 @@ srcbuf: FullSlice<Buf>, ctx: &RequestContext, algorithm: ImageCompressionAlgorithm, - ) -> (FullSlice<Buf>, Result<(u64, CompressionInfo), Error>) { + ) -> ( + FullSlice<Buf>, + Result<(u64, CompressionInfo), WriteBlobError>, + ) { let offset = self.offset; let mut compression_info = CompressionInfo { written_compressed: false, @@ -388,14 +318,16 @@ if len < 128 { // Short blob. Write a 1-byte length header io_buf.put_u8(len as u8); - (self.write_all(io_buf.slice_len(), ctx).await, srcbuf) + let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await; + let res = res.map_err(WriteBlobError::Flush); + ((slice, res), srcbuf) } else { // Write a 4-byte length header if len > MAX_SUPPORTED_BLOB_LEN { return ( ( io_buf.slice_len(), - Err(Error::other(format!("blob too large ({len} bytes)"))), + Err(WriteBlobError::BlobTooLarge { len }), ), srcbuf, ); } @@ -429,7 +361,9 @@ assert_eq!(len_buf[0] & 0xf0, 0); len_buf[0] |= high_bit_mask; io_buf.extend_from_slice(&len_buf[..]); - (self.write_all(io_buf.slice_len(), ctx).await, srcbuf) + let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await; + let res = res.map_err(WriteBlobError::Flush); + ((slice, res), srcbuf) } } .await; @@ -444,6 +378,7 @@ } else { self.write_all(srcbuf, ctx).await }; + let res = res.map_err(WriteBlobError::Flush); (srcbuf, res.map(|_| (offset, compression_info))) } @@ -452,9 +387,12 @@ &mut self, raw_with_header: FullSlice<Buf>, ctx: &RequestContext, - ) -> (FullSlice<Buf>, Result<u64, Error>) { + ) -> (FullSlice<Buf>, Result<u64, WriteBlobError>) { // Verify the header, to ensure we don't write invalid/corrupt data. - let header = match Header::decode(&raw_with_header) { + let header = match Header::decode(&raw_with_header) + .context("decoding blob header") + .map_err(WriteBlobError::WriteBlobRaw) + { Ok(header) => header, Err(err) => return (raw_with_header, Err(err)), }; @@ -463,29 +401,26 @@ let raw_len = raw_with_header.len(); return ( raw_with_header, - Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("header length mismatch: {header_total_len} != {raw_len}"), - )), + Err(WriteBlobError::WriteBlobRaw(anyhow::anyhow!( + "header length mismatch: {header_total_len} != {raw_len}" + ))), ); } let offset = self.offset; let (raw_with_header, result) = self.write_all(raw_with_header, ctx).await; + let result = result.map_err(WriteBlobError::Flush); (raw_with_header, result.map(|_| offset)) } -} -impl<const BUFFERED: bool> BlobWriter<BUFFERED> { - /// Finish this blob writer and return the underlying [`TempVirtualFile`]. - /// - /// If there is an internal buffer (depends on `BUFFERED`), it will - /// be flushed before this method returns. - pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<TempVirtualFile, Error> { - if BUFFERED { - self.flush_buffer(ctx).await?; - } - Ok(self.inner) + /// Finish this blob writer and return the underlying `W`.
+ pub async fn shutdown( + self, + mode: BufferedWriterShutdownMode, + ctx: &RequestContext, + ) -> Result<W, FlushTaskError> { + let (_, file) = self.writer.shutdown(mode, ctx).await?; + Ok(file) } } @@ -494,22 +429,25 @@ pub(crate) mod tests { use camino::Utf8PathBuf; use camino_tempfile::Utf8TempDir; use rand::{Rng, SeedableRng}; + use tracing::info_span; use super::*; use crate::context::DownloadBehavior; use crate::task_mgr::TaskKind; use crate::tenant::block_io::BlockReaderRef; + use crate::virtual_file; + use crate::virtual_file::TempVirtualFile; use crate::virtual_file::VirtualFile; - async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> { - round_trip_test_compressed::<BUFFERED>(blobs, false).await + async fn round_trip_test(blobs: &[Vec<u8>]) -> anyhow::Result<()> { + round_trip_test_compressed(blobs, false).await } - pub(crate) async fn write_maybe_compressed<const BUFFERED: bool>( + pub(crate) async fn write_maybe_compressed( blobs: &[Vec<u8>], compression: bool, ctx: &RequestContext, - ) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> { + ) -> anyhow::Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>)> { let temp_dir = camino_tempfile::tempdir()?; let pathbuf = temp_dir.path().join("file"); let gate = utils::sync::gate::Gate::default(); @@ -519,10 +457,18 @@ let mut offsets = Vec::new(); { let file = TempVirtualFile::new( - VirtualFile::create(pathbuf.as_path(), ctx).await?, - gate.enter().unwrap(), + VirtualFile::open_with_options_v2( + pathbuf.as_path(), + virtual_file::OpenOptions::new() + .create_new(true) + .write(true), + ctx, + ) + .await?, + gate.enter()?, ); - let mut wtr = BlobWriter::<BUFFERED>::new(file, 0, &gate, cancel.clone(), ctx); + let mut wtr = + BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test")).unwrap(); for blob in blobs.iter() { let (_, res) = if compression { let res = wtr @@ -539,28 +485,28 @@ let offs = res?; offsets.push(offs); } - // Write out one page worth of zeros so that we can - // read again with read_blk - let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await; - let offs = res?; - println!("Writing final blob at offs={offs}"); - - let file = wtr.into_inner(ctx).await?; - file.disarm_into_inner(); - } + let file = wtr + .shutdown( + BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ), + ctx, + ) + .await?; + file.disarm_into_inner() + }; Ok((temp_dir, pathbuf, offsets)) } - async fn round_trip_test_compressed<const BUFFERED: bool>( + async fn round_trip_test_compressed( blobs: &[Vec<u8>], compression: bool, - ) -> Result<(), Error> { + ) -> anyhow::Result<()> { let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test(); let (_temp_dir, pathbuf, offsets) = - write_maybe_compressed::<BUFFERED>(blobs, compression, &ctx).await?; + write_maybe_compressed(blobs, compression, &ctx).await?; - let file = VirtualFile::open(pathbuf, &ctx).await?; + println!("Done writing!"); + let file = VirtualFile::open_v2(pathbuf, &ctx).await?; let rdr = BlockReaderRef::VirtualFile(&file); let rdr = BlockCursor::new_with_compression(rdr, compression); for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() { @@ -579,30 +525,27 @@ } #[tokio::test] - async fn test_one() -> Result<(), Error> { + async fn test_one() -> anyhow::Result<()> { let blobs = &[vec![12, 21, 22]]; - round_trip_test::<false>(blobs).await?; - round_trip_test::<true>(blobs).await?; + round_trip_test(blobs).await?; Ok(()) } #[tokio::test] - async fn test_hello_simple() -> Result<(), Error> { + async fn test_hello_simple() -> anyhow::Result<()> { let
blobs = &[ vec![0, 1, 2, 3], b"Hello, World!".to_vec(), Vec::new(), b"foobar".to_vec(), ]; - round_trip_test::(blobs).await?; - round_trip_test::(blobs).await?; - round_trip_test_compressed::(blobs, true).await?; - round_trip_test_compressed::(blobs, true).await?; + round_trip_test(blobs).await?; + round_trip_test_compressed(blobs, true).await?; Ok(()) } #[tokio::test] - async fn test_really_big_array() -> Result<(), Error> { + async fn test_really_big_array() -> anyhow::Result<()> { let blobs = &[ b"test".to_vec(), random_array(10 * PAGE_SZ), @@ -611,25 +554,22 @@ pub(crate) mod tests { vec![0xf3; 24 * PAGE_SZ], b"foobar".to_vec(), ]; - round_trip_test::(blobs).await?; - round_trip_test::(blobs).await?; - round_trip_test_compressed::(blobs, true).await?; - round_trip_test_compressed::(blobs, true).await?; + round_trip_test(blobs).await?; + round_trip_test_compressed(blobs, true).await?; Ok(()) } #[tokio::test] - async fn test_arrays_inc() -> Result<(), Error> { + async fn test_arrays_inc() -> anyhow::Result<()> { let blobs = (0..PAGE_SZ / 8) .map(|v| random_array(v * 16)) .collect::>(); - round_trip_test::(&blobs).await?; - round_trip_test::(&blobs).await?; + round_trip_test(&blobs).await?; Ok(()) } #[tokio::test] - async fn test_arrays_random_size() -> Result<(), Error> { + async fn test_arrays_random_size() -> anyhow::Result<()> { let mut rng = rand::rngs::StdRng::seed_from_u64(42); let blobs = (0..1024) .map(|_| { @@ -641,20 +581,18 @@ pub(crate) mod tests { random_array(sz.into()) }) .collect::>(); - round_trip_test::(&blobs).await?; - round_trip_test::(&blobs).await?; + round_trip_test(&blobs).await?; Ok(()) } #[tokio::test] - async fn test_arrays_page_boundary() -> Result<(), Error> { + async fn test_arrays_page_boundary() -> anyhow::Result<()> { let blobs = &[ random_array(PAGE_SZ - 4), random_array(PAGE_SZ - 4), random_array(PAGE_SZ - 4), ]; - round_trip_test::(blobs).await?; - round_trip_test::(blobs).await?; + round_trip_test(blobs).await?; Ok(()) } } diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 6723155626..686cc94126 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -4,14 +4,12 @@ use std::ops::Deref; -use bytes::Bytes; - use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner}; use crate::context::RequestContext; use crate::page_cache::{self, FileId, PAGE_SZ, PageReadGuard, PageWriteGuard, ReadBufResult}; #[cfg(test)] use crate::virtual_file::IoBufferMut; -use crate::virtual_file::VirtualFile; +use crate::virtual_file::{IoBuffer, VirtualFile}; /// This is implemented by anything that can read 8 kB (PAGE_SZ) /// blocks, using the page cache @@ -247,17 +245,17 @@ pub trait BlockWriter { /// 'buf' must be of size PAGE_SZ. Returns the block number the page was /// written to. /// - fn write_blk(&mut self, buf: Bytes) -> Result; + fn write_blk(&mut self, buf: IoBuffer) -> Result; } /// /// A simple in-memory buffer of blocks. 
/// pub struct BlockBuf { - pub blocks: Vec, + pub blocks: Vec, } impl BlockWriter for BlockBuf { - fn write_blk(&mut self, buf: Bytes) -> Result { + fn write_blk(&mut self, buf: IoBuffer) -> Result { assert!(buf.len() == PAGE_SZ); let blknum = self.blocks.len(); self.blocks.push(buf); diff --git a/pageserver/src/tenant/disk_btree.rs b/pageserver/src/tenant/disk_btree.rs index 1791e5996c..419befa41b 100644 --- a/pageserver/src/tenant/disk_btree.rs +++ b/pageserver/src/tenant/disk_btree.rs @@ -25,7 +25,7 @@ use std::{io, result}; use async_stream::try_stream; use byteorder::{BE, ReadBytesExt}; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::BufMut; use either::Either; use futures::{Stream, StreamExt}; use hex; @@ -34,6 +34,7 @@ use tracing::error; use crate::context::RequestContext; use crate::tenant::block_io::{BlockReader, BlockWriter}; +use crate::virtual_file::{IoBuffer, IoBufferMut, owned_buffers_io::write::Buffer}; // The maximum size of a value stored in the B-tree. 5 bytes is enough currently. pub const VALUE_SZ: usize = 5; @@ -787,12 +788,12 @@ impl BuildNode { /// /// Serialize the node to on-disk format. /// - fn pack(&self) -> Bytes { + fn pack(&self) -> IoBuffer { assert!(self.keys.len() == self.num_children as usize * self.suffix_len); assert!(self.values.len() == self.num_children as usize * VALUE_SZ); assert!(self.num_children > 0); - let mut buf = BytesMut::new(); + let mut buf = IoBufferMut::with_capacity(PAGE_SZ); buf.put_u16(self.num_children); buf.put_u8(self.level); @@ -805,7 +806,7 @@ impl BuildNode { assert!(buf.len() == self.size); assert!(buf.len() <= PAGE_SZ); - buf.resize(PAGE_SZ, 0); + buf.extend_with(0, PAGE_SZ - buf.len()); buf.freeze() } @@ -839,7 +840,7 @@ pub(crate) mod tests { #[derive(Clone, Default)] pub(crate) struct TestDisk { - blocks: Vec, + blocks: Vec, } impl TestDisk { fn new() -> Self { @@ -857,7 +858,7 @@ pub(crate) mod tests { } } impl BlockWriter for &mut TestDisk { - fn write_blk(&mut self, buf: Bytes) -> io::Result { + fn write_blk(&mut self, buf: IoBuffer) -> io::Result { let blknum = self.blocks.len(); self.blocks.push(buf); Ok(blknum as u32) diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index be28ac725b..2edf22e9fd 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -79,9 +79,9 @@ impl EphemeralFile { VirtualFile::open_with_options_v2( &filename, virtual_file::OpenOptions::new() + .create_new(true) .read(true) - .write(true) - .create(true), + .write(true), ctx, ) .await?, @@ -98,6 +98,7 @@ impl EphemeralFile { file: file.clone(), buffered_writer: BufferedWriter::new( file, + 0, || IoBufferMut::with_capacity(TAIL_SZ), gate.enter()?, cancel.child_token(), @@ -130,6 +131,14 @@ impl OwnedAsyncWriter for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter > + Send { self.inner.write_all_at(buf, offset, ctx) } + + fn set_len( + &self, + len: u64, + ctx: &RequestContext, + ) -> impl Future> + Send { + self.inner.set_len(len, ctx) + } } impl std::ops::Deref for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter { diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 19cf70a055..84989e0fb8 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -91,9 +91,7 @@ pub async fn download_layer_file<'a>( ); let temp_file = TempVirtualFile::new( - // Not _v2 yet which is sensitive to 
virtual_file_io_mode. - // That'll happen in PR https://github.com/neondatabase/neon/pull/11558 - VirtualFile::open_with_options( + VirtualFile::open_with_options_v2( &temp_file_path, virtual_file::OpenOptions::new() .create_new(true) .write(true), @@ -197,6 +195,7 @@ async fn download_object( let dst_path = destination_file.path().to_owned(); let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new( destination_file, + 0, || IoBufferMut::with_capacity(super::BUFFER_SIZE), gate.enter().map_err(|_| DownloadError::Cancelled)?, cancel.child_token(), @@ -219,10 +218,15 @@ FlushTaskError::Cancelled => DownloadError::Cancelled, })?; } - let inner = buffered.shutdown(ctx).await.map_err(|e| match e { - FlushTaskError::Cancelled => DownloadError::Cancelled, - })?; - Ok(inner) + buffered + .shutdown( + owned_buffers_io::write::BufferedWriterShutdownMode::PadThenTruncate, + ctx, + ) + .await + .map_err(|e| match e { + FlushTaskError::Cancelled => DownloadError::Cancelled, + }) } .await?; diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 4338c4d4cf..c26b7626ef 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -1521,12 +1521,11 @@ async fn load_heatmap( path: &Utf8PathBuf, ctx: &RequestContext, ) -> Result<Option<HeatMapTenant>, anyhow::Error> { - let mut file = match VirtualFile::open(path, ctx).await { - Ok(file) => file, + let st = match VirtualFile::read_to_string(path, ctx).await { + Ok(st) => st, Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None), Err(e) => Err(e)?, }; - let st = file.read_to_string(ctx).await?; let htm = serde_json::from_str(&st)?; Ok(Some(htm)) } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 40d4cffef3..a09d8b26ec 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -29,7 +29,6 @@ //! use std::collections::{HashMap, VecDeque}; use std::fs::File; -use std::io::SeekFrom; use std::ops::Range; use std::os::unix::fs::FileExt; use std::str::FromStr; @@ -52,6 +51,7 @@ use tokio_epoll_uring::IoBuf; use tokio_util::sync::CancellationToken; use tracing::*; use utils::bin_ser::BeSer; +use utils::bin_ser::SerializeError; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; @@ -75,7 +75,8 @@ use crate::tenant::vectored_blob_io::{ }; use crate::virtual_file::TempVirtualFile; use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; -use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile}; +use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode}; +use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; /// @@ -113,6 +114,15 @@ impl From<&DeltaLayer> for Summary { } impl Summary { + /// Serializes the summary header into an aligned buffer of length `PAGE_SZ`. + pub fn ser_into_page(&self) -> Result<IoBuffer, SerializeError> { + let mut buf = IoBufferMut::with_capacity(PAGE_SZ); + Self::ser_into(self, &mut buf)?; + // Pad zeroes to the buffer so the length is a multiple of the alignment.
+ buf.extend_with(0, buf.capacity() - buf.len()); + Ok(buf.freeze()) + } + pub(super) fn expected( tenant_id: TenantId, timeline_id: TimelineId, @@ -392,7 +402,7 @@ struct DeltaLayerWriterInner { tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>, - blob_writer: BlobWriter<true>, + blob_writer: BlobWriter<TempVirtualFile>, // Number of key-lsns in the layer. num_keys: usize, @@ -416,16 +426,29 @@ impl DeltaLayerWriterInner { // Create the file initially with a temporary filename. We don't know // the end key yet, so we cannot form the final filename yet. We will // rename it when we're done. - // - // Note: This overwrites any existing file. There shouldn't be any. - // FIXME: throw an error instead? let path = DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range); + let file = TempVirtualFile::new( + VirtualFile::open_with_options_v2( + &path, + virtual_file::OpenOptions::new() + .create_new(true) + .write(true), + ctx, + ) + .await?, + gate.enter()?, + ); - let mut file = TempVirtualFile::new(VirtualFile::create(&path, ctx).await?, gate.enter()?); - // make room for the header block - file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; - let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx); + // Start at PAGE_SZ, make room for the header block + let blob_writer = BlobWriter::new( + file, + PAGE_SZ as u64, + gate, + cancel, + ctx, + info_span!(parent: None, "delta_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path), + )?; // Initialize the b-tree index builder let block_buf = BlockBuf::new(); @@ -519,15 +542,24 @@ ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32; - let mut file = self.blob_writer.into_inner(ctx).await?; + let file = self + .blob_writer + .shutdown( + BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ), + ctx, + ) + .await?; // Write out the index let (index_root_blk, block_buf) = self.tree.finish()?; - file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64)) - .await?; + let mut offset = index_start_blk as u64 * PAGE_SZ as u64; + + // TODO(yuchen): https://github.com/neondatabase/neon/issues/10092 + // Should we just replace BlockBuf::blocks with one big buffer? for buf in block_buf.blocks { - let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; + let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await; res?; + offset += PAGE_SZ as u64; } assert!(self.lsn_range.start < self.lsn_range.end); // Fill in the summary on blk 0 @@ -542,11 +574,9 @@ index_root_blk, }; - let mut buf = Vec::with_capacity(PAGE_SZ); - // TODO: could use smallvec here but it's a pain with Slice - Summary::ser_into(&summary, &mut buf)?; - file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; + // Writes summary at the first block (offset 0).
+ let buf = summary.ser_into_page()?; + let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await; res?; let metadata = file @@ -738,7 +768,7 @@ impl DeltaLayer { where F: Fn(Summary) -> Summary, { - let mut file = VirtualFile::open_with_options( + let file = VirtualFile::open_with_options_v2( path, virtual_file::OpenOptions::new().read(true).write(true), ctx, @@ -755,11 +785,8 @@ let new_summary = rewrite(actual_summary); - let mut buf = Vec::with_capacity(PAGE_SZ); - // TODO: could use smallvec here, but it's a pain with Slice - Summary::ser_into(&new_summary, &mut buf).context("serialize")?; - file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; + let buf = new_summary.ser_into_page().context("serialize")?; + let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await; res?; Ok(()) } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 22b5f96d17..a617ffc308 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -27,7 +27,6 @@ //! actual page images are stored in the "values" part. use std::collections::{HashMap, VecDeque}; use std::fs::File; -use std::io::SeekFrom; use std::ops::Range; use std::os::unix::prelude::FileExt; use std::str::FromStr; @@ -50,6 +49,7 @@ use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; use tracing::*; use utils::bin_ser::BeSer; +use utils::bin_ser::SerializeError; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; @@ -73,7 +73,8 @@ use crate::tenant::vectored_blob_io::{ }; use crate::virtual_file::TempVirtualFile; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; -use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile}; +use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode}; +use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile}; use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; /// @@ -112,6 +113,15 @@ impl From<&ImageLayer> for Summary { } impl Summary { + /// Serializes the summary header into an aligned buffer of length `PAGE_SZ`. + pub fn ser_into_page(&self) -> Result<IoBuffer, SerializeError> { + let mut buf = IoBufferMut::with_capacity(PAGE_SZ); + Self::ser_into(self, &mut buf)?; + // Pad zeroes to the buffer so the length is a multiple of the alignment. + buf.extend_with(0, buf.capacity() - buf.len()); + Ok(buf.freeze()) + } + pub(super) fn expected( tenant_id: TenantId, timeline_id: TimelineId, @@ -353,7 +363,7 @@ impl ImageLayer { where F: Fn(Summary) -> Summary, { - let mut file = VirtualFile::open_with_options( + let file = VirtualFile::open_with_options_v2( path, virtual_file::OpenOptions::new().read(true).write(true), ctx, @@ -370,11 +380,8 @@ let new_summary = rewrite(actual_summary); - let mut buf = Vec::with_capacity(PAGE_SZ); - // TODO: could use smallvec here but it's a pain with Slice - Summary::ser_into(&new_summary, &mut buf).context("serialize")?; - file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; + let buf = new_summary.ser_into_page().context("serialize")?; + let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await; res?; Ok(()) } @@ -743,7 +750,7 @@ struct ImageLayerWriterInner { // Number of keys in the layer.
num_keys: usize, - blob_writer: BlobWriter<false>, + blob_writer: BlobWriter<TempVirtualFile>, tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>, #[cfg(feature = "testing")] @@ -777,20 +784,27 @@ }, ); trace!("creating image layer {}", path); - let mut file = TempVirtualFile::new( - VirtualFile::open_with_options( + let file = TempVirtualFile::new( + VirtualFile::open_with_options_v2( &path, virtual_file::OpenOptions::new() - .write(true) - .create_new(true), + .create_new(true) + .write(true), ctx, ) .await?, gate.enter()?, ); - // make room for the header block - file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; - let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx); + + // Start at `PAGE_SZ` to make room for the header block. + let blob_writer = BlobWriter::new( + file, + PAGE_SZ as u64, + gate, + cancel, + ctx, + info_span!(parent: None, "image_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path), + )?; // Initialize the b-tree index builder let block_buf = BlockBuf::new(); @@ -918,15 +932,24 @@ crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size); }; - let mut file = self.blob_writer.into_inner(ctx).await?; + let file = self + .blob_writer + .shutdown( + BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ), + ctx, + ) + .await?; // Write out the index - file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64)) - .await?; + let mut offset = index_start_blk as u64 * PAGE_SZ as u64; let (index_root_blk, block_buf) = self.tree.finish()?; + + // TODO(yuchen): https://github.com/neondatabase/neon/issues/10092 + // Should we just replace BlockBuf::blocks with one big buffer? for buf in block_buf.blocks { - let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; + let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await; res?; + offset += PAGE_SZ as u64; } let final_key_range = if let Some(end_key) = end_key { @@ -947,11 +970,9 @@ index_root_blk, }; - let mut buf = Vec::with_capacity(PAGE_SZ); - // TODO: could use smallvec here but it's a pain with Slice - Summary::ser_into(&summary, &mut buf)?; - file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; + // Writes summary at the first block (offset 0).
+ let buf = summary.ser_into_page()?; + let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await; res?; let metadata = file diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index f9a44fe4ca..c9fdefaf66 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -507,7 +507,9 @@ impl<'a> VectoredBlobReader<'a> { for (blob_start, meta) in blobs_at.iter().copied() { let header_start = (blob_start - read.start) as usize; - let header = Header::decode(&buf[header_start..])?; + let header = Header::decode(&buf[header_start..]).map_err(|anyhow_err| { + std::io::Error::new(std::io::ErrorKind::InvalidData, anyhow_err) + })?; let data_start = header_start + header.header_len; let end = data_start + header.data_len; let compression_bits = header.compression_bits; @@ -662,7 +664,6 @@ impl StreamingVectoredReadPlanner { #[cfg(test)] mod tests { - use anyhow::Error; use super::super::blob_io::tests::{random_array, write_maybe_compressed}; use super::*; @@ -945,13 +946,16 @@ mod tests { } } - async fn round_trip_test_compressed(blobs: &[Vec], compression: bool) -> Result<(), Error> { + async fn round_trip_test_compressed( + blobs: &[Vec], + compression: bool, + ) -> anyhow::Result<()> { let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test(); let (_temp_dir, pathbuf, offsets) = - write_maybe_compressed::(blobs, compression, &ctx).await?; + write_maybe_compressed(blobs, compression, &ctx).await?; - let file = VirtualFile::open(&pathbuf, &ctx).await?; + let file = VirtualFile::open_v2(&pathbuf, &ctx).await?; let file_len = std::fs::metadata(&pathbuf)?.len(); // Multiply by two (compressed data might need more space), and add a few bytes for the header @@ -997,7 +1001,7 @@ mod tests { } #[tokio::test] - async fn test_really_big_array() -> Result<(), Error> { + async fn test_really_big_array() -> anyhow::Result<()> { let blobs = &[ b"test".to_vec(), random_array(10 * PAGE_SZ), @@ -1012,7 +1016,7 @@ mod tests { } #[tokio::test] - async fn test_arrays_inc() -> Result<(), Error> { + async fn test_arrays_inc() -> anyhow::Result<()> { let blobs = (0..PAGE_SZ / 8) .map(|v| random_array(v * 16)) .collect::>(); diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index e75500aa39..58953407b1 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -12,10 +12,11 @@ //! src/backend/storage/file/fd.c //! 
use std::fs::File; -use std::io::{Error, ErrorKind, Seek, SeekFrom}; +use std::io::{Error, ErrorKind}; use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; #[cfg(target_os = "linux")] use std::os::unix::fs::OpenOptionsExt; +use std::sync::LazyLock; use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering}; use camino::{Utf8Path, Utf8PathBuf}; @@ -96,69 +97,38 @@ impl VirtualFile { Self::open_with_options_v2(path.as_ref(), OpenOptions::new().read(true), ctx).await } - pub async fn create>( - path: P, - ctx: &RequestContext, - ) -> Result { - let inner = VirtualFileInner::create(path, ctx).await?; - Ok(VirtualFile { - inner, - _mode: IoMode::Buffered, - }) - } - - pub async fn create_v2>( - path: P, - ctx: &RequestContext, - ) -> Result { - VirtualFile::open_with_options_v2( - path.as_ref(), - OpenOptions::new().write(true).create(true).truncate(true), - ctx, - ) - .await - } - - pub async fn open_with_options>( - path: P, - open_options: &OpenOptions, - ctx: &RequestContext, - ) -> Result { - let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?; - Ok(VirtualFile { - inner, - _mode: IoMode::Buffered, - }) - } - pub async fn open_with_options_v2>( path: P, open_options: &OpenOptions, ctx: &RequestContext, ) -> Result { - let file = match get_io_mode() { - IoMode::Buffered => { - let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?; - VirtualFile { - inner, - _mode: IoMode::Buffered, - } - } + let mode = get_io_mode(); + let set_o_direct = match (mode, open_options.is_write()) { + (IoMode::Buffered, _) => false, #[cfg(target_os = "linux")] - IoMode::Direct => { - let inner = VirtualFileInner::open_with_options( - path, - open_options.clone().custom_flags(nix::libc::O_DIRECT), - ctx, - ) - .await?; - VirtualFile { - inner, - _mode: IoMode::Direct, - } - } + (IoMode::Direct, false) => true, + #[cfg(target_os = "linux")] + (IoMode::Direct, true) => false, + #[cfg(target_os = "linux")] + (IoMode::DirectRw, _) => true, }; - Ok(file) + let open_options = open_options.clone(); + let open_options = if set_o_direct { + #[cfg(target_os = "linux")] + { + let mut open_options = open_options; + open_options.custom_flags(nix::libc::O_DIRECT); + open_options + } + #[cfg(not(target_os = "linux"))] + unreachable!( + "O_DIRECT is not supported on this platform, IoMode's that result in set_o_direct=true shouldn't even be defined" + ); + } else { + open_options + }; + let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?; + Ok(VirtualFile { inner, _mode: mode }) } pub fn path(&self) -> &Utf8Path { @@ -187,18 +157,14 @@ impl VirtualFile { self.inner.sync_data().await } + pub async fn set_len(&self, len: u64, ctx: &RequestContext) -> Result<(), Error> { + self.inner.set_len(len, ctx).await + } + pub async fn metadata(&self) -> Result { self.inner.metadata().await } - pub fn remove(self) { - self.inner.remove(); - } - - pub async fn seek(&mut self, pos: SeekFrom) -> Result { - self.inner.seek(pos).await - } - pub async fn read_exact_at( &self, slice: Slice, @@ -229,25 +195,31 @@ impl VirtualFile { self.inner.write_all_at(buf, offset, ctx).await } - pub async fn write_all( - &mut self, - buf: FullSlice, + pub(crate) async fn read_to_string>( + path: P, ctx: &RequestContext, - ) -> (FullSlice, Result) { - self.inner.write_all(buf, ctx).await - } - - async fn read_to_end(&mut self, buf: &mut Vec, ctx: &RequestContext) -> Result<(), Error> { - self.inner.read_to_end(buf, ctx).await - } - - pub(crate) async fn 
read_to_string( - &mut self, - ctx: &RequestContext, - ) -> Result { + ) -> std::io::Result { + let file = VirtualFile::open(path, ctx).await?; // TODO: open_v2 let mut buf = Vec::new(); - self.read_to_end(&mut buf, ctx).await?; - Ok(String::from_utf8(buf)?) + let mut tmp = vec![0; 128]; + let mut pos: u64 = 0; + loop { + let slice = tmp.slice(..128); + let (slice, res) = file.inner.read_at(slice, pos, ctx).await; + match res { + Ok(0) => break, + Ok(n) => { + pos += n as u64; + buf.extend_from_slice(&slice[..n]); + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + tmp = slice.into_inner(); + } + String::from_utf8(buf).map_err(|_| { + std::io::Error::new(ErrorKind::InvalidData, "file contents are not valid UTF-8") + }) } } @@ -294,9 +266,6 @@ pub struct VirtualFileInner { /// belongs to a different VirtualFile. handle: RwLock, - /// Current file position - pos: u64, - /// File path and options to use to open it. /// /// Note: this only contains the options needed to re-open it. For example, @@ -561,21 +530,7 @@ impl VirtualFileInner { path: P, ctx: &RequestContext, ) -> Result { - Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await - } - - /// Create a new file for writing. If the file exists, it will be truncated. - /// Like File::create. - pub async fn create>( - path: P, - ctx: &RequestContext, - ) -> Result { - Self::open_with_options( - path.as_ref(), - OpenOptions::new().write(true).create(true).truncate(true), - ctx, - ) - .await + Self::open_with_options(path.as_ref(), OpenOptions::new().read(true).clone(), ctx).await } /// Open a file with given options. @@ -585,7 +540,7 @@ impl VirtualFileInner { /// on the first time. Make sure that's sane! pub async fn open_with_options>( path: P, - open_options: &OpenOptions, + open_options: OpenOptions, _ctx: &RequestContext, ) -> Result { let path = path.as_ref(); @@ -610,7 +565,6 @@ impl VirtualFileInner { let vfile = VirtualFileInner { handle: RwLock::new(handle), - pos: 0, path: path.to_owned(), open_options: reopen_options, }; @@ -677,6 +631,13 @@ impl VirtualFileInner { }) } + pub async fn set_len(&self, len: u64, _ctx: &RequestContext) -> Result<(), Error> { + with_file!(self, StorageIoOperation::SetLen, |file_guard| { + let (_file_guard, res) = io_engine::get().set_len(file_guard, len).await; + res.maybe_fatal_err("set_len") + }) + } + /// Helper function internal to `VirtualFile` that looks up the underlying File, /// opens it and evicts some other File if necessary. The passed parameter is /// assumed to be a function available for the physical `File`. @@ -744,38 +705,6 @@ impl VirtualFileInner { }) } - pub fn remove(self) { - let path = self.path.clone(); - drop(self); - std::fs::remove_file(path).expect("failed to remove the virtual file"); - } - - pub async fn seek(&mut self, pos: SeekFrom) -> Result { - match pos { - SeekFrom::Start(offset) => { - self.pos = offset; - } - SeekFrom::End(offset) => { - self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard - .with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))? 
- } - SeekFrom::Current(offset) => { - let pos = self.pos as i128 + offset as i128; - if pos < 0 { - return Err(Error::new( - ErrorKind::InvalidInput, - "offset would be negative", - )); - } - if pos > u64::MAX as i128 { - return Err(Error::new(ErrorKind::InvalidInput, "offset overflow")); - } - self.pos = pos as u64; - } - } - Ok(self.pos) - } - /// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`. /// /// The returned `Slice` is equivalent to the input `slice`, i.e., it's the same view into the same buffer. @@ -859,59 +788,7 @@ impl VirtualFileInner { (restore(buf), Ok(())) } - /// Writes `buf` to the file at the current offset. - /// - /// Panics if there is an uninitialized range in `buf`, as that is most likely a bug in the caller. - pub async fn write_all( - &mut self, - buf: FullSlice, - ctx: &RequestContext, - ) -> (FullSlice, Result) { - let buf = buf.into_raw_slice(); - let bounds = buf.bounds(); - let restore = - |buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds)); - let nbytes = buf.len(); - let mut buf = buf; - while !buf.is_empty() { - let (tmp, res) = self.write(FullSlice::must_new(buf), ctx).await; - buf = tmp.into_raw_slice(); - match res { - Ok(0) => { - return ( - restore(buf), - Err(Error::new( - std::io::ErrorKind::WriteZero, - "failed to write whole buffer", - )), - ); - } - Ok(n) => { - buf = buf.slice(n..); - } - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return (restore(buf), Err(e)), - } - } - (restore(buf), Ok(nbytes)) - } - - async fn write( - &mut self, - buf: FullSlice, - ctx: &RequestContext, - ) -> (FullSlice, Result) { - let pos = self.pos; - let (buf, res) = self.write_at(buf, pos, ctx).await; - let n = match res { - Ok(n) => n, - Err(e) => return (buf, Err(e)), - }; - self.pos += n as u64; - (buf, Ok(n)) - } - - pub(crate) async fn read_at( + pub(super) async fn read_at( &self, buf: tokio_epoll_uring::Slice, offset: u64, @@ -939,23 +816,11 @@ impl VirtualFileInner { }) } - /// The function aborts the process if the error is fatal. 
    async fn write_at<B: IoBuf + Send>(
         &self,
         buf: FullSlice<B>,
         offset: u64,
         ctx: &RequestContext,
-    ) -> (FullSlice<B>, Result<usize, Error>) {
-        let (slice, result) = self.write_at_inner(buf, offset, ctx).await;
-        let result = result.maybe_fatal_err("write_at");
-        (slice, result)
-    }
-
-    async fn write_at_inner<B: IoBuf + Send>(
-        &self,
-        buf: FullSlice<B>,
-        offset: u64,
-        ctx: &RequestContext,
     ) -> (FullSlice<B>, Result<usize, Error>) {
         let file_guard = match self.lock_file().await {
             Ok(file_guard) => file_guard,
@@ -964,30 +829,13 @@
         observe_duration!(StorageIoOperation::Write, {
             let ((_file_guard, buf), result) =
                 io_engine::get().write_at(file_guard, offset, buf).await;
+            let result = result.maybe_fatal_err("write_at");
             if let Ok(size) = result {
                 ctx.io_size_metrics().write.add(size.into_u64());
             }
             (buf, result)
         })
     }
-
-    async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
-        let mut tmp = vec![0; 128];
-        loop {
-            let slice = tmp.slice(..128);
-            let (slice, res) = self.read_at(slice, self.pos, ctx).await;
-            match res {
-                Ok(0) => return Ok(()),
-                Ok(n) => {
-                    self.pos += n as u64;
-                    buf.extend_from_slice(&slice[..n]);
-                }
-                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
-                Err(e) => return Err(e),
-            }
-            tmp = slice.into_inner();
-        }
-    }
 }
 
 // Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
@@ -1202,19 +1050,6 @@ impl FileGuard {
         let _ = file.into_raw_fd();
         res
     }
-    /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
-    fn with_std_file_mut<F, R>(&mut self, with: F) -> R
-    where
-        F: FnOnce(&mut File) -> R,
-    {
-        // SAFETY:
-        // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
-        // - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
-        let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
-        let res = with(&mut file);
-        let _ = file.into_raw_fd();
-        res
-    }
 }
 
 impl tokio_epoll_uring::IoFd for FileGuard {
@@ -1304,6 +1139,9 @@ impl OwnedAsyncWriter for VirtualFile {
     ) -> (FullSlice<Buf>, std::io::Result<()>) {
         VirtualFile::write_all_at(self, buf, offset, ctx).await
     }
+    async fn set_len(&self, len: u64, ctx: &RequestContext) -> std::io::Result<()> {
+        VirtualFile::set_len(self, len, ctx).await
+    }
 }
 
 impl OpenFiles {
@@ -1368,8 +1206,7 @@
 pub(crate) type IoBuffer = AlignedBuffer<ConstAlign<{ get_io_buffer_alignment() }>>;
 pub(crate) type IoPageSlice<'a> =
     AlignedSlice<'a, PAGE_SZ, ConstAlign<{ get_io_buffer_alignment() }>>;
 
-static IO_MODE: once_cell::sync::Lazy<AtomicU8> =
-    once_cell::sync::Lazy::new(|| AtomicU8::new(IoMode::preferred() as u8));
+static IO_MODE: LazyLock<AtomicU8> = LazyLock::new(|| AtomicU8::new(IoMode::preferred() as u8));
 
 pub fn set_io_mode(mode: IoMode) {
     IO_MODE.store(mode as u8, std::sync::atomic::Ordering::Relaxed);
@@ -1383,7 +1220,6 @@ static SYNC_MODE: AtomicU8 = AtomicU8::new(SyncMode::Sync as u8);
 
 #[cfg(test)]
 mod tests {
-    use std::io::Write;
     use std::os::unix::fs::FileExt;
     use std::sync::Arc;
 
@@ -1436,43 +1272,6 @@ mod tests {
                 MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset),
             }
         }
-        async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
-            match self {
-                MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
-                MaybeVirtualFile::File(file) => file.seek(pos),
-            }
-        }
-        async fn write_all<Buf: IoBuf + Send>(
-            &mut self,
-            buf: FullSlice<Buf>,
-            ctx: &RequestContext,
-        ) -> Result<(), Error> {
-            match self {
-                MaybeVirtualFile::VirtualFile(file) => {
-                    let (_buf, res) = file.write_all(buf, ctx).await;
-                    res.map(|_| ())
-                }
-                MaybeVirtualFile::File(file) => file.write_all(&buf[..]),
-            }
-        }
-
-        // Helper function to slurp contents of a file, starting at the current position,
-        // into a string
-        async fn read_string(&mut self, ctx: &RequestContext) -> Result<String, Error> {
-            use std::io::Read;
-            let mut buf = String::new();
-            match self {
-                MaybeVirtualFile::VirtualFile(file) => {
-                    let mut buf = Vec::new();
-                    file.read_to_end(&mut buf, ctx).await?;
-                    return Ok(String::from_utf8(buf).unwrap());
-                }
-                MaybeVirtualFile::File(file) => {
-                    file.read_to_string(&mut buf)?;
-                }
-            }
-            Ok(buf)
-        }
 
         // Helper function to slurp a portion of a file into a string
         async fn read_string_at(
@@ -1508,7 +1307,7 @@ mod tests {
             opts: OpenOptions,
             ctx: &RequestContext,
         ) -> Result<MaybeVirtualFile, Error> {
-            let vf = VirtualFile::open_with_options(&path, &opts, ctx).await?;
+            let vf = VirtualFile::open_with_options_v2(&path, &opts, ctx).await?;
             Ok(MaybeVirtualFile::VirtualFile(vf))
         }
     }
@@ -1568,48 +1367,23 @@ mod tests {
             .await?;
         file_a
-            .write_all(b"foobar".to_vec().slice_len(), &ctx)
+            .write_all_at(IoBuffer::from(b"foobar").slice_len(), 0, &ctx)
             .await?;
 
         // cannot read from a file opened in write-only mode
-        let _ = file_a.read_string(&ctx).await.unwrap_err();
+        let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err();
 
         // Close the file and re-open for reading
         let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
 
         // cannot write to a file opened in read-only mode
         let _ = file_a
-            .write_all(b"bar".to_vec().slice_len(), &ctx)
+            .write_all_at(IoBuffer::from(b"bar").slice_len(), 0, &ctx)
             .await
             .unwrap_err();
 
         // Try simple read
-        assert_eq!("foobar", file_a.read_string(&ctx).await?);
-
-        // It's positioned at the EOF now.
-        assert_eq!("", file_a.read_string(&ctx).await?);
-
-        // Test seeks.
-        assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
-        assert_eq!("oobar", file_a.read_string(&ctx).await?);
-
-        assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4);
-        assert_eq!("ar", file_a.read_string(&ctx).await?);
-
-        assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
-        assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3);
-        assert_eq!("bar", file_a.read_string(&ctx).await?);
-
-        assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1);
-        assert_eq!("oobar", file_a.read_string(&ctx).await?);
-
-        // Test erroneous seeks to before byte 0
-        file_a.seek(SeekFrom::End(-7)).await.unwrap_err();
-        assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
-        file_a.seek(SeekFrom::Current(-2)).await.unwrap_err();
-
-        // the erroneous seek should have left the position unchanged
-        assert_eq!("oobar", file_a.read_string(&ctx).await?);
+        assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?);
 
         // Create another test file, and try FileExt functions on it.
         let path_b = testdir.join("file_b");
@@ -1635,9 +1409,6 @@ mod tests {
 
         // Open a lot of files, enough to cause some evictions. (Or to be precise,
         // open the same file many times. The effect is the same.)
-        //
-        // leave file_a positioned at offset 1 before we start
-        assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
 
         let mut vfiles = Vec::new();
         for _ in 0..100 {
@@ -1647,7 +1418,7 @@ mod tests {
                 &ctx,
             )
             .await?;
-            assert_eq!("FOOBAR", vfile.read_string(&ctx).await?);
+            assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?);
             vfiles.push(vfile);
         }
 
@@ -1655,8 +1426,8 @@ mod tests {
         assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
 
         // The underlying file descriptor for 'file_a' should be closed now. Try to read
-        // from it again. We left the file positioned at offset 1 above.
- assert_eq!("oobar", file_a.read_string(&ctx).await?); + // from it again. + assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?); // Check that all the other FDs still work too. Use them in random order for // good measure. @@ -1695,7 +1466,7 @@ mod tests { for _ in 0..VIRTUAL_FILES { let f = VirtualFileInner::open_with_options( &test_file_path, - OpenOptions::new().read(true), + OpenOptions::new().read(true).clone(), &ctx, ) .await?; @@ -1750,7 +1521,7 @@ mod tests { .await .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap()); - let post = file.read_string(&ctx).await.unwrap(); + let post = file.read_string_at(0, 3, &ctx).await.unwrap(); assert_eq!(post, "foo"); assert!(!tmp_path.exists()); drop(file); @@ -1759,7 +1530,7 @@ mod tests { .await .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap()); - let post = file.read_string(&ctx).await.unwrap(); + let post = file.read_string_at(0, 3, &ctx).await.unwrap(); assert_eq!(post, "bar"); assert!(!tmp_path.exists()); drop(file); @@ -1784,7 +1555,7 @@ mod tests { .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap()); - let post = file.read_string(&ctx).await.unwrap(); + let post = file.read_string_at(0, 3, &ctx).await.unwrap(); assert_eq!(post, "foo"); assert!(!tmp_path.exists()); drop(file); diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs index 758dd6e377..dd04fb561a 100644 --- a/pageserver/src/virtual_file/io_engine.rs +++ b/pageserver/src/virtual_file/io_engine.rs @@ -209,6 +209,27 @@ impl IoEngine { } } } + + pub(super) async fn set_len( + &self, + file_guard: FileGuard, + len: u64, + ) -> (FileGuard, std::io::Result<()>) { + match self { + IoEngine::NotSet => panic!("not initialized"), + IoEngine::StdFs => { + let res = file_guard.with_std_file(|std_file| std_file.set_len(len)); + (file_guard, res) + } + #[cfg(target_os = "linux")] + IoEngine::TokioEpollUring => { + // TODO: ftruncate op for tokio-epoll-uring + let res = file_guard.with_std_file(|std_file| std_file.set_len(len)); + (file_guard, res) + } + } + } + pub(super) async fn write_at( &self, file_guard: FileGuard, diff --git a/pageserver/src/virtual_file/open_options.rs b/pageserver/src/virtual_file/open_options.rs index e188b8649b..7d323f3d8f 100644 --- a/pageserver/src/virtual_file/open_options.rs +++ b/pageserver/src/virtual_file/open_options.rs @@ -6,7 +6,12 @@ use std::path::Path; use super::io_engine::IoEngine; #[derive(Debug, Clone)] -pub enum OpenOptions { +pub struct OpenOptions { + write: bool, + inner: Inner, +} +#[derive(Debug, Clone)] +enum Inner { StdFs(std::fs::OpenOptions), #[cfg(target_os = "linux")] TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions), @@ -14,13 +19,17 @@ pub enum OpenOptions { impl Default for OpenOptions { fn default() -> Self { - match super::io_engine::get() { + let inner = match super::io_engine::get() { IoEngine::NotSet => panic!("io engine not set"), - IoEngine::StdFs => Self::StdFs(std::fs::OpenOptions::new()), + IoEngine::StdFs => Inner::StdFs(std::fs::OpenOptions::new()), #[cfg(target_os = "linux")] IoEngine::TokioEpollUring => { - Self::TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions::new()) + Inner::TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions::new()) } + }; + Self { + write: false, + inner, } } } @@ -30,13 +39,17 @@ impl OpenOptions { Self::default() } + pub(super) fn is_write(&self) -> bool { + self.write + } + 
    pub fn read(&mut self, read: bool) -> &mut OpenOptions {
-        match self {
-            OpenOptions::StdFs(x) => {
+        match &mut self.inner {
+            Inner::StdFs(x) => {
                 let _ = x.read(read);
             }
             #[cfg(target_os = "linux")]
-            OpenOptions::TokioEpollUring(x) => {
+            Inner::TokioEpollUring(x) => {
                 let _ = x.read(read);
             }
         }
@@ -44,12 +57,13 @@ impl OpenOptions {
     }
 
     pub fn write(&mut self, write: bool) -> &mut OpenOptions {
-        match self {
-            OpenOptions::StdFs(x) => {
+        self.write = write;
+        match &mut self.inner {
+            Inner::StdFs(x) => {
                 let _ = x.write(write);
             }
             #[cfg(target_os = "linux")]
-            OpenOptions::TokioEpollUring(x) => {
+            Inner::TokioEpollUring(x) => {
                 let _ = x.write(write);
             }
         }
@@ -57,12 +71,12 @@ impl OpenOptions {
     }
 
     pub fn create(&mut self, create: bool) -> &mut OpenOptions {
-        match self {
-            OpenOptions::StdFs(x) => {
+        match &mut self.inner {
+            Inner::StdFs(x) => {
                 let _ = x.create(create);
             }
             #[cfg(target_os = "linux")]
-            OpenOptions::TokioEpollUring(x) => {
+            Inner::TokioEpollUring(x) => {
                 let _ = x.create(create);
             }
         }
@@ -70,12 +84,12 @@ impl OpenOptions {
     }
 
     pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions {
-        match self {
-            OpenOptions::StdFs(x) => {
+        match &mut self.inner {
+            Inner::StdFs(x) => {
                 let _ = x.create_new(create_new);
             }
             #[cfg(target_os = "linux")]
-            OpenOptions::TokioEpollUring(x) => {
+            Inner::TokioEpollUring(x) => {
                 let _ = x.create_new(create_new);
             }
         }
@@ -83,12 +97,12 @@ impl OpenOptions {
     }
 
     pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions {
-        match self {
-            OpenOptions::StdFs(x) => {
+        match &mut self.inner {
+            Inner::StdFs(x) => {
                 let _ = x.truncate(truncate);
             }
             #[cfg(target_os = "linux")]
-            OpenOptions::TokioEpollUring(x) => {
+            Inner::TokioEpollUring(x) => {
                 let _ = x.truncate(truncate);
             }
         }
@@ -96,10 +110,10 @@ impl OpenOptions {
     }
 
     pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
-        match self {
-            OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()),
+        match &self.inner {
+            Inner::StdFs(x) => x.open(path).map(|file| file.into()),
             #[cfg(target_os = "linux")]
-            OpenOptions::TokioEpollUring(x) => {
+            Inner::TokioEpollUring(x) => {
                 let system = super::io_engine::tokio_epoll_uring_ext::thread_local_system().await;
                 system.open(path, x).await.map_err(|e| match e {
                     tokio_epoll_uring::Error::Op(e) => e,
@@ -114,12 +128,12 @@ impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
     }
 
     fn mode(&mut self, mode: u32) -> &mut OpenOptions {
-        match self {
-            OpenOptions::StdFs(x) => {
+        match &mut self.inner {
+            Inner::StdFs(x) => {
                 let _ = x.mode(mode);
             }
             #[cfg(target_os = "linux")]
-            OpenOptions::TokioEpollUring(x) => {
+            Inner::TokioEpollUring(x) => {
                 let _ = x.mode(mode);
             }
         }
@@ -127,12 +141,12 @@ impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
     }
 
     fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions {
-        match self {
-            OpenOptions::StdFs(x) => {
+        match &mut self.inner {
+            Inner::StdFs(x) => {
                 let _ = x.custom_flags(flags);
             }
             #[cfg(target_os = "linux")]
-            OpenOptions::TokioEpollUring(x) => {
+            Inner::TokioEpollUring(x) => {
                 let _ = x.custom_flags(flags);
             }
         }
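To make the shape of this refactor concrete, here is a minimal, self-contained sketch of the pattern: a public struct that records one extra `write` bit and forwards builder calls to an engine-specific inner enum. All names here are illustrative, not the pageserver's actual items.

```rust
// Minimal sketch of the OpenOptions wrapper pattern; illustrative names only.
#[derive(Debug, Clone, Default)]
struct Options {
    write: bool, // remembered so IO-mode logic can later ask is_write()
    inner: Inner,
}

#[derive(Debug, Clone, Default)]
enum Inner {
    #[default]
    StdFs, // stands in for std::fs::OpenOptions
    // on linux, a TokioEpollUring variant would live here
}

impl Options {
    fn write(&mut self, write: bool) -> &mut Self {
        self.write = write;
        // the real code also forwards `write` to the engine-specific inner options
        self
    }
    fn is_write(&self) -> bool {
        self.write
    }
}

fn main() {
    let mut opts = Options::default();
    opts.write(true);
    assert!(opts.is_write());
    let _ = &opts.inner; // silence unused-field warning in this sketch
}
```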
diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs
index 3ee1a3c162..07f949b89e 100644
--- a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs
+++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs
@@ -282,6 +282,17 @@ unsafe impl<A: Alignment> tokio_epoll_uring::IoBufMut for AlignedBufferMut<A> {
     }
 }
 
+impl<A: Alignment> std::io::Write for AlignedBufferMut<A> {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        self.extend_from_slice(buf);
+        Ok(buf.len())
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 mod tests {
 
diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs
index 628f8d3afd..060834bf8c 100644
--- a/pageserver/src/virtual_file/owned_buffers_io/write.rs
+++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs
@@ -1,14 +1,19 @@
 mod flush;
+use bytes::BufMut;
 pub(crate) use flush::FlushControl;
 use flush::FlushHandle;
 pub(crate) use flush::FlushTaskError;
+use flush::ShutdownRequest;
 use tokio_epoll_uring::IoBuf;
 use tokio_util::sync::CancellationToken;
+use tracing::trace;
 
 use super::io_buf_aligned::IoBufAligned;
+use super::io_buf_aligned::IoBufAlignedMut;
 use super::io_buf_ext::{FullSlice, IoBufExt};
 use crate::context::RequestContext;
+use crate::virtual_file::UsizeIsU64;
 use crate::virtual_file::{IoBuffer, IoBufferMut};
 
 pub(crate) trait CheapCloneForRead {
@@ -33,12 +38,49 @@ pub trait OwnedAsyncWriter {
         offset: u64,
         ctx: &RequestContext,
     ) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send;
+    fn set_len(
+        &self,
+        len: u64,
+        ctx: &RequestContext,
+    ) -> impl Future<Output = std::io::Result<()>> + Send;
 }
 
 /// A wrapper around an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
 /// small writes into larger writes of size [`Buffer::cap`].
+///
+/// The buffer is flushed if and only if it is full ([`Buffer::pending`] == [`Buffer::cap`]).
+/// This guarantees that writes to the filesystem happen
+/// - at offsets that are multiples of [`Buffer::cap`]
+/// - in lengths that are multiples of [`Buffer::cap`]
+///
+/// The above property is useful for Direct IO, where whichever
+/// disk-sector/filesystem-block/memory-page size effectively dominates
+/// determines the requirements on
+/// - the alignment of the pointer passed to the read/write operation
+/// - the value of `count` (i.e., the length of the read/write operation),
+/// both of which must be a multiple of the dominating sector/block/page size.
+///
+/// See [`BufferedWriter::shutdown`] / [`BufferedWriterShutdownMode`] for the different
+/// ways of dealing with the special case that the buffer is not full by the time
+/// we are done writing.
+///
+/// The first flush to the underlying `W` happens at offset `start_offset` (arg of [`BufferedWriter::new`]).
+/// The next flush is to offset `start_offset + Buffer::cap`, the one after at
+/// `start_offset + 2 * Buffer::cap`, and so on.
+///
+/// TODO: decouple buffer capacity from alignment requirement.
+/// Right now we assume [`Buffer::cap`] is the alignment requirement,
+/// but actually [`Buffer::cap`] should only determine how often we flush
+/// while writing, while a separate alignment-requirement argument should
+/// be passed to determine the alignment requirement. This could be used by
+/// [`BufferedWriterShutdownMode::PadThenTruncate`] to avoid excessive
+/// padding of zeroes. For example, today, with a capacity of 64KiB, we
+/// would pad with up to 64KiB-1 bytes of zeroes and then truncate them off.
+/// This is wasteful: if the alignment requirement is 4KiB, we only
+/// need to pad & truncate up to 4KiB-1 bytes of zeroes.
+///
 // TODO(yuchen): For large write, implementing buffer bypass for aligned parts of the write could be beneficial to throughput,
 // since we would avoid copying majority of the data into the internal buffer.
+// https://github.com/neondatabase/neon/issues/10101
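A small arithmetic sketch of the invariant described in the comment above, using the same capacity of 4 bytes as the tests further down (plain integer math, no pageserver types involved):

```rust
// Sketch: with Buffer::cap == 4 and start_offset == 0, flushes land only at
// offsets 0, 4, 8, ... and always with length 4.
fn main() {
    let cap: u64 = 4;
    let start_offset: u64 = 0;
    for flush_idx in 0..3u64 {
        let offset = start_offset + flush_idx * cap;
        assert_eq!(offset % cap, 0); // offset is a multiple of cap
        println!("flush #{flush_idx}: offset={offset} len={cap}");
    }
    // Shutdown has to deal with a non-full tail: 13 bytes written with cap 4
    // leave 1 pending byte. PadThenTruncate pads the tail write out to offset 16
    // and then calls set_len(13); ZeroPadToNextMultiple(2) pads to 14 and leaves
    // the zeroes in place.
    let written: u64 = 13;
    assert_eq!(written.next_multiple_of(cap), 16);
    assert_eq!(written.next_multiple_of(2), 14);
}
```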
 pub struct BufferedWriter<B: Buffer, W> {
     /// Clone of the buffer that was last submitted to the flush loop.
     /// `None` if no flush request has been submitted, Some forever after.
@@ -60,9 +102,24 @@ pub struct BufferedWriter<B: Buffer, W> {
     bytes_submitted: u64,
 }
 
+/// How [`BufferedWriter::shutdown`] should deal with pending (=not-yet-flushed) data.
+///
+/// Cf the [`BufferedWriter`] comment's paragraph for context on why we need to think about this.
+pub enum BufferedWriterShutdownMode {
+    /// Drop pending data, don't write back to file.
+    DropTail,
+    /// Pad the pending data with zeroes (cf [`usize::next_multiple_of`]).
+    ZeroPadToNextMultiple(usize),
+    /// Fill the IO buffer with zeroes, flush to disk, then `ftruncate` the
+    /// file to the exact number of bytes written to [`Self`].
+    ///
+    /// TODO: see the [`BufferedWriter`] comment about decoupling buffer capacity from alignment requirement.
+    PadThenTruncate,
+}
+
 impl<B, Buf, W> BufferedWriter<B, W>
 where
-    B: Buffer<IoBuf = Buf> + Send + 'static,
+    B: IoBufAlignedMut + Buffer<IoBuf = Buf> + Send + 'static,
     Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
     W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
 {
@@ -71,6 +128,7 @@
     /// The `buf_new` function provides a way to initialize the owned buffers used by this writer.
     pub fn new(
         writer: W,
+        start_offset: u64,
         buf_new: impl Fn() -> B,
         gate_guard: utils::sync::gate::GateGuard,
         cancel: CancellationToken,
@@ -88,7 +146,7 @@
                 ctx.attached_child(),
                 flush_task_span,
             ),
-            bytes_submitted: 0,
+            bytes_submitted: start_offset,
         }
     }
 
@@ -109,18 +167,80 @@
     }
 
     #[cfg_attr(target_os = "macos", allow(dead_code))]
-    pub async fn shutdown(mut self, ctx: &RequestContext) -> Result<(u64, W), FlushTaskError> {
-        self.flush(ctx).await?;
+    pub async fn shutdown(
+        mut self,
+        mode: BufferedWriterShutdownMode,
+        ctx: &RequestContext,
+    ) -> Result<(u64, W), FlushTaskError> {
+        let mut mutable = self.mutable.take().expect("must not use after an error");
+        let unpadded_pending = mutable.pending();
+        let final_len: u64;
+        let shutdown_req;
+        match mode {
+            BufferedWriterShutdownMode::DropTail => {
+                trace!(pending=%mutable.pending(), "dropping pending data");
+                drop(mutable);
+                final_len = self.bytes_submitted;
+                shutdown_req = ShutdownRequest { set_len: None };
+            }
+            BufferedWriterShutdownMode::ZeroPadToNextMultiple(next_multiple) => {
+                let len = mutable.pending();
+                let cap = mutable.cap();
+                assert!(
+                    len <= cap,
+                    "buffer impl ensures this, but let's check because the extend_with below would panic if we go beyond"
+                );
+                let padded_len = len.next_multiple_of(next_multiple);
+                assert!(
+                    padded_len <= cap,
+                    "caller specified a multiple that is larger than the buffer capacity"
+                );
+                let count = padded_len - len;
+                mutable.extend_with(0, count);
+                trace!(count, "padding with zeros");
+                self.mutable = Some(mutable);
+
+                final_len = self.bytes_submitted + padded_len.into_u64();
+                shutdown_req = ShutdownRequest { set_len: None };
+            }
+            BufferedWriterShutdownMode::PadThenTruncate => {
+                let len = mutable.pending();
+                let cap = mutable.cap();
+                // TODO: see struct comment TODO on decoupling buffer capacity from alignment requirement.
+                let alignment_requirement = cap;
+                assert!(len <= cap, "buffer impl should ensure this");
+                let padding_end_offset = len.next_multiple_of(alignment_requirement);
+                assert!(
+                    padding_end_offset <= cap,
+                    "{padding_end_offset} <= {cap} ({alignment_requirement})"
+                );
+                let count = padding_end_offset - len;
+                mutable.extend_with(0, count);
+                trace!(count, "padding with zeros");
+                self.mutable = Some(mutable);
+
+                final_len = self.bytes_submitted + len.into_u64();
+                shutdown_req = ShutdownRequest {
+                    // Avoid set_len call if we didn't need to pad anything.
+                    set_len: if count > 0 { Some(final_len) } else { None },
+                };
+            }
+        };
+        let padded_pending = self.mutable.as_ref().map(|b| b.pending());
+        trace!(unpadded_pending, padded_pending, "padding done");
+        if self.mutable.is_some() {
+            self.flush(ctx).await?;
+        }
         let Self {
-            mutable: buf,
+            mutable: _,
             maybe_flushed: _,
             mut flush_handle,
-            bytes_submitted: bytes_amount,
+            bytes_submitted: _,
         } = self;
-        let writer = flush_handle.shutdown().await?;
-        assert!(buf.is_some());
-        Ok((bytes_amount, writer))
+        let writer = flush_handle.shutdown(shutdown_req).await?;
+
+        Ok((final_len, writer))
     }
 
     #[cfg(test)]
@@ -224,6 +344,10 @@ pub trait Buffer {
     /// panics if `other.len() > self.cap() - self.pending()`.
     fn extend_from_slice(&mut self, other: &[u8]);
 
+    /// Append the byte `val` to `self`, `count` times.
+    /// Panics if `count > self.cap() - self.pending()`.
+    fn extend_with(&mut self, val: u8, count: usize);
+
     /// Number of bytes in the buffer.
     fn pending(&self) -> usize;
 
@@ -251,6 +375,14 @@ impl Buffer for IoBufferMut {
         IoBufferMut::extend_from_slice(self, other);
     }
 
+    fn extend_with(&mut self, val: u8, count: usize) {
+        if self.len() + count > self.cap() {
+            panic!("Buffer capacity exceeded");
+        }
+
+        IoBufferMut::put_bytes(self, val, count);
+    }
+
     fn pending(&self) -> usize {
         self.len()
     }
@@ -273,26 +405,22 @@ mod tests {
     use std::sync::Mutex;
 
+    use rstest::rstest;
+
     use super::*;
     use crate::context::{DownloadBehavior, RequestContext};
     use crate::task_mgr::TaskKind;
 
+    #[derive(Debug, PartialEq, Eq)]
+    enum Op {
+        Write { buf: Vec<u8>, offset: u64 },
+        SetLen { len: u64 },
+    }
+
     #[derive(Default, Debug)]
     struct RecorderWriter {
         /// record bytes and write offsets.
-        writes: Mutex<Vec<(Vec<u8>, u64)>>,
-    }
-
-    impl RecorderWriter {
-        /// Gets recorded bytes and write offsets.
-        fn get_writes(&self) -> Vec<Vec<u8>> {
-            self.writes
-                .lock()
-                .unwrap()
-                .iter()
-                .map(|(buf, _)| buf.clone())
-                .collect()
-        }
+        recording: Mutex<Vec<Op>>,
     }
 
     impl OwnedAsyncWriter for RecorderWriter {
@@ -302,28 +430,42 @@ mod tests {
             offset: u64,
             _: &RequestContext,
         ) -> (FullSlice<Buf>, std::io::Result<()>) {
-            self.writes
-                .lock()
-                .unwrap()
-                .push((Vec::from(&buf[..]), offset));
+            self.recording.lock().unwrap().push(Op::Write {
+                buf: Vec::from(&buf[..]),
+                offset,
+            });
             (buf, Ok(()))
         }
+        async fn set_len(&self, len: u64, _ctx: &RequestContext) -> std::io::Result<()> {
+            self.recording.lock().unwrap().push(Op::SetLen { len });
+            Ok(())
+        }
     }
 
     fn test_ctx() -> RequestContext {
         RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
     }
 
+    #[rstest]
     #[tokio::test]
-    async fn test_write_all_borrowed_always_goes_through_buffer() -> anyhow::Result<()> {
+    async fn test_write_all_borrowed_always_goes_through_buffer(
+        #[values(
+            BufferedWriterShutdownMode::DropTail,
+            BufferedWriterShutdownMode::ZeroPadToNextMultiple(2),
+            BufferedWriterShutdownMode::PadThenTruncate
+        )]
+        mode: BufferedWriterShutdownMode,
+    ) -> anyhow::Result<()> {
         let ctx = test_ctx();
         let ctx = &ctx;
         let recorder = RecorderWriter::default();
         let gate = utils::sync::gate::Gate::default();
         let cancel = CancellationToken::new();
+        let cap = 4;
         let mut writer = BufferedWriter::<_, RecorderWriter>::new(
             recorder,
-            || IoBufferMut::with_capacity(2),
+            0,
+            || IoBufferMut::with_capacity(cap),
             gate.enter()?,
             cancel,
             ctx,
@@ -333,23 +475,89 @@ mod tests {
 
         writer.write_buffered_borrowed(b"abc", ctx).await?;
         writer.write_buffered_borrowed(b"", ctx).await?;
         writer.write_buffered_borrowed(b"d", ctx).await?;
-        writer.write_buffered_borrowed(b"e", ctx).await?;
-        writer.write_buffered_borrowed(b"fg", ctx).await?;
-        writer.write_buffered_borrowed(b"hi", ctx).await?;
-        writer.write_buffered_borrowed(b"j", ctx).await?;
-        writer.write_buffered_borrowed(b"klmno", ctx).await?;
+        writer.write_buffered_borrowed(b"efg", ctx).await?;
+        writer.write_buffered_borrowed(b"hijklm", ctx).await?;
 
-        let (_, recorder) = writer.shutdown(ctx).await?;
-        assert_eq!(
-            recorder.get_writes(),
-            {
-                let expect: &[&[u8]] = &[b"ab", b"cd", b"ef", b"gh", b"ij", b"kl", b"mn", b"o"];
-                expect
+        let mut expect = {
+            [(0, b"abcd"), (4, b"efgh"), (8, b"ijkl")]
+                .into_iter()
+                .map(|(offset, v)| Op::Write {
+                    offset,
+                    buf: v[..].to_vec(),
+                })
+                .collect::<Vec<_>>()
+        };
+        let expect_next_offset = 12;
+
+        match &mode {
+            BufferedWriterShutdownMode::DropTail => (),
+            // We test the case with padding to next multiple of 2 so that it's different
+            // from the alignment requirement of 4 inferred from buffer capacity.
+            // See TODOs in the `BufferedWriter` struct comment on decoupling buffer capacity from alignment requirement.
+            BufferedWriterShutdownMode::ZeroPadToNextMultiple(2) => {
+                expect.push(Op::Write {
+                    offset: expect_next_offset,
+                    // it's legitimate for pad-to-next-multiple 2 to be < alignment requirement 4 inferred from buffer capacity
+                    buf: b"m\0".to_vec(),
+                });
             }
-            .iter()
-            .map(|v| v[..].to_vec())
-            .collect::<Vec<Vec<u8>>>()
-        );
+            BufferedWriterShutdownMode::ZeroPadToNextMultiple(_) => unimplemented!(),
+            BufferedWriterShutdownMode::PadThenTruncate => {
+                expect.push(Op::Write {
+                    offset: expect_next_offset,
+                    buf: b"m\0\0\0".to_vec(),
+                });
+                expect.push(Op::SetLen { len: 13 });
+            }
+        }
+
+        let (_, recorder) = writer.shutdown(mode, ctx).await?;
+        assert_eq!(&*recorder.recording.lock().unwrap(), &expect);
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_set_len_is_skipped_if_not_needed() -> anyhow::Result<()> {
+        let ctx = test_ctx();
+        let ctx = &ctx;
+        let recorder = RecorderWriter::default();
+        let gate = utils::sync::gate::Gate::default();
+        let cancel = CancellationToken::new();
+        let cap = 4;
+        let mut writer = BufferedWriter::<_, RecorderWriter>::new(
+            recorder,
+            0,
+            || IoBufferMut::with_capacity(cap),
+            gate.enter()?,
+            cancel,
+            ctx,
+            tracing::Span::none(),
+        );
+
+        // write a multiple of `cap`
+        writer.write_buffered_borrowed(b"abc", ctx).await?;
+        writer.write_buffered_borrowed(b"defgh", ctx).await?;
+
+        let (_, recorder) = writer
+            .shutdown(BufferedWriterShutdownMode::PadThenTruncate, ctx)
+            .await?;
+
+        let expect = {
+            [(0, b"abcd"), (4, b"efgh")]
+                .into_iter()
+                .map(|(offset, v)| Op::Write {
+                    offset,
+                    buf: v[..].to_vec(),
+                })
+                .collect::<Vec<_>>()
+        };
+
+        assert_eq!(
+            &*recorder.recording.lock().unwrap(),
+            &expect,
+            "set_len should not be called if the buffer is already aligned"
+        );
+        Ok(())
+    }
 }
diff --git a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs
index a522b807f3..b41a9f6cd2 100644
--- a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs
+++ b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs
@@ -1,7 +1,7 @@
 use std::ops::ControlFlow;
 
 use tokio_util::sync::CancellationToken;
-use tracing::{Instrument, info, info_span, warn};
+use tracing::{Instrument, info_span, warn};
 use utils::sync::duplex;
 
 use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
@@ -18,7 +18,7 @@ pub struct FlushHandle<Buf, W> {
 pub struct FlushHandleInner<Buf, W> {
     /// A bi-directional channel that sends (buffer, offset) for writes,
     /// and receives recycled buffer.
-    channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
+    channel: duplex::mpsc::Duplex<Request<Buf>, FullSlice<Buf>>,
     /// Join handle for the background flush task.
     join_handle: tokio::task::JoinHandle<Result<W, FlushTaskError>>,
 }
@@ -27,9 +27,27 @@ struct FlushRequest<Buf> {
     slice: FullSlice<Buf>,
     offset: u64,
     #[cfg(test)]
-    ready_to_flush_rx: tokio::sync::oneshot::Receiver<()>,
+    ready_to_flush_rx: Option<tokio::sync::oneshot::Receiver<()>>,
     #[cfg(test)]
-    done_flush_tx: tokio::sync::oneshot::Sender<()>,
+    done_flush_tx: Option<tokio::sync::oneshot::Sender<()>>,
+}
+
+pub struct ShutdownRequest {
+    pub set_len: Option<u64>,
+}
+
+enum Request<Buf> {
+    Flush(FlushRequest<Buf>),
+    Shutdown(ShutdownRequest),
+}
+
+impl<Buf> Request<Buf> {
+    fn op_str(&self) -> &'static str {
+        match self {
+            Request::Flush(_) => "flush",
+            Request::Shutdown(_) => "shutdown",
+        }
+    }
 }
 
 /// Constructs a request and a control object for a new flush operation.
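The control flow the new `Request` enum enables is a classic poison-pill protocol: the shutdown message travels down the same channel as flush requests, so it is naturally ordered after all pending writes. A toy version using plain tokio mpsc (the real code uses `utils::sync::duplex` and additionally recycles buffers back to the sender):

```rust
// Toy poison-pill loop; illustrative only, assumes tokio with the "rt",
// "sync", and "macros" features enabled.
use tokio::sync::mpsc;

enum Request {
    Flush(Vec<u8>),
    Shutdown { set_len: Option<u64> },
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Request>(1);
    let task = tokio::spawn(async move {
        while let Some(req) = rx.recv().await {
            match req {
                Request::Flush(buf) => println!("flush {} bytes", buf.len()),
                Request::Shutdown { set_len } => {
                    if let Some(len) = set_len {
                        println!("set_len({len})"); // ftruncate would happen here
                    }
                    break; // poison pill: exit after the final request
                }
            }
        }
    });
    tx.send(Request::Flush(vec![0u8; 4])).await.unwrap();
    tx.send(Request::Shutdown { set_len: Some(13) }).await.unwrap();
    task.await.unwrap();
}
```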
@@ -51,8 +69,8 @@ fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
     let request = FlushRequest {
         slice,
         offset,
-        ready_to_flush_rx,
-        done_flush_tx,
+        ready_to_flush_rx: Some(ready_to_flush_rx),
+        done_flush_tx: Some(done_flush_tx),
     };
     (request, control)
 }
@@ -159,10 +177,7 @@ where
         let (request, flush_control) = new_flush_op(slice, offset);
 
         // Submits the buffer to the background task.
-        let submit = self.inner_mut().channel.send(request).await;
-        if submit.is_err() {
-            return self.handle_error().await;
-        }
+        self.send(Request::Flush(request)).await?;
 
         // Wait for an available buffer from the background flush task.
         // This is the BACKPRESSURE mechanism: if the flush task can't keep up,
@@ -174,15 +189,28 @@ where
         Ok((recycled, flush_control))
     }
 
+    /// Sends poison pill to flush task and waits for it to exit.
+    pub async fn shutdown(&mut self, req: ShutdownRequest) -> Result<W, FlushTaskError> {
+        self.send(Request::Shutdown(req)).await?;
+        self.wait().await
+    }
+
+    async fn send(&mut self, request: Request<Buf>) -> Result<(), FlushTaskError> {
+        let submit = self.inner_mut().channel.send(request).await;
+        if submit.is_err() {
+            return self.handle_error().await;
+        }
+        Ok(())
+    }
+
     async fn handle_error(&mut self) -> Result<W, FlushTaskError> {
         Err(self
-            .shutdown()
+            .wait()
             .await
             .expect_err("flush task only disconnects duplex if it exits with an error"))
     }
 
-    /// Cleans up the channel, join the flush task.
-    pub async fn shutdown(&mut self) -> Result<W, FlushTaskError> {
+    async fn wait(&mut self) -> Result<W, FlushTaskError> {
         let handle = self
             .inner
             .take()
@@ -204,7 +232,7 @@ where
 pub struct FlushBackgroundTask<Buf, W> {
     /// A bi-directional channel that receives (buffer, offset) for writes,
     /// and sends back recycled buffer.
-    channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
+    channel: duplex::mpsc::Duplex<FullSlice<Buf>, Request<Buf>>,
     /// A writer for persisting data to disk.
     writer: W,
     ctx: RequestContext,
@@ -226,7 +254,7 @@ where
     /// Creates a new background flush task.
     fn new(
-        channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
+        channel: duplex::mpsc::Duplex<FullSlice<Buf>, Request<Buf>>,
         file: W,
         gate_guard: utils::sync::gate::GateGuard,
         cancel: CancellationToken,
@@ -245,15 +273,9 @@ where
     async fn run(mut self) -> Result<W, FlushTaskError> {
         // Exit condition: channel is closed and there is no remaining buffer to be flushed
         while let Some(request) = self.channel.recv().await {
-            #[cfg(test)]
-            {
-                // In test, wait for control to signal that we are ready to flush.
-                if request.ready_to_flush_rx.await.is_err() {
-                    tracing::debug!("control dropped");
-                }
-            }
+            let op_kind = request.op_str();
 
-            // Write slice to disk at `offset`.
+            // Perform the requested operation.
             //
             // Error handling happens according to the current policy of crashing
             // on fatal IO errors and retrying in place otherwise (deeming all other errors retryable).
             //
             // TODO: use utils::backoff::retry once async closures are actually usable
             //
-            let mut slice_storage = Some(request.slice);
+            let mut request_storage = Some(request);
             for attempt in 1.. {
                 if self.cancel.is_cancelled() {
                     return Err(FlushTaskError::Cancelled);
                 }
                 let result = async {
-                    if attempt > 1 {
-                        info!("retrying flush");
-                    }
-                    let slice = slice_storage.take().expect(
+                    let request: Request<Buf> = request_storage.take().expect(
                         "likely previous invocation of this future didn't get polled to completion",
                     );
-                    // Don't cancel this write by doing tokio::select with self.cancel.cancelled().
+                    match &request {
+                        Request::Shutdown(ShutdownRequest { set_len: None }) => {
+                            request_storage = Some(request);
+                            return ControlFlow::Break(());
+                        },
+                        Request::Flush(_) | Request::Shutdown(ShutdownRequest { set_len: Some(_) }) => {
+                        },
+                    }
+                    if attempt > 1 {
+                        warn!(op=%request.op_str(), "retrying");
+                    }
+                    // borrows so we can async move the requests into async block while not moving these borrows here
+                    let writer = &self.writer;
+                    let request_storage = &mut request_storage;
+                    let ctx = &self.ctx;
+                    let io_fut = match request {
+                        Request::Flush(FlushRequest { slice, offset, #[cfg(test)] ready_to_flush_rx, #[cfg(test)] done_flush_tx }) => futures::future::Either::Left(async move {
+                            #[cfg(test)]
+                            if let Some(ready_to_flush_rx) = ready_to_flush_rx {
+                                // In test, wait for control to signal that we are ready to flush.
+                                if ready_to_flush_rx.await.is_err() {
+                                    tracing::debug!("control dropped");
+                                }
+                            }
+                            let (slice, res) = writer.write_all_at(slice, offset, ctx).await;
+                            *request_storage = Some(Request::Flush(FlushRequest {
+                                slice,
+                                offset,
+                                #[cfg(test)]
+                                ready_to_flush_rx: None, // the contract is that we notify before the first attempt
+                                #[cfg(test)]
+                                done_flush_tx
+                            }));
+                            res
+                        }),
+                        Request::Shutdown(ShutdownRequest { set_len }) => futures::future::Either::Right(async move {
+                            let set_len = set_len.expect("we filter out the None case above");
+                            let res = writer.set_len(set_len, ctx).await;
+                            *request_storage = Some(Request::Shutdown(ShutdownRequest {
+                                set_len: Some(set_len),
+                            }));
+                            res
+                        }),
+                    };
+                    // Don't cancel the io_fut by doing tokio::select with self.cancel.cancelled().
                     // The underlying tokio-epoll-uring slot / kernel operation is still ongoing and occupies resources.
                     // If we retry indefinitely, we'll deplete those resources.
                     // Future: teach tokio-epoll-uring io_uring operation cancellation, but still,
                     // wait for cancelled ops to complete and discard their error.
-                    let (slice, res) = self.writer.write_all_at(slice, request.offset, &self.ctx).await;
-                    slice_storage = Some(slice);
+                    let res = io_fut.await;
                     let res = res.maybe_fatal_err("owned_buffers_io flush");
                     let Err(err) = res else {
+                        if attempt > 1 {
+                            warn!(op=%op_kind, "retry succeeded");
+                        }
                         return ControlFlow::Break(());
                     };
                     warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff");
                     utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &self.cancel).await;
                     ControlFlow::Continue(())
                 }
-                .instrument(info_span!("flush_attempt", %attempt))
+                .instrument(info_span!("attempt", %attempt, %op_kind))
                 .await;
                 match result {
                     ControlFlow::Break(()) => break,
                     ControlFlow::Continue(()) => continue,
                 }
             }
-            let slice = slice_storage.expect("loop must have run at least once");
+            let request = request_storage.expect("loop must have run at least once");
 
-            #[cfg(test)]
-            {
-                // In test, tell control we are done flushing buffer.
-                if request.done_flush_tx.send(()).is_err() {
-                    tracing::debug!("control dropped");
+            let slice = match request {
+                Request::Flush(FlushRequest {
+                    slice,
+                    #[cfg(test)]
+                    mut done_flush_tx,
+                    ..
+                }) => {
+                    #[cfg(test)]
+                    {
+                        // In test, tell control we are done flushing buffer.
+                        if done_flush_tx.take().expect("always Some").send(()).is_err() {
+                            tracing::debug!("control dropped");
+                        }
+                    }
+                    slice
                 }
-            }
+                Request::Shutdown(_) => {
+                    // next iteration will observe recv() returning None
+                    continue;
+                }
+            };
 
             // Sends the buffer back to the handle for reuse. The handle is in charge of cleaning the buffer.
-            if self.channel.send(slice).await.is_err() {
+            let send_res = self.channel.send(slice).await;
+            if send_res.is_err() {
                 // Although the channel is closed, we still need to finish flushing the remaining buffers.
                 continue;
             }
diff --git a/pageserver/src/virtual_file/temporary.rs b/pageserver/src/virtual_file/temporary.rs
index c4af1f6b22..84f69f9f7f 100644
--- a/pageserver/src/virtual_file/temporary.rs
+++ b/pageserver/src/virtual_file/temporary.rs
@@ -33,6 +33,10 @@ impl OwnedAsyncWriter for TempVirtualFile {
     ) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send {
         VirtualFile::write_all_at(self, buf, offset, ctx)
     }
+
+    async fn set_len(&self, len: u64, ctx: &RequestContext) -> std::io::Result<()> {
+        VirtualFile::set_len(self, len, ctx).await
+    }
 }
 
 impl Drop for TempVirtualFile {
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 546b954486..6dbc2f6aa3 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -1291,7 +1291,11 @@ class NeonEnv:
                 ps_cfg[key] = value
 
         if self.pageserver_virtual_file_io_mode is not None:
-            ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
+            # TODO(christian): https://github.com/neondatabase/neon/issues/11598
+            if not config.test_may_use_compatibility_snapshot_binaries:
+                ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
+            else:
+                log.info("ignoring virtual_file_io_mode parametrization for compatibility test")
 
         if self.pageserver_wal_receiver_protocol is not None:
             key, value = PageserverWalReceiverProtocol.to_config_key_value(
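As an endnote on the API direction of this patch: removing `seek`/`write_all` forces callers to track offsets themselves. The same migration on a plain `std::fs::File` looks like this (unix-only, illustrative sketch, not pageserver code):

```rust
// Before/after sketch of cursor-based vs. offset-based IO on std::fs::File.
use std::io::{Seek, SeekFrom, Write};
use std::os::unix::fs::FileExt;

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("offset_io_example");
    let mut f = std::fs::OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(&path)?;

    // Before: a hidden cursor determines where the write lands.
    f.seek(SeekFrom::Start(0))?;
    f.write_all(b"foobar")?;

    // After: positional IO; the caller owns the offset, no seek needed.
    f.write_all_at(b"BAR", 3)?;
    let mut buf = [0u8; 6];
    f.read_exact_at(&mut buf, 0)?;
    assert_eq!(&buf, b"fooBAR");

    std::fs::remove_file(path)
}
```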