bypass PageCache for InMemoryLayer + avoid Value::deser on L0 flush (#8537)

Part of [Epic: Bypass PageCache for user data
blocks](https://github.com/neondatabase/neon/issues/7386).

# Problem

`InMemoryLayer` still uses the `PageCache` for all data stored in the
`VirtualFile` that underlies the `EphemeralFile`.

# Background

Before this PR, `EphemeralFile` is a fancy and (code-bloated) buffered
writer around a `VirtualFile` that supports `blob_io`.

The `InMemoryLayerInner::index` stores offsets into the `EphemeralFile`.
At those offset, we find a varint length followed by the serialized
`Value`.

Vectored reads (`get_values_reconstruct_data`) are not in fact vectored
- each `Value` that needs to be read is read sequentially.

The `will_init` bit of information which we use to early-exit the
`get_values_reconstruct_data` for a given key is stored in the
serialized `Value`, meaning we have to read & deserialize the `Value`
from the `EphemeralFile`.

The L0 flushing **also** needs to re-determine the `will_init` bit of
information, by deserializing each value during L0 flush.

# Changes

1. Store the value length and `will_init` information in the
`InMemoryLayer::index`. The `EphemeralFile` thus only needs to store the
values.
2. For `get_values_reconstruct_data`:
- Use the in-memory `index` figures out which values need to be read.
Having the `will_init` stored in the index enables us to do that.
- View the EphemeralFile as a byte array of "DIO chunks", each 512 bytes
in size (adjustable constant). A "DIO chunk" is the minimal unit that we
can read under direct IO.
- Figure out which chunks need to be read to retrieve the serialized
bytes for thes values we need to read.
- Coalesce chunk reads such that each DIO chunk is only read once to
serve all value reads that need data from that chunk.
- Merge adjacent chunk reads into larger
`EphemeralFile::read_exact_at_eof_ok` of up to 128k (adjustable
constant).
3. The new `EphemeralFile::read_exact_at_eof_ok` fills the IO buffer
from the underlying VirtualFile and/or its in-memory buffer.
4. The L0 flush code is changed to use the `index` directly, `blob_io` 
5. We can remove the `ephemeral_file::page_caching` construct now.

The `get_values_reconstruct_data` changes seem like a bit overkill but
they are necessary so we issue the equivalent amount of read system
calls compared to before this PR where it was highly likely that even if
the first PageCache access was a miss, remaining reads within the same
`get_values_reconstruct_data` call from the same `EphemeralFile` page
were a hit.

The "DIO chunk" stuff is truly unnecessary for page cache bypass, but,
since we're working on [direct
IO](https://github.com/neondatabase/neon/issues/8130) and
https://github.com/neondatabase/neon/issues/8719 specifically, we need
to do _something_ like this anyways in the near future.

# Alternative Design

The original plan was to use the `vectored_blob_io` code it relies on
the invariant of Delta&Image layers that `index order == values order`.

Further, `vectored_blob_io` code's strategy for merging IOs is limited
to adjacent reads. However, with direct IO, there is another level of
merging that should be done, specifically, if multiple reads map to the
same "DIO chunk" (=alignment-requirement-sized and -aligned region of
the file), then it's "free" to read the chunk into an IO buffer and
serve the two reads from that buffer.
=> https://github.com/neondatabase/neon/issues/8719

# Testing / Performance

Correctness of the IO merging code is ensured by unit tests.

Additionally, minimal tests are added for the `EphemeralFile`
implementation and the bit-packed `InMemoryLayerIndexValue`.

Performance testing results are presented below.
All pref testing done on my M2 MacBook Pro, running a Linux VM.
It's a release build without `--features testing`.

We see definitive improvement in ingest performance microbenchmark and
an ad-hoc microbenchmark for getpage against InMemoryLayer.

```
baseline: commit 7c74112b2a origin/main
HEAD: ef1c55c52e
```

<details>

```
cargo bench --bench bench_ingest -- 'ingest 128MB/100b seq, no delta'

baseline

ingest-small-values/ingest 128MB/100b seq, no delta
                        time:   [483.50 ms 498.73 ms 522.53 ms]
                        thrpt:  [244.96 MiB/s 256.65 MiB/s 264.73 MiB/s]

HEAD

ingest-small-values/ingest 128MB/100b seq, no delta
                        time:   [479.22 ms 482.92 ms 487.35 ms]
                        thrpt:  [262.64 MiB/s 265.06 MiB/s 267.10 MiB/s]
```

</details>

We don't have a micro-benchmark for InMemoryLayer and it's quite
cumbersome to add one. So, I did manual testing in `neon_local`.

<details>

```

  ./target/release/neon_local stop
  rm -rf .neon
  ./target/release/neon_local init
  ./target/release/neon_local start
  ./target/release/neon_local tenant create --set-default
  ./target/release/neon_local endpoint create foo
  ./target/release/neon_local endpoint start foo
  psql 'postgresql://cloud_admin@127.0.0.1:55432/postgres'
psql (13.16 (Debian 13.16-0+deb11u1), server 15.7)

CREATE TABLE wal_test (
    id SERIAL PRIMARY KEY,
    data TEXT
);

DO $$
DECLARE
    i INTEGER := 1;
BEGIN
    WHILE i <= 500000 LOOP
        INSERT INTO wal_test (data) VALUES ('data');
        i := i + 1;
    END LOOP;
END $$;

-- => result is one L0 from initdb and one 137M-sized ephemeral-2

DO $$
DECLARE
    i INTEGER := 1;
    random_id INTEGER;
    random_record wal_test%ROWTYPE;
    start_time TIMESTAMP := clock_timestamp();
    selects_completed INTEGER := 0;
    min_id INTEGER := 1;  -- Minimum ID value
    max_id INTEGER := 100000;  -- Maximum ID value, based on your insert range
    iters INTEGER := 100000000;  -- Number of iterations to run
BEGIN
    WHILE i <= iters LOOP
        -- Generate a random ID within the known range
        random_id := min_id + floor(random() * (max_id - min_id + 1))::int;

        -- Select the row with the generated random ID
        SELECT * INTO random_record
        FROM wal_test
        WHERE id = random_id;

        -- Increment the select counter
        selects_completed := selects_completed + 1;

        -- Check if a second has passed
        IF EXTRACT(EPOCH FROM clock_timestamp() - start_time) >= 1 THEN
            -- Print the number of selects completed in the last second
            RAISE NOTICE 'Selects completed in last second: %', selects_completed;

            -- Reset counters for the next second
            selects_completed := 0;
            start_time := clock_timestamp();
        END IF;

        -- Increment the loop counter
        i := i + 1;
    END LOOP;
END $$;

./target/release/neon_local stop

baseline: commit 7c74112b2a origin/main

NOTICE:  Selects completed in last second: 1864
NOTICE:  Selects completed in last second: 1850
NOTICE:  Selects completed in last second: 1851
NOTICE:  Selects completed in last second: 1918
NOTICE:  Selects completed in last second: 1911
NOTICE:  Selects completed in last second: 1879
NOTICE:  Selects completed in last second: 1858
NOTICE:  Selects completed in last second: 1827
NOTICE:  Selects completed in last second: 1933

ours

NOTICE:  Selects completed in last second: 1915
NOTICE:  Selects completed in last second: 1928
NOTICE:  Selects completed in last second: 1913
NOTICE:  Selects completed in last second: 1932
NOTICE:  Selects completed in last second: 1846
NOTICE:  Selects completed in last second: 1955
NOTICE:  Selects completed in last second: 1991
NOTICE:  Selects completed in last second: 1973
```

NB: the ephemeral file sizes differ by ca 1MiB, ours being 1MiB smaller.

</details>

# Rollout

This PR changes the code in-place and  is not gated by a feature flag.
This commit is contained in:
Christian Schwarz
2024-08-28 20:31:41 +02:00
committed by GitHub
parent 63a0d0d039
commit 9627747d35
20 changed files with 1757 additions and 654 deletions

14
Cargo.lock generated
View File

@@ -936,6 +936,12 @@ dependencies = [
"which",
]
[[package]]
name = "bit_field"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc827186963e592360843fb5ba4b973e145841266c1357f7180c43526f2e5b61"
[[package]]
name = "bitflags"
version = "1.3.2"
@@ -3683,6 +3689,7 @@ dependencies = [
"async-compression",
"async-stream",
"async-trait",
"bit_field",
"byteorder",
"bytes",
"camino",
@@ -3732,6 +3739,7 @@ dependencies = [
"reqwest 0.12.4",
"rpds",
"scopeguard",
"send-future",
"serde",
"serde_json",
"serde_path_to_error",
@@ -5455,6 +5463,12 @@ version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed"
[[package]]
name = "send-future"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "224e328af6e080cddbab3c770b1cf50f0351ba0577091ef2410c3951d835ff87"
[[package]]
name = "sentry"
version = "0.32.3"

View File

@@ -65,6 +65,7 @@ axum = { version = "0.6.20", features = ["ws"] }
base64 = "0.13.0"
bincode = "1.3"
bindgen = "0.65"
bit_field = "0.10.2"
bstr = "1.0"
byteorder = "1.4"
bytes = "1.0"
@@ -145,6 +146,7 @@ rustls-split = "0.3"
scopeguard = "1.1"
sysinfo = "0.29.2"
sd-notify = "0.4.1"
send-future = "0.1.0"
sentry = { version = "0.32", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"

View File

@@ -16,6 +16,7 @@ arc-swap.workspace = true
async-compression.workspace = true
async-stream.workspace = true
async-trait.workspace = true
bit_field.workspace = true
byteorder.workspace = true
bytes.workspace = true
camino.workspace = true
@@ -52,6 +53,7 @@ rand.workspace = true
range-set-blaze = { version = "0.1.16", features = ["alloc"] }
regex.workspace = true
scopeguard.workspace = true
send-future.workspace = true
serde.workspace = true
serde_json = { workspace = true, features = ["raw_value"] }
serde_path_to_error.workspace = true

View File

@@ -103,13 +103,13 @@ async fn ingest(
batch.push((key.to_compact(), lsn, data_ser_size, data.clone()));
if batch.len() >= BATCH_SIZE {
let this_batch = std::mem::take(&mut batch);
let serialized = SerializedBatch::from_values(this_batch);
let serialized = SerializedBatch::from_values(this_batch).unwrap();
layer.put_batch(serialized, &ctx).await?;
}
}
if !batch.is_empty() {
let this_batch = std::mem::take(&mut batch);
let serialized = SerializedBatch::from_values(this_batch);
let serialized = SerializedBatch::from_values(this_batch).unwrap();
layer.put_batch(serialized, &ctx).await?;
}
layer.freeze(lsn + 1).await;

View File

@@ -0,0 +1,39 @@
//! `u64`` and `usize`` aren't guaranteed to be identical in Rust, but life is much simpler if that's the case.
pub(crate) const _ASSERT_U64_EQ_USIZE: () = {
if std::mem::size_of::<usize>() != std::mem::size_of::<u64>() {
panic!("the traits defined in this module assume that usize and u64 can be converted to each other without loss of information");
}
};
pub(crate) trait U64IsUsize {
fn into_usize(self) -> usize;
}
impl U64IsUsize for u64 {
#[inline(always)]
fn into_usize(self) -> usize {
#[allow(clippy::let_unit_value)]
let _ = _ASSERT_U64_EQ_USIZE;
self as usize
}
}
pub(crate) trait UsizeIsU64 {
fn into_u64(self) -> u64;
}
impl UsizeIsU64 for usize {
#[inline(always)]
fn into_u64(self) -> u64 {
#[allow(clippy::let_unit_value)]
let _ = _ASSERT_U64_EQ_USIZE;
self as u64
}
}
pub const fn u64_to_usize(x: u64) -> usize {
#[allow(clippy::let_unit_value)]
let _ = _ASSERT_U64_EQ_USIZE;
x as usize
}

View File

@@ -31,6 +31,7 @@ use utils::{
use crate::l0_flush::L0FlushConfig;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::storage_layer::inmemory_layer::IndexEntry;
use crate::tenant::timeline::compaction::CompactL0Phase1ValueAccess;
use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
@@ -1020,6 +1021,15 @@ impl PageServerConf {
conf.default_tenant_conf = t_conf.merge(TenantConf::default());
IndexEntry::validate_checkpoint_distance(conf.default_tenant_conf.checkpoint_distance)
.map_err(|msg| anyhow::anyhow!("{msg}"))
.with_context(|| {
format!(
"effective checkpoint distance is unsupported: {}",
conf.default_tenant_conf.checkpoint_distance
)
})?;
Ok(conf)
}

View File

@@ -16,6 +16,7 @@ pub mod l0_flush;
use futures::{stream::FuturesUnordered, StreamExt};
pub use pageserver_api::keyspace;
use tokio_util::sync::CancellationToken;
mod assert_u64_eq_usize;
pub mod aux_file;
pub mod metrics;
pub mod page_cache;

View File

@@ -877,6 +877,12 @@ impl Tenant {
});
};
// TODO: should also be rejecting tenant conf changes that violate this check.
if let Err(e) = crate::tenant::storage_layer::inmemory_layer::IndexEntry::validate_checkpoint_distance(tenant_clone.get_checkpoint_distance()) {
make_broken(&tenant_clone, anyhow::anyhow!(e), BrokenVerbosity::Error);
return Ok(());
}
let mut init_order = init_order;
// take the completion because initial tenant loading will complete when all of
// these tasks complete.

View File

@@ -148,7 +148,7 @@ pub(super) const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
/// The maximum size of blobs we support. The highest few bits
/// are reserved for compression and other further uses.
const MAX_SUPPORTED_LEN: usize = 0x0fff_ffff;
pub(crate) const MAX_SUPPORTED_BLOB_LEN: usize = 0x0fff_ffff;
pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
@@ -326,7 +326,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
(self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
} else {
// Write a 4-byte length header
if len > MAX_SUPPORTED_LEN {
if len > MAX_SUPPORTED_BLOB_LEN {
return (
(
io_buf.slice_len(),

View File

@@ -2,7 +2,6 @@
//! Low-level Block-oriented I/O functions
//!
use super::ephemeral_file::EphemeralFile;
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::context::RequestContext;
use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
@@ -81,9 +80,7 @@ impl<'a> Deref for BlockLease<'a> {
/// Unlike traits, we also support the read function to be async though.
pub(crate) enum BlockReaderRef<'a> {
FileBlockReader(&'a FileBlockReader<'a>),
EphemeralFile(&'a EphemeralFile),
Adapter(Adapter<&'a DeltaLayerInner>),
Slice(&'a [u8]),
#[cfg(test)]
TestDisk(&'a super::disk_btree::tests::TestDisk),
#[cfg(test)]
@@ -100,9 +97,7 @@ impl<'a> BlockReaderRef<'a> {
use BlockReaderRef::*;
match self {
FileBlockReader(r) => r.read_blk(blknum, ctx).await,
EphemeralFile(r) => r.read_blk(blknum, ctx).await,
Adapter(r) => r.read_blk(blknum, ctx).await,
Slice(s) => Self::read_blk_slice(s, blknum),
#[cfg(test)]
TestDisk(r) => r.read_blk(blknum),
#[cfg(test)]
@@ -111,24 +106,6 @@ impl<'a> BlockReaderRef<'a> {
}
}
impl<'a> BlockReaderRef<'a> {
fn read_blk_slice(slice: &[u8], blknum: u32) -> std::io::Result<BlockLease> {
let start = (blknum as usize).checked_mul(PAGE_SZ).unwrap();
let end = start.checked_add(PAGE_SZ).unwrap();
if end > slice.len() {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!("slice too short, len={} end={}", slice.len(), end),
));
}
let slice = &slice[start..end];
let page_sized: &[u8; PAGE_SZ] = slice
.try_into()
.expect("we add PAGE_SZ to start, so the slice must have PAGE_SZ");
Ok(BlockLease::Slice(page_sized))
}
}
///
/// A "cursor" for efficiently reading multiple pages from a BlockReader
///

View File

@@ -1,13 +1,21 @@
//! Implementation of append-only file data structure
//! used to keep in-memory layers spilled on disk.
use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache;
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
use crate::virtual_file::{self, VirtualFile};
use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
use crate::virtual_file::owned_buffers_io::write::Buffer;
use crate::virtual_file::{self, owned_buffers_io, VirtualFile};
use bytes::BytesMut;
use camino::Utf8PathBuf;
use num_traits::Num;
use pageserver_api::shard::TenantShardId;
use tokio_epoll_uring::{BoundedBuf, Slice};
use tracing::error;
use std::io;
use std::sync::atomic::AtomicU64;
@@ -16,12 +24,17 @@ use utils::id::TimelineId;
pub struct EphemeralFile {
_tenant_shard_id: TenantShardId,
_timeline_id: TimelineId,
rw: page_caching::RW,
page_cache_file_id: page_cache::FileId,
bytes_written: u64,
buffered_writer: owned_buffers_io::write::BufferedWriter<
BytesMut,
size_tracking_writer::Writer<VirtualFile>,
>,
/// Gate guard is held on as long as we need to do operations in the path (delete on drop)
_gate_guard: utils::sync::gate::GateGuard,
}
mod page_caching;
mod zero_padded_read_write;
const TAIL_SZ: usize = 64 * 1024;
impl EphemeralFile {
pub async fn create(
@@ -51,75 +64,178 @@ impl EphemeralFile {
)
.await?;
let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
Ok(EphemeralFile {
_tenant_shard_id: tenant_shard_id,
_timeline_id: timeline_id,
rw: page_caching::RW::new(file, gate_guard),
page_cache_file_id,
bytes_written: 0,
buffered_writer: owned_buffers_io::write::BufferedWriter::new(
size_tracking_writer::Writer::new(file),
BytesMut::with_capacity(TAIL_SZ),
),
_gate_guard: gate_guard,
})
}
}
impl Drop for EphemeralFile {
fn drop(&mut self) {
// unlink the file
// we are clear to do this, because we have entered a gate
let path = &self.buffered_writer.as_inner().as_inner().path;
let res = std::fs::remove_file(path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {
// just never log the not found errors, we cannot do anything for them; on detach
// the tenant directory is already gone.
//
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
error!("could not remove ephemeral file '{path}': {e}");
}
}
}
}
impl EphemeralFile {
pub(crate) fn len(&self) -> u64 {
self.rw.bytes_written()
self.bytes_written
}
pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
self.rw.page_cache_file_id()
self.page_cache_file_id
}
/// See [`self::page_caching::RW::load_to_vec`].
pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
self.rw.load_to_vec(ctx).await
}
pub(crate) async fn read_blk(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<BlockLease, io::Error> {
self.rw.read_blk(blknum, ctx).await
}
#[cfg(test)]
// This is a test helper: outside of tests, we are always written to via a pre-serialized batch.
pub(crate) async fn write_blob(
&mut self,
srcbuf: &[u8],
ctx: &RequestContext,
) -> Result<u64, io::Error> {
let pos = self.rw.bytes_written();
let mut len_bytes = std::io::Cursor::new(Vec::new());
crate::tenant::storage_layer::inmemory_layer::SerializedBatch::write_blob_length(
srcbuf.len(),
&mut len_bytes,
);
let len_bytes = len_bytes.into_inner();
// Write the length field
self.rw.write_all_borrowed(&len_bytes, ctx).await?;
// Write the payload
self.rw.write_all_borrowed(srcbuf, ctx).await?;
Ok(pos)
let size = self.len().into_usize();
let vec = Vec::with_capacity(size);
let (slice, nread) = self.read_exact_at_eof_ok(0, vec.slice_full(), ctx).await?;
assert_eq!(nread, size);
let vec = slice.into_inner();
assert_eq!(vec.len(), nread);
assert_eq!(vec.capacity(), size, "we shouldn't be reallocating");
Ok(vec)
}
/// Returns the offset at which the first byte of the input was written, for use
/// in constructing indices over the written value.
///
/// Panics if the write is short because there's no way we can recover from that.
/// TODO: make upstack handle this as an error.
pub(crate) async fn write_raw(
&mut self,
srcbuf: &[u8],
ctx: &RequestContext,
) -> Result<u64, io::Error> {
let pos = self.rw.bytes_written();
) -> std::io::Result<u64> {
let pos = self.bytes_written;
let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!(
"write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}",
srcbuf_len = srcbuf.len(),
),
)
})?;
// Write the payload
self.rw.write_all_borrowed(srcbuf, ctx).await?;
let nwritten = self
.buffered_writer
.write_buffered_borrowed(srcbuf, ctx)
.await?;
assert_eq!(
nwritten,
srcbuf.len(),
"buffered writer has no short writes"
);
self.bytes_written = new_bytes_written;
Ok(pos)
}
}
impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
async fn read_exact_at_eof_ok<'a, 'b, B: tokio_epoll_uring::IoBufMut + Send>(
&'b self,
start: u64,
dst: tokio_epoll_uring::Slice<B>,
ctx: &'a RequestContext,
) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
let file_size_tracking_writer = self.buffered_writer.as_inner();
let flushed_offset = file_size_tracking_writer.bytes_written();
let buffer = self.buffered_writer.inspect_buffer();
let buffered = &buffer[0..buffer.pending()];
let dst_cap = dst.bytes_total().into_u64();
let end = {
// saturating_add is correct here because the max file size is u64::MAX, so,
// if start + dst.len() > u64::MAX, then we know it will be a short read
let mut end: u64 = start.saturating_add(dst_cap);
if end > self.bytes_written {
end = self.bytes_written;
}
end
};
// inclusive, exclusive
#[derive(Debug)]
struct Range<N>(N, N);
impl<N: Num + Clone + Copy + PartialOrd + Ord> Range<N> {
fn len(&self) -> N {
if self.0 > self.1 {
N::zero()
} else {
self.1 - self.0
}
}
}
let written_range = Range(start, std::cmp::min(end, flushed_offset));
let buffered_range = Range(std::cmp::max(start, flushed_offset), end);
let dst = if written_range.len() > 0 {
let file: &VirtualFile = file_size_tracking_writer.as_inner();
let bounds = dst.bounds();
let slice = file
.read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
.await?;
Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
} else {
dst
};
let dst = if buffered_range.len() > 0 {
let offset_in_buffer = buffered_range
.0
.checked_sub(flushed_offset)
.unwrap()
.into_usize();
let to_copy =
&buffered[offset_in_buffer..(offset_in_buffer + buffered_range.len().into_usize())];
let bounds = dst.bounds();
let mut view = dst.slice({
let start = written_range.len().into_usize();
let end = start
.checked_add(buffered_range.len().into_usize())
.unwrap();
start..end
});
view.as_mut_rust_slice_full_zeroed()
.copy_from_slice(to_copy);
Slice::from_buf_bounds(Slice::into_inner(view), bounds)
} else {
dst
};
// TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs
Ok((dst, (end - start).into_usize()))
}
}
/// Does the given filename look like an ephemeral file?
pub fn is_ephemeral_file(filename: &str) -> bool {
if let Some(rest) = filename.strip_prefix("ephemeral-") {
@@ -129,19 +245,13 @@ pub fn is_ephemeral_file(filename: &str) -> bool {
}
}
impl BlockReader for EphemeralFile {
fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
}
}
#[cfg(test)]
mod tests {
use rand::Rng;
use super::*;
use crate::context::DownloadBehavior;
use crate::task_mgr::TaskKind;
use crate::tenant::block_io::BlockReaderRef;
use rand::{thread_rng, RngCore};
use std::fs;
use std::str::FromStr;
@@ -172,69 +282,6 @@ mod tests {
Ok((conf, tenant_shard_id, timeline_id, ctx))
}
#[tokio::test]
async fn test_ephemeral_blobs() -> Result<(), io::Error> {
let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
let gate = utils::sync::gate::Gate::default();
let entered = gate.enter().unwrap();
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, entered, &ctx).await?;
let pos_foo = file.write_blob(b"foo", &ctx).await?;
assert_eq!(
b"foo",
file.block_cursor()
.read_blob(pos_foo, &ctx)
.await?
.as_slice()
);
let pos_bar = file.write_blob(b"bar", &ctx).await?;
assert_eq!(
b"foo",
file.block_cursor()
.read_blob(pos_foo, &ctx)
.await?
.as_slice()
);
assert_eq!(
b"bar",
file.block_cursor()
.read_blob(pos_bar, &ctx)
.await?
.as_slice()
);
let mut blobs = Vec::new();
for i in 0..10000 {
let data = Vec::from(format!("blob{}", i).as_bytes());
let pos = file.write_blob(&data, &ctx).await?;
blobs.push((pos, data));
}
// also test with a large blobs
for i in 0..100 {
let data = format!("blob{}", i).as_bytes().repeat(100);
let pos = file.write_blob(&data, &ctx).await?;
blobs.push((pos, data));
}
let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
for (pos, expected) in blobs {
let actual = cursor.read_blob(pos, &ctx).await?;
assert_eq!(actual, expected);
}
// Test a large blob that spans multiple pages
let mut large_data = vec![0; 20000];
thread_rng().fill_bytes(&mut large_data);
let pos_large = file.write_blob(&large_data, &ctx).await?;
let result = file.block_cursor().read_blob(pos_large, &ctx).await?;
assert_eq!(result, large_data);
Ok(())
}
#[tokio::test]
async fn ephemeral_file_holds_gate_open() {
const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);
@@ -268,4 +315,151 @@ mod tests {
.expect("closing completes right away")
.expect("closing does not panic");
}
#[tokio::test]
async fn test_ephemeral_file_basics() {
let (conf, tenant_id, timeline_id, ctx) = harness("test_ephemeral_file_basics").unwrap();
let gate = utils::sync::gate::Gate::default();
let mut file =
EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
.await
.unwrap();
let cap = file.buffered_writer.inspect_buffer().capacity();
let write_nbytes = cap + cap / 2;
let content: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
.take(write_nbytes)
.collect();
let mut value_offsets = Vec::new();
for i in 0..write_nbytes {
let off = file.write_raw(&content[i..i + 1], &ctx).await.unwrap();
value_offsets.push(off);
}
assert!(file.len() as usize == write_nbytes);
for i in 0..write_nbytes {
assert_eq!(value_offsets[i], i.into_u64());
let buf = Vec::with_capacity(1);
let (buf_slice, nread) = file
.read_exact_at_eof_ok(i.into_u64(), buf.slice_full(), &ctx)
.await
.unwrap();
let buf = buf_slice.into_inner();
assert_eq!(nread, 1);
assert_eq!(&buf, &content[i..i + 1]);
}
let file_contents =
std::fs::read(&file.buffered_writer.as_inner().as_inner().path).unwrap();
assert_eq!(file_contents, &content[0..cap]);
let buffer_contents = file.buffered_writer.inspect_buffer();
assert_eq!(buffer_contents, &content[cap..write_nbytes]);
}
#[tokio::test]
async fn test_flushes_do_happen() {
let (conf, tenant_id, timeline_id, ctx) = harness("test_flushes_do_happen").unwrap();
let gate = utils::sync::gate::Gate::default();
let mut file =
EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
.await
.unwrap();
let cap = file.buffered_writer.inspect_buffer().capacity();
let content: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
.take(cap + cap / 2)
.collect();
file.write_raw(&content, &ctx).await.unwrap();
// assert the state is as this test expects it to be
assert_eq!(
&file.load_to_vec(&ctx).await.unwrap(),
&content[0..cap + cap / 2]
);
let md = file
.buffered_writer
.as_inner()
.as_inner()
.path
.metadata()
.unwrap();
assert_eq!(
md.len(),
cap.into_u64(),
"buffered writer does one write if we write 1.5x buffer capacity"
);
assert_eq!(
&file.buffered_writer.inspect_buffer()[0..cap / 2],
&content[cap..cap + cap / 2]
);
}
#[tokio::test]
async fn test_read_split_across_file_and_buffer() {
// This test exercises the logic on the read path that splits the logical read
// into a read from the flushed part (= the file) and a copy from the buffered writer's buffer.
//
// This test build on the assertions in test_flushes_do_happen
let (conf, tenant_id, timeline_id, ctx) =
harness("test_read_split_across_file_and_buffer").unwrap();
let gate = utils::sync::gate::Gate::default();
let mut file =
EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
.await
.unwrap();
let cap = file.buffered_writer.inspect_buffer().capacity();
let content: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
.take(cap + cap / 2)
.collect();
file.write_raw(&content, &ctx).await.unwrap();
let test_read = |start: usize, len: usize| {
let file = &file;
let ctx = &ctx;
let content = &content;
async move {
let (buf, nread) = file
.read_exact_at_eof_ok(
start.into_u64(),
Vec::with_capacity(len).slice_full(),
ctx,
)
.await
.unwrap();
assert_eq!(nread, len);
assert_eq!(&buf.into_inner(), &content[start..(start + len)]);
}
};
// completely within the file range
assert!(20 < cap, "test assumption");
test_read(10, 10).await;
// border onto edge of file
test_read(cap - 10, 10).await;
// read across file and buffer
test_read(cap - 10, 20).await;
// stay from start of buffer
test_read(cap, 10).await;
// completely within buffer
test_read(cap + 10, 10).await;
}
}

View File

@@ -1,153 +0,0 @@
//! Wrapper around [`super::zero_padded_read_write::RW`] that uses the
//! [`crate::page_cache`] to serve reads that need to go to the underlying [`VirtualFile`].
//!
//! Subject to removal in <https://github.com/neondatabase/neon/pull/8537>
use crate::context::RequestContext;
use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::block_io::BlockLease;
use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
use crate::virtual_file::VirtualFile;
use std::io::{self};
use tokio_epoll_uring::BoundedBuf;
use tracing::*;
use super::zero_padded_read_write;
/// See module-level comment.
pub struct RW {
page_cache_file_id: page_cache::FileId,
rw: super::zero_padded_read_write::RW<size_tracking_writer::Writer<VirtualFile>>,
/// Gate guard is held on as long as we need to do operations in the path (delete on drop).
_gate_guard: utils::sync::gate::GateGuard,
}
impl RW {
pub fn new(file: VirtualFile, _gate_guard: utils::sync::gate::GateGuard) -> Self {
let page_cache_file_id = page_cache::next_file_id();
Self {
page_cache_file_id,
rw: super::zero_padded_read_write::RW::new(size_tracking_writer::Writer::new(file)),
_gate_guard,
}
}
pub fn page_cache_file_id(&self) -> page_cache::FileId {
self.page_cache_file_id
}
pub(crate) async fn write_all_borrowed(
&mut self,
srcbuf: &[u8],
ctx: &RequestContext,
) -> Result<usize, io::Error> {
// It doesn't make sense to proactively fill the page cache on the Pageserver write path
// because Compute is unlikely to access recently written data.
self.rw.write_all_borrowed(srcbuf, ctx).await
}
pub(crate) fn bytes_written(&self) -> u64 {
self.rw.bytes_written()
}
/// Load all blocks that can be read via [`Self::read_blk`] into a contiguous memory buffer.
///
/// This includes the blocks that aren't yet flushed to disk by the internal buffered writer.
/// The last block is zero-padded to [`PAGE_SZ`], so, the returned buffer is always a multiple of [`PAGE_SZ`].
pub(super) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
// round up to the next PAGE_SZ multiple, required by blob_io
let size = {
let s = usize::try_from(self.bytes_written()).unwrap();
if s % PAGE_SZ == 0 {
s
} else {
s.checked_add(PAGE_SZ - (s % PAGE_SZ)).unwrap()
}
};
let vec = Vec::with_capacity(size);
// read from disk what we've already flushed
let file_size_tracking_writer = self.rw.as_writer();
let flushed_range = 0..usize::try_from(file_size_tracking_writer.bytes_written()).unwrap();
let mut vec = file_size_tracking_writer
.as_inner()
.read_exact_at(
vec.slice(0..(flushed_range.end - flushed_range.start)),
u64::try_from(flushed_range.start).unwrap(),
ctx,
)
.await?
.into_inner();
// copy from in-memory buffer what we haven't flushed yet but would return when accessed via read_blk
let buffered = self.rw.get_tail_zero_padded();
vec.extend_from_slice(buffered);
assert_eq!(vec.len(), size);
assert_eq!(vec.len() % PAGE_SZ, 0);
Ok(vec)
}
pub(crate) async fn read_blk(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<BlockLease, io::Error> {
match self.rw.read_blk(blknum).await? {
zero_padded_read_write::ReadResult::NeedsReadFromWriter { writer } => {
let cache = page_cache::get();
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum,
self.rw.as_writer().as_inner().path,
e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(write_guard) => {
let write_guard = writer
.as_inner()
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64, ctx)
.await?;
let read_guard = write_guard.mark_valid();
return Ok(BlockLease::PageReadGuard(read_guard));
}
}
}
zero_padded_read_write::ReadResult::ServedFromZeroPaddedMutableTail { buffer } => {
Ok(BlockLease::EphemeralFileMutableTail(buffer))
}
}
}
}
impl Drop for RW {
fn drop(&mut self) {
// There might still be pages in the [`crate::page_cache`] for this file.
// We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
// unlink the file
// we are clear to do this, because we have entered a gate
let path = &self.rw.as_writer().as_inner().path;
let res = std::fs::remove_file(path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {
// just never log the not found errors, we cannot do anything for them; on detach
// the tenant directory is already gone.
//
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
error!("could not remove ephemeral file '{path}': {e}");
}
}
}
}

View File

@@ -1,145 +0,0 @@
//! The heart of how [`super::EphemeralFile`] does its reads and writes.
//!
//! # Writes
//!
//! [`super::EphemeralFile`] writes small, borrowed buffers using [`RW::write_all_borrowed`].
//! The [`RW`] batches these into [`TAIL_SZ`] bigger writes, using [`owned_buffers_io::write::BufferedWriter`].
//!
//! # Reads
//!
//! [`super::EphemeralFile`] always reads full [`PAGE_SZ`]ed blocks using [`RW::read_blk`].
//!
//! The [`RW`] serves these reads either from the buffered writer's in-memory buffer
//! or redirects the caller to read from the underlying [`OwnedAsyncWriter`]
//! if the read is for the prefix that has already been flushed.
//!
//! # Current Usage
//!
//! The current user of this module is [`super::page_caching::RW`].
mod zero_padded;
use crate::{
context::RequestContext,
page_cache::PAGE_SZ,
virtual_file::owned_buffers_io::{
self,
write::{Buffer, OwnedAsyncWriter},
},
};
const TAIL_SZ: usize = 64 * 1024;
/// See module-level comment.
pub struct RW<W: OwnedAsyncWriter> {
buffered_writer: owned_buffers_io::write::BufferedWriter<
zero_padded::Buffer<TAIL_SZ>,
owned_buffers_io::util::size_tracking_writer::Writer<W>,
>,
}
pub enum ReadResult<'a, W> {
NeedsReadFromWriter { writer: &'a W },
ServedFromZeroPaddedMutableTail { buffer: &'a [u8; PAGE_SZ] },
}
impl<W> RW<W>
where
W: OwnedAsyncWriter,
{
pub fn new(writer: W) -> Self {
let bytes_flushed_tracker =
owned_buffers_io::util::size_tracking_writer::Writer::new(writer);
let buffered_writer = owned_buffers_io::write::BufferedWriter::new(
bytes_flushed_tracker,
zero_padded::Buffer::default(),
);
Self { buffered_writer }
}
pub(crate) fn as_writer(&self) -> &W {
self.buffered_writer.as_inner().as_inner()
}
pub async fn write_all_borrowed(
&mut self,
buf: &[u8],
ctx: &RequestContext,
) -> std::io::Result<usize> {
self.buffered_writer.write_buffered_borrowed(buf, ctx).await
}
pub fn bytes_written(&self) -> u64 {
let flushed_offset = self.buffered_writer.as_inner().bytes_written();
let buffer: &zero_padded::Buffer<TAIL_SZ> = self.buffered_writer.inspect_buffer();
flushed_offset + u64::try_from(buffer.pending()).unwrap()
}
/// Get a slice of all blocks that [`Self::read_blk`] would return as [`ReadResult::ServedFromZeroPaddedMutableTail`].
pub fn get_tail_zero_padded(&self) -> &[u8] {
let buffer: &zero_padded::Buffer<TAIL_SZ> = self.buffered_writer.inspect_buffer();
let buffer_written_up_to = buffer.pending();
// pad to next page boundary
let read_up_to = if buffer_written_up_to % PAGE_SZ == 0 {
buffer_written_up_to
} else {
buffer_written_up_to
.checked_add(PAGE_SZ - (buffer_written_up_to % PAGE_SZ))
.unwrap()
};
&buffer.as_zero_padded_slice()[0..read_up_to]
}
pub(crate) async fn read_blk(&self, blknum: u32) -> Result<ReadResult<'_, W>, std::io::Error> {
let flushed_offset = self.buffered_writer.as_inner().bytes_written();
let buffer: &zero_padded::Buffer<TAIL_SZ> = self.buffered_writer.inspect_buffer();
let buffered_offset = flushed_offset + u64::try_from(buffer.pending()).unwrap();
let read_offset = (blknum as u64) * (PAGE_SZ as u64);
// The trailing page ("block") might only be partially filled,
// yet the blob_io code relies on us to return a full PAGE_SZed slice anyway.
// Moreover, it has to be zero-padded, because when we still had
// a write-back page cache, it provided pre-zeroed pages, and blob_io came to rely on it.
// DeltaLayer probably has the same issue, not sure why it needs no special treatment.
// => check here that the read doesn't go beyond this potentially trailing
// => the zero-padding is done in the `else` branch below
let blocks_written = if buffered_offset % (PAGE_SZ as u64) == 0 {
buffered_offset / (PAGE_SZ as u64)
} else {
(buffered_offset / (PAGE_SZ as u64)) + 1
};
if (blknum as u64) >= blocks_written {
return Err(std::io::Error::new(std::io::ErrorKind::Other, anyhow::anyhow!("read past end of ephemeral_file: read=0x{read_offset:x} buffered=0x{buffered_offset:x} flushed=0x{flushed_offset}")));
}
// assertions for the `if-else` below
assert_eq!(
flushed_offset % (TAIL_SZ as u64), 0,
"we only use write_buffered_borrowed to write to the buffered writer, so it's guaranteed that flushes happen buffer.cap()-sized chunks"
);
assert_eq!(
flushed_offset % (PAGE_SZ as u64),
0,
"the logic below can't handle if the page is spread across the flushed part and the buffer"
);
if read_offset < flushed_offset {
assert!(read_offset + (PAGE_SZ as u64) <= flushed_offset);
Ok(ReadResult::NeedsReadFromWriter {
writer: self.as_writer(),
})
} else {
let read_offset_in_buffer = read_offset
.checked_sub(flushed_offset)
.expect("would have taken `if` branch instead of this one");
let read_offset_in_buffer = usize::try_from(read_offset_in_buffer).unwrap();
let zero_padded_slice = buffer.as_zero_padded_slice();
let page = &zero_padded_slice[read_offset_in_buffer..(read_offset_in_buffer + PAGE_SZ)];
Ok(ReadResult::ServedFromZeroPaddedMutableTail {
buffer: page
.try_into()
.expect("the slice above got it as page-size slice"),
})
}
}
}

View File

@@ -1,110 +0,0 @@
//! A [`crate::virtual_file::owned_buffers_io::write::Buffer`] whose
//! unwritten range is guaranteed to be zero-initialized.
//! This is used by [`crate::tenant::ephemeral_file::zero_padded_read_write::RW::read_blk`]
//! to serve page-sized reads of the trailing page when the trailing page has only been partially filled.
use std::mem::MaybeUninit;
use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
/// See module-level comment.
pub struct Buffer<const N: usize> {
allocation: Box<[u8; N]>,
written: usize,
}
impl<const N: usize> Default for Buffer<N> {
fn default() -> Self {
Self {
allocation: Box::new(
// SAFETY: zeroed memory is a valid [u8; N]
unsafe { MaybeUninit::zeroed().assume_init() },
),
written: 0,
}
}
}
impl<const N: usize> Buffer<N> {
#[inline(always)]
fn invariants(&self) {
// don't check by default, unoptimized is too expensive even for debug mode
if false {
debug_assert!(self.written <= N, "{}", self.written);
debug_assert!(self.allocation[self.written..N].iter().all(|v| *v == 0));
}
}
pub fn as_zero_padded_slice(&self) -> &[u8; N] {
&self.allocation
}
}
impl<const N: usize> crate::virtual_file::owned_buffers_io::write::Buffer for Buffer<N> {
type IoBuf = Self;
fn cap(&self) -> usize {
self.allocation.len()
}
fn extend_from_slice(&mut self, other: &[u8]) {
self.invariants();
let remaining = self.allocation.len() - self.written;
if other.len() > remaining {
panic!("calling extend_from_slice() with insufficient remaining capacity");
}
self.allocation[self.written..(self.written + other.len())].copy_from_slice(other);
self.written += other.len();
self.invariants();
}
fn pending(&self) -> usize {
self.written
}
fn flush(self) -> FullSlice<Self> {
self.invariants();
let written = self.written;
FullSlice::must_new(tokio_epoll_uring::BoundedBuf::slice(self, 0..written))
}
fn reuse_after_flush(iobuf: Self::IoBuf) -> Self {
let Self {
mut allocation,
written,
} = iobuf;
allocation[0..written].fill(0);
let new = Self {
allocation,
written: 0,
};
new.invariants();
new
}
}
/// We have this trait impl so that the `flush` method in the `Buffer` impl above can produce a
/// [`tokio_epoll_uring::BoundedBuf::slice`] of the [`Self::written`] range of the data.
///
/// Remember that bytes_init is generally _not_ a tracker of the amount
/// of valid data in the io buffer; we use `Slice` for that.
/// The `IoBuf` is _only_ for keeping track of uninitialized memory, a bit like MaybeUninit.
///
/// SAFETY:
///
/// The [`Self::allocation`] is stable becauses boxes are stable.
/// The memory is zero-initialized, so, bytes_init is always N.
unsafe impl<const N: usize> tokio_epoll_uring::IoBuf for Buffer<N> {
fn stable_ptr(&self) -> *const u8 {
self.allocation.as_ptr()
}
fn bytes_init(&self) -> usize {
// Yes, N, not self.written; Read the full comment of this impl block!
N
}
fn bytes_total(&self) -> usize {
N
}
}

View File

@@ -65,7 +65,7 @@ use std::os::unix::fs::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::OnceCell;
use tokio_epoll_uring::IoBufMut;
use tokio_epoll_uring::IoBuf;
use tracing::*;
use utils::{
@@ -471,7 +471,7 @@ impl DeltaLayerWriterInner {
ctx: &RequestContext,
) -> (FullSlice<Buf>, anyhow::Result<()>)
where
Buf: IoBufMut + Send,
Buf: IoBuf + Send,
{
assert!(
self.lsn_range.start <= lsn,
@@ -678,7 +678,7 @@ impl DeltaLayerWriter {
ctx: &RequestContext,
) -> (FullSlice<Buf>, anyhow::Result<()>)
where
Buf: IoBufMut + Send,
Buf: IoBuf + Send,
{
self.inner
.as_mut()

View File

@@ -4,23 +4,23 @@
//! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
//! its position in the file, is kept in memory, though.
//!
use crate::assert_u64_eq_usize::{u64_to_usize, U64IsUsize, UsizeIsU64};
use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, Value};
use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::{l0_flush, page_cache};
use anyhow::{anyhow, Result};
use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
use camino::Utf8PathBuf;
use pageserver_api::key::CompactKey;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, OnceLock};
use std::time::Instant;
use tracing::*;
@@ -39,6 +39,8 @@ use super::{
DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
};
pub(crate) mod vectored_dio_read;
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
@@ -78,9 +80,9 @@ impl std::fmt::Debug for InMemoryLayer {
pub struct InMemoryLayerInner {
/// All versions of all pages in the layer are kept here. Indexed
/// by block number and LSN. The value is an offset into the
/// by block number and LSN. The [`IndexEntry`] is an offset into the
/// ephemeral file where the page version is stored.
index: BTreeMap<CompactKey, VecMap<Lsn, u64>>,
index: BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>,
/// The values are stored in a serialized format in this file.
/// Each serialized Value is preceded by a 'u32' length field.
@@ -90,6 +92,154 @@ pub struct InMemoryLayerInner {
resource_units: GlobalResourceUnits,
}
/// Support the same max blob length as blob_io, because ultimately
/// all the InMemoryLayer contents end up being written into a delta layer,
/// using the [`crate::tenant::blob_io`].
const MAX_SUPPORTED_BLOB_LEN: usize = crate::tenant::blob_io::MAX_SUPPORTED_BLOB_LEN;
const MAX_SUPPORTED_BLOB_LEN_BITS: usize = {
let trailing_ones = MAX_SUPPORTED_BLOB_LEN.trailing_ones() as usize;
let leading_zeroes = MAX_SUPPORTED_BLOB_LEN.leading_zeros() as usize;
assert!(trailing_ones + leading_zeroes == std::mem::size_of::<usize>() * 8);
trailing_ones
};
/// See [`InMemoryLayerInner::index`].
///
/// For memory efficiency, the data is packed into a u64.
///
/// Layout:
/// - 1 bit: `will_init`
/// - [`MAX_SUPPORTED_BLOB_LEN_BITS`]: `len`
/// - [`MAX_SUPPORTED_POS_BITS`]: `pos`
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IndexEntry(u64);
impl IndexEntry {
/// See [`Self::MAX_SUPPORTED_POS`].
const MAX_SUPPORTED_POS_BITS: usize = {
let remainder = 64 - 1 - MAX_SUPPORTED_BLOB_LEN_BITS;
if remainder < 32 {
panic!("pos can be u32 as per type system, support that");
}
remainder
};
/// The maximum supported blob offset that can be represented by [`Self`].
/// See also [`Self::validate_checkpoint_distance`].
const MAX_SUPPORTED_POS: usize = (1 << Self::MAX_SUPPORTED_POS_BITS) - 1;
// Layout
const WILL_INIT_RANGE: Range<usize> = 0..1;
const LEN_RANGE: Range<usize> =
Self::WILL_INIT_RANGE.end..Self::WILL_INIT_RANGE.end + MAX_SUPPORTED_BLOB_LEN_BITS;
const POS_RANGE: Range<usize> =
Self::LEN_RANGE.end..Self::LEN_RANGE.end + Self::MAX_SUPPORTED_POS_BITS;
const _ASSERT: () = {
if Self::POS_RANGE.end != 64 {
panic!("we don't want undefined bits for our own sanity")
}
};
/// Fails if and only if the offset or length encoded in `arg` is too large to be represented by [`Self`].
///
/// The only reason why that can happen in the system is if the [`InMemoryLayer`] grows too long.
/// The [`InMemoryLayer`] size is determined by the checkpoint distance, enforced by [`crate::tenant::Timeline::should_roll`].
///
/// Thus, to avoid failure of this function, whenever we start up and/or change checkpoint distance,
/// call [`Self::validate_checkpoint_distance`] with the new checkpoint distance value.
///
/// TODO: this check should happen ideally at config parsing time (and in the request handler when a change to checkpoint distance is requested)
/// When cleaning this up, also look into the s3 max file size check that is performed in delta layer writer.
#[inline(always)]
fn new(arg: IndexEntryNewArgs) -> anyhow::Result<Self> {
let IndexEntryNewArgs {
base_offset,
batch_offset,
len,
will_init,
} = arg;
let pos = base_offset
.checked_add(batch_offset)
.ok_or_else(|| anyhow::anyhow!("base_offset + batch_offset overflows u64: base_offset={base_offset} batch_offset={batch_offset}"))?;
if pos.into_usize() > Self::MAX_SUPPORTED_POS {
anyhow::bail!(
"base_offset+batch_offset exceeds the maximum supported value: base_offset={base_offset} batch_offset={batch_offset} (+)={pos} max={max}",
max = Self::MAX_SUPPORTED_POS
);
}
if len > MAX_SUPPORTED_BLOB_LEN {
anyhow::bail!(
"len exceeds the maximum supported length: len={len} max={MAX_SUPPORTED_BLOB_LEN}",
);
}
let mut data: u64 = 0;
use bit_field::BitField;
data.set_bits(Self::WILL_INIT_RANGE, if will_init { 1 } else { 0 });
data.set_bits(Self::LEN_RANGE, len.into_u64());
data.set_bits(Self::POS_RANGE, pos);
Ok(Self(data))
}
#[inline(always)]
fn unpack(&self) -> IndexEntryUnpacked {
use bit_field::BitField;
IndexEntryUnpacked {
will_init: self.0.get_bits(Self::WILL_INIT_RANGE) != 0,
len: self.0.get_bits(Self::LEN_RANGE),
pos: self.0.get_bits(Self::POS_RANGE),
}
}
/// See [`Self::new`].
pub(crate) const fn validate_checkpoint_distance(
checkpoint_distance: u64,
) -> Result<(), &'static str> {
if checkpoint_distance > Self::MAX_SUPPORTED_POS as u64 {
return Err("exceeds the maximum supported value");
}
let res = u64_to_usize(checkpoint_distance).checked_add(MAX_SUPPORTED_BLOB_LEN);
if res.is_none() {
return Err(
"checkpoint distance + max supported blob len overflows in-memory addition",
);
}
// NB: it is ok for the result of the addition to be larger than MAX_SUPPORTED_POS
Ok(())
}
const _ASSERT_DEFAULT_CHECKPOINT_DISTANCE_IS_VALID: () = {
let res = Self::validate_checkpoint_distance(
crate::tenant::config::defaults::DEFAULT_CHECKPOINT_DISTANCE,
);
if res.is_err() {
panic!("default checkpoint distance is valid")
}
};
}
/// Args to [`IndexEntry::new`].
#[derive(Clone, Copy)]
struct IndexEntryNewArgs {
base_offset: u64,
batch_offset: u64,
len: usize,
will_init: bool,
}
/// Unpacked representation of the bitfielded [`IndexEntry`].
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct IndexEntryUnpacked {
will_init: bool,
len: u64,
pos: u64,
}
impl std::fmt::Debug for InMemoryLayerInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InMemoryLayerInner").finish()
@@ -276,7 +426,12 @@ impl InMemoryLayer {
.build();
let inner = self.inner.read().await;
let reader = inner.file.block_cursor();
struct ValueRead {
entry_lsn: Lsn,
read: vectored_dio_read::LogicalRead<Vec<u8>>,
}
let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
for range in keyspace.ranges.iter() {
for (key, vec_map) in inner
@@ -291,24 +446,62 @@ impl InMemoryLayer {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() {
// TODO: this uses the page cache => https://github.com/neondatabase/neon/issues/8183
let buf = reader.read_blob(*pos, &ctx).await;
if let Err(e) = buf {
reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
for (entry_lsn, index_entry) in slice.iter().rev() {
let IndexEntryUnpacked {
pos,
len,
will_init,
} = index_entry.unpack();
reads.entry(key).or_default().push(ValueRead {
entry_lsn: *entry_lsn,
read: vectored_dio_read::LogicalRead::new(
pos,
Vec::with_capacity(len as usize),
),
});
if will_init {
break;
}
}
}
}
let value = Value::des(&buf.unwrap());
if let Err(e) = value {
// Execute the reads.
let f = vectored_dio_read::execute(
&inner.file,
reads
.iter()
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
&ctx,
);
send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
.await;
// Process results into the reconstruct state
'next_key: for (key, value_reads) in reads {
for ValueRead { entry_lsn, read } in value_reads {
match read.into_result().expect("we run execute() above") {
Err(e) => {
reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
break;
continue 'next_key;
}
Ok(value_buf) => {
let value = Value::des(&value_buf);
if let Err(e) = value {
reconstruct_state
.on_key_error(key, PageReconstructError::from(anyhow!(e)));
continue 'next_key;
}
let key_situation =
reconstruct_state.update_key(&key, *entry_lsn, value.unwrap());
if key_situation == ValueReconstructSituation::Complete {
break;
let key_situation =
reconstruct_state.update_key(&key, entry_lsn, value.unwrap());
if key_situation == ValueReconstructSituation::Complete {
// TODO: metric to see if we fetched more values than necessary
continue 'next_key;
}
// process the next value in the next iteration of the loop
}
}
}
@@ -324,8 +517,9 @@ impl InMemoryLayer {
struct SerializedBatchOffset {
key: CompactKey,
lsn: Lsn,
/// offset in bytes from the start of the batch's buffer to the Value's serialized size header.
offset: u64,
// TODO: separate type when we start serde-serializing this value, to avoid coupling
// in-memory representation to serialization format.
index_entry: IndexEntry,
}
pub struct SerializedBatch {
@@ -340,30 +534,10 @@ pub struct SerializedBatch {
}
impl SerializedBatch {
/// Write a blob length in the internal format of the EphemeralFile
pub(crate) fn write_blob_length(len: usize, cursor: &mut std::io::Cursor<Vec<u8>>) {
use std::io::Write;
if len < 0x80 {
// short one-byte length header
let len_buf = [len as u8];
cursor
.write_all(&len_buf)
.expect("Writing to Vec is infallible");
} else {
let mut len_buf = u32::to_be_bytes(len as u32);
len_buf[0] |= 0x80;
cursor
.write_all(&len_buf)
.expect("Writing to Vec is infallible");
}
}
pub fn from_values(batch: Vec<(CompactKey, Lsn, usize, Value)>) -> Self {
pub fn from_values(batch: Vec<(CompactKey, Lsn, usize, Value)>) -> anyhow::Result<Self> {
// Pre-allocate a big flat buffer to write into. This should be large but not huge: it is soft-limited in practice by
// [`crate::pgdatadir_mapping::DatadirModification::MAX_PENDING_BYTES`]
let buffer_size = batch.iter().map(|i| i.2).sum::<usize>() + 4 * batch.len();
let buffer_size = batch.iter().map(|i| i.2).sum::<usize>();
let mut cursor = std::io::Cursor::new(Vec::<u8>::with_capacity(buffer_size));
let mut offsets: Vec<SerializedBatchOffset> = Vec::with_capacity(batch.len());
@@ -371,14 +545,19 @@ impl SerializedBatch {
for (key, lsn, val_ser_size, val) in batch {
let relative_off = cursor.position();
Self::write_blob_length(val_ser_size, &mut cursor);
val.ser_into(&mut cursor)
.expect("Writing into in-memory buffer is infallible");
offsets.push(SerializedBatchOffset {
key,
lsn,
offset: relative_off,
index_entry: IndexEntry::new(IndexEntryNewArgs {
base_offset: 0,
batch_offset: relative_off,
len: val_ser_size,
will_init: val.will_init(),
})
.context("higher-level code ensures that values are within supported ranges")?,
});
max_lsn = std::cmp::max(max_lsn, lsn);
}
@@ -388,11 +567,11 @@ impl SerializedBatch {
// Assert that we didn't do any extra allocations while building buffer.
debug_assert!(buffer.len() <= buffer_size);
Self {
Ok(Self {
raw: buffer,
offsets,
max_lsn,
}
})
}
}
@@ -456,44 +635,69 @@ impl InMemoryLayer {
})
}
// Write path.
/// Write path.
///
/// Errors are not retryable, the [`InMemoryLayer`] must be discarded, and not be read from.
/// The reason why it's not retryable is that the [`EphemeralFile`] writes are not retryable.
/// TODO: it can be made retryable if we aborted the process on EphemeralFile write errors.
pub async fn put_batch(
&self,
serialized_batch: SerializedBatch,
ctx: &RequestContext,
) -> Result<()> {
) -> anyhow::Result<()> {
let mut inner = self.inner.write().await;
self.assert_writable();
let base_off = {
inner
.file
.write_raw(
&serialized_batch.raw,
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build(),
)
.await?
};
let base_offset = inner.file.len();
let SerializedBatch {
raw,
mut offsets,
max_lsn: _,
} = serialized_batch;
// Add the base_offset to the batch's index entries which are relative to the batch start.
for offset in &mut offsets {
let IndexEntryUnpacked {
will_init,
len,
pos,
} = offset.index_entry.unpack();
offset.index_entry = IndexEntry::new(IndexEntryNewArgs {
base_offset,
batch_offset: pos,
len: len.into_usize(),
will_init,
})?;
}
// Write the batch to the file
inner.file.write_raw(&raw, ctx).await?;
let new_size = inner.file.len();
let expected_new_len = base_offset
.checked_add(raw.len().into_u64())
// write_raw would error if we were to overflow u64.
// also IndexEntry and higher levels in
//the code don't allow the file to grow that large
.unwrap();
assert_eq!(new_size, expected_new_len);
// Update the index with the new entries
for SerializedBatchOffset {
key,
lsn,
offset: relative_off,
} in serialized_batch.offsets
index_entry,
} in offsets
{
let off = base_off + relative_off;
let vec_map = inner.index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
if old.is_some() {
// We already had an entry for this LSN. That's odd..
warn!("Key {} at {} already exists", key, lsn);
}
}
let size = inner.file.len();
inner.resource_units.maybe_publish_size(size);
inner.resource_units.maybe_publish_size(new_size);
Ok(())
}
@@ -537,7 +741,7 @@ impl InMemoryLayer {
{
let inner = self.inner.write().await;
for vec_map in inner.index.values() {
for (lsn, _pos) in vec_map.as_slice() {
for (lsn, _) in vec_map.as_slice() {
assert!(*lsn < end_lsn);
}
}
@@ -601,36 +805,23 @@ impl InMemoryLayer {
match l0_flush_global_state {
l0_flush::Inner::Direct { .. } => {
let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
assert_eq!(
file_contents.len() % PAGE_SZ,
0,
"needed by BlockReaderRef::Slice"
);
assert_eq!(file_contents.len(), {
let written = usize::try_from(inner.file.len()).unwrap();
if written % PAGE_SZ == 0 {
written
} else {
written.checked_add(PAGE_SZ - (written % PAGE_SZ)).unwrap()
}
});
let cursor = BlockCursor::new(BlockReaderRef::Slice(&file_contents));
let mut buf = Vec::new();
let file_contents = Bytes::from(file_contents);
for (key, vec_map) in inner.index.iter() {
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
// TODO: once we have blob lengths in the in-memory index, we can
// 1. get rid of the blob_io / BlockReaderRef::Slice business and
// 2. load the file contents into a Bytes and
// 3. the use `Bytes::slice` to get the `buf` that is our blob
// 4. pass that `buf` into `put_value_bytes`
// => https://github.com/neondatabase/neon/issues/8183
cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
let will_init = Value::des(&buf)?.will_init();
let (tmp, res) = delta_layer_writer
for (lsn, entry) in vec_map
.as_slice()
.iter()
.map(|(lsn, entry)| (lsn, entry.unpack()))
{
let IndexEntryUnpacked {
pos,
len,
will_init,
} = entry;
let buf = Bytes::slice(&file_contents, pos as usize..(pos + len) as usize);
let (_buf, res) = delta_layer_writer
.put_value_bytes(
Key::from_compact(*key),
*lsn,
@@ -640,7 +831,6 @@ impl InMemoryLayer {
)
.await;
res?;
buf = tmp.into_raw_slice().into_inner();
}
}
}
@@ -662,3 +852,134 @@ impl InMemoryLayer {
Ok(Some((desc, path)))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_index_entry() {
const MAX_SUPPORTED_POS: usize = IndexEntry::MAX_SUPPORTED_POS;
use IndexEntryNewArgs as Args;
use IndexEntryUnpacked as Unpacked;
let roundtrip = |args, expect: Unpacked| {
let res = IndexEntry::new(args).expect("this tests expects no errors");
let IndexEntryUnpacked {
will_init,
len,
pos,
} = res.unpack();
assert_eq!(will_init, expect.will_init);
assert_eq!(len, expect.len);
assert_eq!(pos, expect.pos);
};
// basic roundtrip
for pos in [0, MAX_SUPPORTED_POS] {
for len in [0, MAX_SUPPORTED_BLOB_LEN] {
for will_init in [true, false] {
let expect = Unpacked {
will_init,
len: len.into_u64(),
pos: pos.into_u64(),
};
roundtrip(
Args {
will_init,
base_offset: pos.into_u64(),
batch_offset: 0,
len,
},
expect,
);
roundtrip(
Args {
will_init,
base_offset: 0,
batch_offset: pos.into_u64(),
len,
},
expect,
);
}
}
}
// too-large len
let too_large = Args {
will_init: false,
len: MAX_SUPPORTED_BLOB_LEN + 1,
base_offset: 0,
batch_offset: 0,
};
assert!(IndexEntry::new(too_large).is_err());
// too-large pos
{
let too_large = Args {
will_init: false,
len: 0,
base_offset: MAX_SUPPORTED_POS.into_u64() + 1,
batch_offset: 0,
};
assert!(IndexEntry::new(too_large).is_err());
let too_large = Args {
will_init: false,
len: 0,
base_offset: 0,
batch_offset: MAX_SUPPORTED_POS.into_u64() + 1,
};
assert!(IndexEntry::new(too_large).is_err());
}
// too large (base_offset + batch_offset)
{
let too_large = Args {
will_init: false,
len: 0,
base_offset: MAX_SUPPORTED_POS.into_u64(),
batch_offset: 1,
};
assert!(IndexEntry::new(too_large).is_err());
let too_large = Args {
will_init: false,
len: 0,
base_offset: MAX_SUPPORTED_POS.into_u64() - 1,
batch_offset: MAX_SUPPORTED_POS.into_u64() - 1,
};
assert!(IndexEntry::new(too_large).is_err());
}
// valid special cases
// - area past the max supported pos that is accessible by len
for len in [1, MAX_SUPPORTED_BLOB_LEN] {
roundtrip(
Args {
will_init: false,
len,
base_offset: MAX_SUPPORTED_POS.into_u64(),
batch_offset: 0,
},
Unpacked {
will_init: false,
len: len as u64,
pos: MAX_SUPPORTED_POS.into_u64(),
},
);
roundtrip(
Args {
will_init: false,
len,
base_offset: 0,
batch_offset: MAX_SUPPORTED_POS.into_u64(),
},
Unpacked {
will_init: false,
len: len as u64,
pos: MAX_SUPPORTED_POS.into_u64(),
},
);
}
}
}

View File

@@ -0,0 +1,937 @@
use std::{
collections::BTreeMap,
sync::{Arc, RwLock},
};
use itertools::Itertools;
use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
use crate::{
assert_u64_eq_usize::{U64IsUsize, UsizeIsU64},
context::RequestContext,
};
/// The file interface we require. At runtime, this is a [`crate::tenant::ephemeral_file::EphemeralFile`].
pub trait File: Send {
/// Attempt to read the bytes in `self` in range `[start,start+dst.bytes_total())`
/// and return the number of bytes read (let's call it `nread`).
/// The bytes read are placed in `dst`, i.e., `&dst[..nread]` will contain the read bytes.
///
/// The only reason why the read may be short (i.e., `nread != dst.bytes_total()`)
/// is if the file is shorter than `start+dst.len()`.
///
/// This is unlike [`std::os::unix::fs::FileExt::read_exact_at`] which returns an
/// [`std::io::ErrorKind::UnexpectedEof`] error if the file is shorter than `start+dst.len()`.
///
/// No guarantees are made about the remaining bytes in `dst` in case of a short read.
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>(
&'b self,
start: u64,
dst: Slice<B>,
ctx: &'a RequestContext,
) -> std::io::Result<(Slice<B>, usize)>;
}
/// A logical read from [`File`]. See [`Self::new`].
pub struct LogicalRead<B: Buffer> {
pos: u64,
state: RwLockRefCell<LogicalReadState<B>>,
}
enum LogicalReadState<B: Buffer> {
NotStarted(B),
Ongoing(B),
Ok(B),
Error(Arc<std::io::Error>),
Undefined,
}
impl<B: Buffer> LogicalRead<B> {
/// Create a new [`LogicalRead`] from [`File`] of the data in the file in range `[ pos, pos + buf.cap() )`.
pub fn new(pos: u64, buf: B) -> Self {
Self {
pos,
state: RwLockRefCell::new(LogicalReadState::NotStarted(buf)),
}
}
pub fn into_result(self) -> Option<Result<B, Arc<std::io::Error>>> {
match self.state.into_inner() {
LogicalReadState::Ok(buf) => Some(Ok(buf)),
LogicalReadState::Error(e) => Some(Err(e)),
LogicalReadState::NotStarted(_) | LogicalReadState::Ongoing(_) => None,
LogicalReadState::Undefined => unreachable!(),
}
}
}
/// The buffer into which a [`LogicalRead`] result is placed.
pub trait Buffer: std::ops::Deref<Target = [u8]> {
/// Immutable.
fn cap(&self) -> usize;
/// Changes only through [`Self::extend_from_slice`].
fn len(&self) -> usize;
/// Panics if the total length would exceed the initialized capacity.
fn extend_from_slice(&mut self, src: &[u8]);
}
/// The minimum alignment and size requirement for disk offsets and memory buffer size for direct IO.
const DIO_CHUNK_SIZE: usize = 512;
/// If multiple chunks need to be read, merge adjacent chunk reads into batches of max size `MAX_CHUNK_BATCH_SIZE`.
/// (The unit is the number of chunks.)
const MAX_CHUNK_BATCH_SIZE: usize = {
let desired = 128 * 1024; // 128k
if desired % DIO_CHUNK_SIZE != 0 {
panic!("MAX_CHUNK_BATCH_SIZE must be a multiple of DIO_CHUNK_SIZE")
// compile-time error
}
desired / DIO_CHUNK_SIZE
};
/// Execute the given logical `reads` against `file`.
/// The results are placed in the buffers of the [`LogicalRead`]s.
/// Retrieve the results by calling [`LogicalRead::into_result`] on each [`LogicalRead`].
///
/// The [`LogicalRead`]s must be freshly created using [`LogicalRead::new`] when calling this function.
/// Otherwise, this function panics.
pub async fn execute<'a, I, F, B>(file: &F, reads: I, ctx: &RequestContext)
where
I: IntoIterator<Item = &'a LogicalRead<B>>,
F: File,
B: Buffer + IoBufMut + Send,
{
// Terminology:
// logical read = a request to read an arbitrary range of bytes from `file`; byte-level granularity
// chunk = we conceptually divide up the byte range of `file` into DIO_CHUNK_SIZEs ranges
// interest = a range within a chunk that a logical read is interested in; one logical read gets turned into many interests
// physical read = the read request we're going to issue to the OS; covers a range of chunks; chunk-level granularity
// Preserve a copy of the logical reads for debug assertions at the end
#[cfg(debug_assertions)]
let (reads, assert_logical_reads) = {
let (reads, assert) = reads.into_iter().tee();
(reads, Some(Vec::from_iter(assert)))
};
#[cfg(not(debug_assertions))]
let (reads, assert_logical_reads): (_, Option<Vec<&'a LogicalRead<B>>>) = (reads, None);
// Plan which parts of which chunks need to be appended to which buffer
let mut by_chunk: BTreeMap<u64, Vec<Interest<B>>> = BTreeMap::new();
struct Interest<'a, B: Buffer> {
logical_read: &'a LogicalRead<B>,
offset_in_chunk: u64,
len: u64,
}
for logical_read in reads {
let LogicalRead { pos, state } = logical_read;
let mut state = state.borrow_mut();
// transition from NotStarted to Ongoing
let cur = std::mem::replace(&mut *state, LogicalReadState::Undefined);
let req_len = match cur {
LogicalReadState::NotStarted(buf) => {
if buf.len() != 0 {
panic!("The `LogicalRead`s that are passed in must be freshly created using `LogicalRead::new`");
}
// buf.cap() == 0 is ok
// transition into Ongoing state
let req_len = buf.cap();
*state = LogicalReadState::Ongoing(buf);
req_len
}
x => panic!("must only call with fresh LogicalReads, got another state, leaving Undefined state behind state={x:?}"),
};
// plan which chunks we need to read from
let mut remaining = req_len;
let mut chunk_no = *pos / (DIO_CHUNK_SIZE.into_u64());
let mut offset_in_chunk = pos.into_usize() % DIO_CHUNK_SIZE;
while remaining > 0 {
let remaining_in_chunk = std::cmp::min(remaining, DIO_CHUNK_SIZE - offset_in_chunk);
by_chunk.entry(chunk_no).or_default().push(Interest {
logical_read,
offset_in_chunk: offset_in_chunk.into_u64(),
len: remaining_in_chunk.into_u64(),
});
offset_in_chunk = 0;
chunk_no += 1;
remaining -= remaining_in_chunk;
}
}
// At this point, we could iterate over by_chunk, in chunk order,
// read each chunk from disk, and fill the buffers.
// However, we can merge adjacent chunks into batches of MAX_CHUNK_BATCH_SIZE
// so we issue fewer IOs = fewer roundtrips = lower overall latency.
struct PhysicalRead<'a, B: Buffer> {
start_chunk_no: u64,
nchunks: usize,
dsts: Vec<PhysicalInterest<'a, B>>,
}
struct PhysicalInterest<'a, B: Buffer> {
logical_read: &'a LogicalRead<B>,
offset_in_physical_read: u64,
len: u64,
}
let mut physical_reads: Vec<PhysicalRead<B>> = Vec::new();
let mut by_chunk = by_chunk.into_iter().peekable();
loop {
let mut last_chunk_no = None;
let to_merge: Vec<(u64, Vec<Interest<B>>)> = by_chunk
.peeking_take_while(|(chunk_no, _)| {
if let Some(last_chunk_no) = last_chunk_no {
if *chunk_no != last_chunk_no + 1 {
return false;
}
}
last_chunk_no = Some(*chunk_no);
true
})
.take(MAX_CHUNK_BATCH_SIZE)
.collect(); // TODO: avoid this .collect()
let Some(start_chunk_no) = to_merge.first().map(|(chunk_no, _)| *chunk_no) else {
break;
};
let nchunks = to_merge.len();
let dsts = to_merge
.into_iter()
.enumerate()
.flat_map(|(i, (_, dsts))| {
dsts.into_iter().map(
move |Interest {
logical_read,
offset_in_chunk,
len,
}| {
PhysicalInterest {
logical_read,
offset_in_physical_read: i
.checked_mul(DIO_CHUNK_SIZE)
.unwrap()
.into_u64()
+ offset_in_chunk,
len,
}
},
)
})
.collect();
physical_reads.push(PhysicalRead {
start_chunk_no,
nchunks,
dsts,
});
}
drop(by_chunk);
// Execute physical reads and fill the logical read buffers
// TODO: pipelined reads; prefetch;
let get_io_buffer = |nchunks| Vec::with_capacity(nchunks * DIO_CHUNK_SIZE);
for PhysicalRead {
start_chunk_no,
nchunks,
dsts,
} in physical_reads
{
let all_done = dsts
.iter()
.all(|PhysicalInterest { logical_read, .. }| logical_read.state.borrow().is_terminal());
if all_done {
continue;
}
let read_offset = start_chunk_no
.checked_mul(DIO_CHUNK_SIZE.into_u64())
.expect("we produce chunk_nos by dividing by DIO_CHUNK_SIZE earlier");
let io_buf = get_io_buffer(nchunks).slice_full();
let req_len = io_buf.len();
let (io_buf_slice, nread) = match file.read_exact_at_eof_ok(read_offset, io_buf, ctx).await
{
Ok(t) => t,
Err(e) => {
let e = Arc::new(e);
for PhysicalInterest { logical_read, .. } in dsts {
*logical_read.state.borrow_mut() = LogicalReadState::Error(Arc::clone(&e));
// this will make later reads for the given LogicalRead short-circuit, see top of loop body
}
continue;
}
};
let io_buf = io_buf_slice.into_inner();
assert!(
nread <= io_buf.len(),
"the last chunk in the file can be a short read, so, no =="
);
let io_buf = &io_buf[..nread];
for PhysicalInterest {
logical_read,
offset_in_physical_read,
len,
} in dsts
{
let mut logical_read_state_borrow = logical_read.state.borrow_mut();
let logical_read_buf = match &mut *logical_read_state_borrow {
LogicalReadState::NotStarted(_) => {
unreachable!("we transition it into Ongoing at function entry")
}
LogicalReadState::Ongoing(buf) => buf,
LogicalReadState::Ok(_) | LogicalReadState::Error(_) => {
continue;
}
LogicalReadState::Undefined => unreachable!(),
};
let range_in_io_buf = std::ops::Range {
start: offset_in_physical_read as usize,
end: offset_in_physical_read as usize + len as usize,
};
assert!(range_in_io_buf.end >= range_in_io_buf.start);
if range_in_io_buf.end > nread {
let msg = format!(
"physical read returned EOF where this logical read expected more data in the file: offset=0x{read_offset:x} req_len=0x{req_len:x} nread=0x{nread:x} {:?}",
&*logical_read_state_borrow
);
logical_read_state_borrow.transition_to_terminal(Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
msg,
)));
continue;
}
let data = &io_buf[range_in_io_buf];
// Copy data from io buffer into the logical read buffer.
// (And in debug mode, validate that the buffer impl adheres to the Buffer trait spec.)
let pre = if cfg!(debug_assertions) {
Some((logical_read_buf.len(), logical_read_buf.cap()))
} else {
None
};
logical_read_buf.extend_from_slice(data);
let post = if cfg!(debug_assertions) {
Some((logical_read_buf.len(), logical_read_buf.cap()))
} else {
None
};
match (pre, post) {
(None, None) => {}
(Some(_), None) | (None, Some(_)) => unreachable!(),
(Some((pre_len, pre_cap)), Some((post_len, post_cap))) => {
assert_eq!(pre_len + len as usize, post_len);
assert_eq!(pre_cap, post_cap);
}
}
if logical_read_buf.len() == logical_read_buf.cap() {
logical_read_state_borrow.transition_to_terminal(Ok(()));
}
}
}
if let Some(assert_logical_reads) = assert_logical_reads {
for logical_read in assert_logical_reads {
assert!(logical_read.state.borrow().is_terminal());
}
}
}
impl<B: Buffer> LogicalReadState<B> {
fn is_terminal(&self) -> bool {
match self {
LogicalReadState::NotStarted(_) | LogicalReadState::Ongoing(_) => false,
LogicalReadState::Ok(_) | LogicalReadState::Error(_) => true,
LogicalReadState::Undefined => unreachable!(),
}
}
fn transition_to_terminal(&mut self, err: std::io::Result<()>) {
let cur = std::mem::replace(self, LogicalReadState::Undefined);
let buf = match cur {
LogicalReadState::Ongoing(buf) => buf,
x => panic!("must only call in state Ongoing, got {x:?}"),
};
*self = match err {
Ok(()) => LogicalReadState::Ok(buf),
Err(e) => LogicalReadState::Error(Arc::new(e)),
};
}
}
impl<B: Buffer> std::fmt::Debug for LogicalReadState<B> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
#[derive(Debug)]
#[allow(unused)]
struct BufferDebug {
len: usize,
cap: usize,
}
impl<'a> From<&'a dyn Buffer> for BufferDebug {
fn from(buf: &'a dyn Buffer) -> Self {
Self {
len: buf.len(),
cap: buf.cap(),
}
}
}
match self {
LogicalReadState::NotStarted(b) => {
write!(f, "NotStarted({:?})", BufferDebug::from(b as &dyn Buffer))
}
LogicalReadState::Ongoing(b) => {
write!(f, "Ongoing({:?})", BufferDebug::from(b as &dyn Buffer))
}
LogicalReadState::Ok(b) => write!(f, "Ok({:?})", BufferDebug::from(b as &dyn Buffer)),
LogicalReadState::Error(e) => write!(f, "Error({:?})", e),
LogicalReadState::Undefined => write!(f, "Undefined"),
}
}
}
#[derive(Debug)]
struct RwLockRefCell<T>(RwLock<T>);
impl<T> RwLockRefCell<T> {
fn new(value: T) -> Self {
Self(RwLock::new(value))
}
fn borrow(&self) -> impl std::ops::Deref<Target = T> + '_ {
self.0.try_read().unwrap()
}
fn borrow_mut(&self) -> impl std::ops::DerefMut<Target = T> + '_ {
self.0.try_write().unwrap()
}
fn into_inner(self) -> T {
self.0.into_inner().unwrap()
}
}
impl Buffer for Vec<u8> {
fn cap(&self) -> usize {
self.capacity()
}
fn len(&self) -> usize {
self.len()
}
fn extend_from_slice(&mut self, src: &[u8]) {
if self.len() + src.len() > self.cap() {
panic!("Buffer capacity exceeded");
}
Vec::extend_from_slice(self, src);
}
}
#[cfg(test)]
#[allow(clippy::assertions_on_constants)]
mod tests {
use rand::Rng;
use crate::{
context::DownloadBehavior, task_mgr::TaskKind,
virtual_file::owned_buffers_io::slice::SliceMutExt,
};
use super::*;
use std::{cell::RefCell, collections::VecDeque};
struct InMemoryFile {
content: Vec<u8>,
}
impl InMemoryFile {
fn new_random(len: usize) -> Self {
Self {
content: rand::thread_rng()
.sample_iter(rand::distributions::Standard)
.take(len)
.collect(),
}
}
fn test_logical_read(&self, pos: u64, len: usize) -> TestLogicalRead {
let expected_result = if pos as usize + len > self.content.len() {
Err("InMemoryFile short read".to_string())
} else {
Ok(self.content[pos as usize..pos as usize + len].to_vec())
};
TestLogicalRead::new(pos, len, expected_result)
}
}
#[test]
fn test_in_memory_file() {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let file = InMemoryFile::new_random(10);
let test_read = |pos, len| {
let buf = vec![0; len];
let fut = file.read_exact_at_eof_ok(pos, buf.slice_full(), &ctx);
use futures::FutureExt;
let (slice, nread) = fut
.now_or_never()
.expect("impl never awaits")
.expect("impl never errors");
let mut buf = slice.into_inner();
buf.truncate(nread);
buf
};
assert_eq!(test_read(0, 1), &file.content[0..1]);
assert_eq!(test_read(1, 2), &file.content[1..3]);
assert_eq!(test_read(9, 2), &file.content[9..]);
assert!(test_read(10, 2).is_empty());
assert!(test_read(11, 2).is_empty());
}
impl File for InMemoryFile {
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>(
&'b self,
start: u64,
mut dst: Slice<B>,
_ctx: &'a RequestContext,
) -> std::io::Result<(Slice<B>, usize)> {
let dst_slice: &mut [u8] = dst.as_mut_rust_slice_full_zeroed();
let nread = {
let req_len = dst_slice.len();
let len = std::cmp::min(req_len, self.content.len().saturating_sub(start as usize));
if start as usize >= self.content.len() {
0
} else {
dst_slice[..len]
.copy_from_slice(&self.content[start as usize..start as usize + len]);
len
}
};
rand::Rng::fill(&mut rand::thread_rng(), &mut dst_slice[nread..]); // to discover bugs
Ok((dst, nread))
}
}
#[derive(Clone)]
struct TestLogicalRead {
pos: u64,
len: usize,
expected_result: Result<Vec<u8>, String>,
}
impl TestLogicalRead {
fn new(pos: u64, len: usize, expected_result: Result<Vec<u8>, String>) -> Self {
Self {
pos,
len,
expected_result,
}
}
fn make_logical_read(&self) -> LogicalRead<Vec<u8>> {
LogicalRead::new(self.pos, Vec::with_capacity(self.len))
}
}
async fn execute_and_validate_test_logical_reads<I, F>(
file: &F,
test_logical_reads: I,
ctx: &RequestContext,
) where
I: IntoIterator<Item = TestLogicalRead>,
F: File,
{
let (tmp, test_logical_reads) = test_logical_reads.into_iter().tee();
let logical_reads = tmp.map(|tr| tr.make_logical_read()).collect::<Vec<_>>();
execute(file, logical_reads.iter(), ctx).await;
for (logical_read, test_logical_read) in logical_reads.into_iter().zip(test_logical_reads) {
let actual = logical_read.into_result().expect("we call execute()");
match (actual, test_logical_read.expected_result) {
(Ok(actual), Ok(expected)) if actual == expected => {}
(Err(actual), Err(expected)) => {
assert_eq!(actual.to_string(), expected);
}
(actual, expected) => panic!("expected {expected:?}\nactual {actual:?}"),
}
}
}
#[tokio::test]
async fn test_blackbox() {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let cs = DIO_CHUNK_SIZE;
let cs_u64 = cs.into_u64();
let file = InMemoryFile::new_random(10 * cs);
let test_logical_reads = vec![
file.test_logical_read(0, 1),
// adjacent to logical_read0
file.test_logical_read(1, 2),
// gap
// spans adjacent chunks
file.test_logical_read(cs_u64 - 1, 2),
// gap
// tail of chunk 3, all of chunk 4, and 2 bytes of chunk 5
file.test_logical_read(3 * cs_u64 - 1, cs + 2),
// gap
file.test_logical_read(5 * cs_u64, 1),
];
let num_test_logical_reads = test_logical_reads.len();
let test_logical_reads_perms = test_logical_reads
.into_iter()
.permutations(num_test_logical_reads);
// test all orderings of LogicalReads, the order shouldn't matter for the results
for test_logical_reads in test_logical_reads_perms {
execute_and_validate_test_logical_reads(&file, test_logical_reads, &ctx).await;
}
}
#[tokio::test]
#[should_panic]
async fn test_reusing_logical_reads_panics() {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let file = InMemoryFile::new_random(DIO_CHUNK_SIZE);
let a = file.test_logical_read(23, 10);
let logical_reads = vec![a.make_logical_read()];
execute(&file, &logical_reads, &ctx).await;
// reuse pancis
execute(&file, &logical_reads, &ctx).await;
}
struct RecorderFile<'a> {
recorded: RefCell<Vec<RecordedRead>>,
file: &'a InMemoryFile,
}
struct RecordedRead {
pos: u64,
req_len: usize,
res: Vec<u8>,
}
impl<'a> RecorderFile<'a> {
fn new(file: &'a InMemoryFile) -> RecorderFile<'a> {
Self {
recorded: Default::default(),
file,
}
}
}
impl<'x> File for RecorderFile<'x> {
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>(
&'b self,
start: u64,
dst: Slice<B>,
ctx: &'a RequestContext,
) -> std::io::Result<(Slice<B>, usize)> {
let (dst, nread) = self.file.read_exact_at_eof_ok(start, dst, ctx).await?;
self.recorded.borrow_mut().push(RecordedRead {
pos: start,
req_len: dst.bytes_total(),
res: Vec::from(&dst[..nread]),
});
Ok((dst, nread))
}
}
#[tokio::test]
async fn test_logical_reads_to_same_chunk_are_merged_into_one_chunk_read() {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let file = InMemoryFile::new_random(2 * DIO_CHUNK_SIZE);
let a = file.test_logical_read(DIO_CHUNK_SIZE.into_u64(), 10);
let b = file.test_logical_read(DIO_CHUNK_SIZE.into_u64() + 30, 20);
let recorder = RecorderFile::new(&file);
execute_and_validate_test_logical_reads(&recorder, vec![a, b], &ctx).await;
let recorded = recorder.recorded.borrow();
assert_eq!(recorded.len(), 1);
let RecordedRead { pos, req_len, .. } = &recorded[0];
assert_eq!(*pos, DIO_CHUNK_SIZE.into_u64());
assert_eq!(*req_len, DIO_CHUNK_SIZE);
}
#[tokio::test]
async fn test_max_chunk_batch_size_is_respected() {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let file = InMemoryFile::new_random(4 * MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE);
// read the 10th byte of each chunk 3 .. 3+2*MAX_CHUNK_BATCH_SIZE
assert!(3 < MAX_CHUNK_BATCH_SIZE, "test assumption");
assert!(10 < DIO_CHUNK_SIZE, "test assumption");
let mut test_logical_reads = Vec::new();
for i in 3..3 + MAX_CHUNK_BATCH_SIZE + MAX_CHUNK_BATCH_SIZE / 2 {
test_logical_reads
.push(file.test_logical_read(i.into_u64() * DIO_CHUNK_SIZE.into_u64() + 10, 1));
}
let recorder = RecorderFile::new(&file);
execute_and_validate_test_logical_reads(&recorder, test_logical_reads, &ctx).await;
let recorded = recorder.recorded.borrow();
assert_eq!(recorded.len(), 2);
{
let RecordedRead { pos, req_len, .. } = &recorded[0];
assert_eq!(*pos as usize, 3 * DIO_CHUNK_SIZE);
assert_eq!(*req_len, MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE);
}
{
let RecordedRead { pos, req_len, .. } = &recorded[1];
assert_eq!(*pos as usize, (3 + MAX_CHUNK_BATCH_SIZE) * DIO_CHUNK_SIZE);
assert_eq!(*req_len, MAX_CHUNK_BATCH_SIZE / 2 * DIO_CHUNK_SIZE);
}
}
#[tokio::test]
async fn test_batch_breaks_if_chunk_is_not_interesting() {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
assert!(MAX_CHUNK_BATCH_SIZE > 10, "test assumption");
let file = InMemoryFile::new_random(3 * DIO_CHUNK_SIZE);
let a = file.test_logical_read(0, 1); // chunk 0
let b = file.test_logical_read(2 * DIO_CHUNK_SIZE.into_u64(), 1); // chunk 2
let recorder = RecorderFile::new(&file);
execute_and_validate_test_logical_reads(&recorder, vec![a, b], &ctx).await;
let recorded = recorder.recorded.borrow();
assert_eq!(recorded.len(), 2);
{
let RecordedRead { pos, req_len, .. } = &recorded[0];
assert_eq!(*pos, 0);
assert_eq!(*req_len, DIO_CHUNK_SIZE);
}
{
let RecordedRead { pos, req_len, .. } = &recorded[1];
assert_eq!(*pos, 2 * DIO_CHUNK_SIZE.into_u64());
assert_eq!(*req_len, DIO_CHUNK_SIZE);
}
}
struct ExpectedRead {
expect_pos: u64,
expect_len: usize,
respond: Result<Vec<u8>, String>,
}
struct MockFile {
expected: RefCell<VecDeque<ExpectedRead>>,
}
impl Drop for MockFile {
fn drop(&mut self) {
assert!(
self.expected.borrow().is_empty(),
"expected reads not satisfied"
);
}
}
macro_rules! mock_file {
($($pos:expr , $len:expr => $respond:expr),* $(,)?) => {{
MockFile {
expected: RefCell::new(VecDeque::from(vec![$(ExpectedRead {
expect_pos: $pos,
expect_len: $len,
respond: $respond,
}),*])),
}
}};
}
impl File for MockFile {
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>(
&'b self,
start: u64,
mut dst: Slice<B>,
_ctx: &'a RequestContext,
) -> std::io::Result<(Slice<B>, usize)> {
let ExpectedRead {
expect_pos,
expect_len,
respond,
} = self
.expected
.borrow_mut()
.pop_front()
.expect("unexpected read");
assert_eq!(start, expect_pos);
assert_eq!(dst.bytes_total(), expect_len);
match respond {
Ok(mocked_bytes) => {
let len = std::cmp::min(dst.bytes_total(), mocked_bytes.len());
let dst_slice: &mut [u8] = dst.as_mut_rust_slice_full_zeroed();
dst_slice[..len].copy_from_slice(&mocked_bytes[..len]);
rand::Rng::fill(&mut rand::thread_rng(), &mut dst_slice[len..]); // to discover bugs
Ok((dst, len))
}
Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
}
}
}
#[tokio::test]
async fn test_mock_file() {
// Self-test to ensure the relevant features of mock file work as expected.
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let mock_file = mock_file! {
0 , 512 => Ok(vec![0; 512]),
512 , 512 => Ok(vec![1; 512]),
1024 , 512 => Ok(vec![2; 10]),
2048, 1024 => Err("foo".to_owned()),
};
let buf = Vec::with_capacity(512);
let (buf, nread) = mock_file
.read_exact_at_eof_ok(0, buf.slice_full(), &ctx)
.await
.unwrap();
assert_eq!(nread, 512);
assert_eq!(&buf.into_inner()[..nread], &[0; 512]);
let buf = Vec::with_capacity(512);
let (buf, nread) = mock_file
.read_exact_at_eof_ok(512, buf.slice_full(), &ctx)
.await
.unwrap();
assert_eq!(nread, 512);
assert_eq!(&buf.into_inner()[..nread], &[1; 512]);
let buf = Vec::with_capacity(512);
let (buf, nread) = mock_file
.read_exact_at_eof_ok(1024, buf.slice_full(), &ctx)
.await
.unwrap();
assert_eq!(nread, 10);
assert_eq!(&buf.into_inner()[..nread], &[2; 10]);
let buf = Vec::with_capacity(1024);
let err = mock_file
.read_exact_at_eof_ok(2048, buf.slice_full(), &ctx)
.await
.err()
.unwrap();
assert_eq!(err.to_string(), "foo");
}
#[tokio::test]
async fn test_error_on_one_chunk_read_fails_only_dependent_logical_reads() {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let test_logical_reads = vec![
// read spanning two batches
TestLogicalRead::new(
DIO_CHUNK_SIZE.into_u64() / 2,
MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE,
Err("foo".to_owned()),
),
// second read in failing chunk
TestLogicalRead::new(
(MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE).into_u64() + DIO_CHUNK_SIZE.into_u64() - 10,
5,
Err("foo".to_owned()),
),
// read unaffected
TestLogicalRead::new(
(MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE).into_u64()
+ 2 * DIO_CHUNK_SIZE.into_u64()
+ 10,
5,
Ok(vec![1; 5]),
),
];
let (tmp, test_logical_reads) = test_logical_reads.into_iter().tee();
let test_logical_read_perms = tmp.permutations(test_logical_reads.len());
for test_logical_reads in test_logical_read_perms {
let file = mock_file!(
0, MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE => Ok(vec![0; MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE]),
(MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE).into_u64(), DIO_CHUNK_SIZE => Err("foo".to_owned()),
(MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE + 2*DIO_CHUNK_SIZE).into_u64(), DIO_CHUNK_SIZE => Ok(vec![1; DIO_CHUNK_SIZE]),
);
execute_and_validate_test_logical_reads(&file, test_logical_reads, &ctx).await;
}
}
struct TestShortReadsSetup {
ctx: RequestContext,
file: InMemoryFile,
written: u64,
}
fn setup_short_chunk_read_tests() -> TestShortReadsSetup {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
assert!(DIO_CHUNK_SIZE > 20, "test assumption");
let written = (2 * DIO_CHUNK_SIZE - 10).into_u64();
let file = InMemoryFile::new_random(written as usize);
TestShortReadsSetup { ctx, file, written }
}
#[tokio::test]
async fn test_short_chunk_read_from_written_range() {
// Test what happens if there are logical reads
// that start within the last chunk, and
// the last chunk is not the full chunk length.
//
// The read should succeed despite the short chunk length.
let TestShortReadsSetup { ctx, file, written } = setup_short_chunk_read_tests();
let a = file.test_logical_read(written - 10, 5);
let recorder = RecorderFile::new(&file);
execute_and_validate_test_logical_reads(&recorder, vec![a], &ctx).await;
let recorded = recorder.recorded.borrow();
assert_eq!(recorded.len(), 1);
let RecordedRead { pos, req_len, res } = &recorded[0];
assert_eq!(*pos, DIO_CHUNK_SIZE.into_u64());
assert_eq!(*req_len, DIO_CHUNK_SIZE);
assert_eq!(res, &file.content[DIO_CHUNK_SIZE..(written as usize)]);
}
#[tokio::test]
async fn test_short_chunk_read_and_logical_read_from_unwritten_range() {
// Test what happens if there are logical reads
// that start within the last chunk, and
// the last chunk is not the full chunk length, and
// the logical reads end in the unwritten range.
//
// All should fail with UnexpectedEof and have the same IO pattern.
async fn the_impl(offset_delta: i64) {
let TestShortReadsSetup { ctx, file, written } = setup_short_chunk_read_tests();
let offset = u64::try_from(
i64::try_from(written)
.unwrap()
.checked_add(offset_delta)
.unwrap(),
)
.unwrap();
let a = file.test_logical_read(offset, 5);
let recorder = RecorderFile::new(&file);
let a_vr = a.make_logical_read();
execute(&recorder, vec![&a_vr], &ctx).await;
// validate the LogicalRead result
let a_res = a_vr.into_result().unwrap();
let a_err = a_res.unwrap_err();
assert_eq!(a_err.kind(), std::io::ErrorKind::UnexpectedEof);
// validate the IO pattern
let recorded = recorder.recorded.borrow();
assert_eq!(recorded.len(), 1);
let RecordedRead { pos, req_len, res } = &recorded[0];
assert_eq!(*pos, DIO_CHUNK_SIZE.into_u64());
assert_eq!(*req_len, DIO_CHUNK_SIZE);
assert_eq!(res, &file.content[DIO_CHUNK_SIZE..(written as usize)]);
}
the_impl(-1).await; // start == length - 1
the_impl(0).await; // start == length
the_impl(1).await; // start == length + 1
}
// TODO: mixed: some valid, some UnexpectedEof
// TODO: same tests but with merges
}

View File

@@ -69,7 +69,7 @@ use crate::{
config::defaults::DEFAULT_PITR_INTERVAL,
layer_map::{LayerMap, SearchResult},
metadata::TimelineMetadata,
storage_layer::PersistentLayerDesc,
storage_layer::{inmemory_layer::IndexEntry, PersistentLayerDesc},
},
walredo,
};
@@ -1907,6 +1907,8 @@ impl Timeline {
true
} else if projected_layer_size >= checkpoint_distance {
// NB: this check is relied upon by:
let _ = IndexEntry::validate_checkpoint_distance;
info!(
"Will roll layer at {} with layer size {} due to layer size ({})",
projected_lsn, layer_size, projected_layer_size
@@ -5702,7 +5704,7 @@ impl<'a> TimelineWriter<'a> {
return Ok(());
}
let serialized_batch = inmemory_layer::SerializedBatch::from_values(batch);
let serialized_batch = inmemory_layer::SerializedBatch::from_values(batch)?;
let batch_max_lsn = serialized_batch.max_lsn;
let buf_size: u64 = serialized_batch.raw.len() as u64;

View File

@@ -78,6 +78,7 @@ where
.expect("must not use after we returned an error")
}
/// Guarantees that if Ok() is returned, all bytes in `chunk` have been accepted.
#[cfg_attr(target_os = "macos", allow(dead_code))]
pub async fn write_buffered<S: IoBuf + Send>(
&mut self,

View File

@@ -247,9 +247,10 @@ def test_total_size_limit(neon_env_builder: NeonEnvBuilder):
compaction_period_s = 10
checkpoint_distance = 1024**3
tenant_conf = {
# Large space + time thresholds: effectively disable these limits
"checkpoint_distance": f"{1024 ** 4}",
"checkpoint_distance": f"{checkpoint_distance}",
"checkpoint_timeout": "3600s",
"compaction_period": f"{compaction_period_s}s",
}
@@ -269,7 +270,11 @@ def test_total_size_limit(neon_env_builder: NeonEnvBuilder):
for tenant, timeline, last_flush_lsn in last_flush_lsns:
http_client = env.pageserver.http_client()
initdb_lsn = Lsn(http_client.timeline_detail(tenant, timeline)["initdb_lsn"])
total_bytes_ingested += last_flush_lsn - initdb_lsn
this_timeline_ingested = last_flush_lsn - initdb_lsn
assert (
this_timeline_ingested < checkpoint_distance * 0.8
), "this test is supposed to fill InMemoryLayer"
total_bytes_ingested += this_timeline_ingested
log.info(f"Ingested {total_bytes_ingested} bytes since initdb (vs max dirty {max_dirty_data})")
assert total_bytes_ingested > max_dirty_data