don't think in pages, but DIO chunks; remove read_page & page_caching remnants

This commit is contained in:
Christian Schwarz
2024-08-14 19:26:40 +00:00
parent 37bfa04996
commit 6f65b4d2d3
5 changed files with 207 additions and 587 deletions

View File

@@ -4,9 +4,16 @@
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache;
use crate::virtual_file::{self, VirtualFile};
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
use crate::virtual_file::owned_buffers_io::write::Buffer;
use crate::virtual_file::{self, owned_buffers_io, VirtualFile};
use anyhow::Context;
use bytes::BytesMut;
use camino::Utf8PathBuf;
use pageserver_api::shard::TenantShardId;
use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
use tracing::error;
use std::io;
use std::sync::atomic::AtomicU64;
@@ -15,14 +22,19 @@ use utils::id::TimelineId;
pub struct EphemeralFile {
_tenant_shard_id: TenantShardId,
_timeline_id: TimelineId,
rw: page_caching::RW,
page_cache_file_id: page_cache::FileId,
bytes_written: u32,
buffered_writer: owned_buffers_io::write::BufferedWriter<
BytesMut,
size_tracking_writer::Writer<VirtualFile>,
>,
/// Gate guard is held on as long as we need to do operations in the path (delete on drop)
_gate_guard: utils::sync::gate::GateGuard,
}
pub(super) mod page_caching;
use super::storage_layer::inmemory_layer::InMemoryLayerIndexValue;
mod zero_padded_read_write;
const TAIL_SZ: usize = 64 * 1024;
impl EphemeralFile {
pub async fn create(
@@ -52,33 +64,164 @@ impl EphemeralFile {
)
.await?;
let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
Ok(EphemeralFile {
_tenant_shard_id: tenant_shard_id,
_timeline_id: timeline_id,
rw: page_caching::RW::new(file, gate_guard),
page_cache_file_id,
bytes_written: 0,
buffered_writer: owned_buffers_io::write::BufferedWriter::new(
size_tracking_writer::Writer::new(file),
BytesMut::with_capacity(TAIL_SZ),
),
_gate_guard: gate_guard,
})
}
}
impl Drop for EphemeralFile {
fn drop(&mut self) {
// unlink the file
// we are clear to do this, because we have entered a gate
let path = &self.buffered_writer.as_inner().as_inner().path;
let res = std::fs::remove_file(path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {
// just never log the not found errors, we cannot do anything for them; on detach
// the tenant directory is already gone.
//
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
error!("could not remove ephemeral file '{path}': {e}");
}
}
}
}
impl EphemeralFile {
pub(crate) fn len(&self) -> u32 {
self.rw.bytes_written()
self.bytes_written
}
pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
self.rw.page_cache_file_id()
self.page_cache_file_id
}
/// See [`self::page_caching::RW::load_to_vec`].
pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
self.rw.load_to_vec(ctx).await
let size = usize::try_from(self.len()).unwrap();
let vec = Vec::with_capacity(size);
// read from disk what we've already flushed
let file_size_tracker = self.buffered_writer.as_inner();
let flushed_offset = usize::try_from(file_size_tracker.bytes_written()).unwrap();
let flushed_range = 0..flushed_offset;
let file: &VirtualFile = file_size_tracker.as_inner();
let mut vec = file
.read_exact_at(
vec.slice(0..(flushed_range.end - flushed_range.start)),
u64::try_from(flushed_range.start).unwrap(),
ctx,
)
.await?
.into_inner();
// copy from in-memory buffer what we haven't flushed yet but would return when accessed via read_blk
let buffer = self.buffered_writer.inspect_buffer();
let buffered = &buffer[0..buffer.pending()];
vec.extend_from_slice(buffered);
assert_eq!(vec.len(), size);
Ok(vec)
}
pub(crate) async fn read_page(
/// Fill dst will dst.bytes_total() bytes from the bytes written to the buffered writer from offset `start` and later.
/// If `dst` is larger than the available bytes, the read will be short.
/// The read will never be short for other reasons.
/// The number of bytes read into `dst` is returned as part of the result tuple.
/// No guarantees are made about the remaining bytes in `dst`, i.e., assume their contents are random.
pub(crate) async fn read_at_to_end<B: IoBufMut + Send>(
&self,
blknum: u32,
dst: page_caching::PageBuf,
start: u32,
dst: Slice<B>,
ctx: &RequestContext,
) -> Result<page_caching::ReadResult, io::Error> {
self.rw.read_page(blknum, dst, ctx).await
) -> std::io::Result<(Slice<B>, usize)> {
let file_size_tracking_writer = self.buffered_writer.as_inner();
let flushed_offset = u32::try_from(file_size_tracking_writer.bytes_written())
.expect("we don't allow writing more than u32::MAX bytes");
let buffer = self.buffered_writer.inspect_buffer();
let buffered = &buffer[0..buffer.pending()];
let dst_cap = u32::try_from(dst.bytes_total())
.with_context(|| {
format!(
"read_aligned: dst.bytes_total() is too large: {}",
dst.len()
)
})
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
let end = {
let mut end = start
.checked_add(dst_cap)
.with_context(|| {
format!("read_aligned: offset + dst.bytes_total() is too large: {start} + {dst_cap}",)
})
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
if end > self.bytes_written {
end = self.bytes_written;
}
end
};
// inclusive, exclusive
#[derive(Debug)]
struct Range(u32, u32);
impl Range {
fn len(&self) -> u32 {
if self.0 > self.1 {
0
} else {
self.1 - self.0
}
}
}
let written_range = Range(start, std::cmp::min(end, flushed_offset));
let buffered_range = Range(std::cmp::max(start, flushed_offset), end);
let dst = if written_range.len() > 0 {
let file: &VirtualFile = file_size_tracking_writer.as_inner();
let bounds = dst.bounds();
let slice = file
.read_exact_at(
dst.slice(0..written_range.len() as usize),
start as u64,
ctx,
)
.await?;
Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
} else {
dst
};
let dst = if buffered_range.len() > 0 {
let offset_in_buffer =
usize::try_from(buffered_range.0.checked_sub(flushed_offset).unwrap()).unwrap();
let to_copy =
&buffered[offset_in_buffer..(offset_in_buffer + buffered_range.len() as usize)];
let bounds = dst.bounds();
let mut view = dst.slice(
written_range.len() as usize
..written_range.len() as usize + buffered_range.len() as usize,
);
view.as_mut_rust_slice_full_zeroed()
.copy_from_slice(to_copy);
Slice::from_buf_bounds(Slice::into_inner(view), bounds)
} else {
dst
};
// TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs
Ok((dst, (end - start) as usize))
}
pub(crate) async fn write_blob(
@@ -87,7 +230,7 @@ impl EphemeralFile {
will_init: bool,
ctx: &RequestContext,
) -> Result<InMemoryLayerIndexValue, io::Error> {
let pos = self.rw.bytes_written();
let pos = self.bytes_written;
let len = u32::try_from(buf.len()).map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
@@ -104,7 +247,10 @@ impl EphemeralFile {
)
})?;
self.rw.write_all_borrowed(buf, ctx).await?;
self.buffered_writer
.write_buffered_borrowed(buf, ctx)
.await?;
self.bytes_written += len;
Ok(InMemoryLayerIndexValue {
pos,

View File

@@ -1,265 +0,0 @@
//! Wrapper around [`super::zero_padded_read_write::RW`] that uses the
//! [`crate::page_cache`] to serve reads that need to go to the underlying [`VirtualFile`].
use crate::context::RequestContext;
use crate::page_cache::{self, PAGE_SZ};
use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
use crate::virtual_file::VirtualFile;
use std::io::{self, ErrorKind};
use std::ops::{Deref, Range};
use tokio_epoll_uring::BoundedBuf;
use tracing::*;
use super::zero_padded_read_write;
/// See module-level comment.
pub struct RW {
page_cache_file_id: page_cache::FileId,
rw: super::zero_padded_read_write::RW<PreWarmingWriter>,
/// Gate guard is held on as long as we need to do operations in the path (delete on drop).
_gate_guard: utils::sync::gate::GateGuard,
}
/// Result of [`RW::read_page`].
pub(crate) enum ReadResult<'a> {
EphemeralFileMutableTail(PageBuf, &'a [u8; PAGE_SZ]),
Owned(PageBuf),
}
impl ReadResult<'_> {
pub(crate) fn contents(&self) -> &[u8; PAGE_SZ] {
match self {
ReadResult::EphemeralFileMutableTail(_, buf) => buf,
ReadResult::Owned(buf) => buf.deref(),
}
}
pub(crate) fn into_page_buf(self) -> PageBuf {
match self {
ReadResult::EphemeralFileMutableTail(buf, _) => buf,
ReadResult::Owned(buf) => buf,
}
}
}
pub(crate) struct PageBuf(Box<[u8; PAGE_SZ]>);
impl From<Box<[u8; PAGE_SZ]>> for PageBuf {
fn from(buf: Box<[u8; PAGE_SZ]>) -> Self {
Self(buf)
}
}
impl Deref for PageBuf {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
&self.0
}
}
// Safety: `PageBuf` is a fixed-size buffer that is zero-initialized.
unsafe impl tokio_epoll_uring::IoBuf for PageBuf {
fn stable_ptr(&self) -> *const u8 {
self.0.as_ptr()
}
fn bytes_init(&self) -> usize {
self.0.len()
}
fn bytes_total(&self) -> usize {
self.0.len()
}
}
// Safety: the `&mut self` guarantees no aliasing. `set_init` is safe
// because the buffer is always fully initialized.
unsafe impl tokio_epoll_uring::IoBufMut for PageBuf {
fn stable_mut_ptr(&mut self) -> *mut u8 {
self.0.as_mut_ptr()
}
unsafe fn set_init(&mut self, pos: usize) {
// this is a no-op because the buffer is always fully initialized
assert!(pos <= self.0.len());
}
}
impl RW {
pub fn new(file: VirtualFile, _gate_guard: utils::sync::gate::GateGuard) -> Self {
let page_cache_file_id = page_cache::next_file_id();
Self {
page_cache_file_id,
rw: super::zero_padded_read_write::RW::new(PreWarmingWriter::new(file)),
_gate_guard,
}
}
pub fn page_cache_file_id(&self) -> page_cache::FileId {
self.page_cache_file_id
}
pub(crate) async fn write_all_borrowed(
&mut self,
srcbuf: &[u8],
ctx: &RequestContext,
) -> Result<(), io::Error> {
// It doesn't make sense to proactively fill the page cache on the Pageserver write path
// because Compute is unlikely to access recently written data.
self.rw.write_all_borrowed(srcbuf, ctx).await.map(|_| ())
}
pub(crate) fn bytes_written(&self) -> u32 {
self.rw.bytes_written()
}
/// Load all blocks that can be read via [`Self::read_page`] into a contiguous memory buffer.
///
/// This includes the blocks that aren't yet flushed to disk by the internal buffered writer.
/// The last block is zero-padded to [`PAGE_SZ`], so, the returned buffer is always a multiple of [`PAGE_SZ`].
pub(super) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
// round up to the next PAGE_SZ multiple, required by blob_io
let size = {
let s = usize::try_from(self.bytes_written()).unwrap();
if s % PAGE_SZ == 0 {
s
} else {
s.checked_add(PAGE_SZ - (s % PAGE_SZ)).unwrap()
}
};
let vec = Vec::with_capacity(size);
// read from disk what we've already flushed
let writer = self.rw.as_writer();
let flushed_range = writer.written_range();
let mut vec = writer
.file
.read_exact_at(
vec.slice(0..(flushed_range.end - flushed_range.start)),
u64::try_from(flushed_range.start).unwrap(),
ctx,
)
.await?
.into_inner();
// copy from in-memory buffer what we haven't flushed yet but would return when accessed via read_blk
let buffered = self.rw.get_tail_zero_padded();
vec.extend_from_slice(buffered);
assert_eq!(vec.len(), size);
assert_eq!(vec.len() % PAGE_SZ, 0);
Ok(vec)
}
pub(crate) async fn read_page(
&self,
blknum: u32,
buf: PageBuf,
ctx: &RequestContext,
) -> Result<ReadResult, io::Error> {
match self.rw.read_blk(blknum).await? {
zero_padded_read_write::ReadResult::NeedsReadFromWriter { writer } => {
let buf = writer
.file
.read_exact_at(buf.slice_full(), blknum as u64 * PAGE_SZ as u64, ctx)
.await
.map(|slice| slice.into_inner())?;
Ok(ReadResult::Owned(buf))
}
zero_padded_read_write::ReadResult::ServedFromZeroPaddedMutableTail {
buffer: tail_ref,
} => Ok(ReadResult::EphemeralFileMutableTail(buf, tail_ref)),
}
}
}
impl Drop for RW {
fn drop(&mut self) {
// There might still be pages in the [`crate::page_cache`] for this file.
// We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
// unlink the file
// we are clear to do this, because we have entered a gate
let res = std::fs::remove_file(&self.rw.as_writer().file.path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {
// just never log the not found errors, we cannot do anything for them; on detach
// the tenant directory is already gone.
//
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
error!(
"could not remove ephemeral file '{}': {}",
self.rw.as_writer().file.path,
e
);
}
}
}
}
struct PreWarmingWriter {
nwritten_blocks: u32,
file: VirtualFile,
}
impl PreWarmingWriter {
fn new(file: VirtualFile) -> Self {
Self {
nwritten_blocks: 0,
file,
}
}
/// Return the byte range within `file` that has been written though `write_all`.
///
/// The returned range would be invalidated by another `write_all`. To prevent that, we capture `&_`.
fn written_range(&self) -> (impl Deref<Target = Range<usize>> + '_) {
let nwritten_blocks = usize::try_from(self.nwritten_blocks).unwrap();
struct Wrapper(Range<usize>);
impl Deref for Wrapper {
type Target = Range<usize>;
fn deref(&self) -> &Range<usize> {
&self.0
}
}
Wrapper(0..nwritten_blocks * PAGE_SZ)
}
}
impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmingWriter {
async fn write_all<Buf: tokio_epoll_uring::IoBuf + Send>(
&mut self,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> std::io::Result<(usize, FullSlice<Buf>)> {
let buflen = buf.len();
assert_eq!(
buflen % PAGE_SZ,
0,
"{buflen} ; we know TAIL_SZ is a PAGE_SZ multiple, and write_buffered_borrowed is used"
);
// Do the IO.
let buf = match self.file.write_all(buf, ctx).await {
(buf, Ok(nwritten)) => {
assert_eq!(nwritten, buflen);
buf
}
(_, Err(e)) => {
return Err(std::io::Error::new(
ErrorKind::Other,
// order error before path because path is long and error is short
format!(
"ephemeral_file: write_blob: write-back tail self.nwritten_blocks={}, buflen={}, {:#}: {}",
self.nwritten_blocks, buflen, e, self.file.path,
),
));
}
};
let nblocks = buflen / PAGE_SZ;
let nblocks32 = u32::try_from(nblocks).unwrap();
self.nwritten_blocks = self.nwritten_blocks.checked_add(nblocks32).unwrap();
Ok((buflen, buf))
}
}

View File

@@ -1,151 +0,0 @@
//! The heart of how [`super::EphemeralFile`] does its reads and writes.
//!
//! # Writes
//!
//! [`super::EphemeralFile`] writes small, borrowed buffers using [`RW::write_all_borrowed`].
//! The [`RW`] batches these into [`TAIL_SZ`] bigger writes, using [`owned_buffers_io::write::BufferedWriter`].
//!
//! # Reads
//!
//! [`super::EphemeralFile`] always reads full [`PAGE_SZ`]ed blocks using [`RW::read_blk`].
//!
//! The [`RW`] serves these reads either from the buffered writer's in-memory buffer
//! or redirects the caller to read from the underlying [`OwnedAsyncWriter`]
//! if the read is for the prefix that has already been flushed.
//!
//! # Current Usage
//!
//! The current user of this module is [`super::page_caching::RW`].
mod zero_padded;
use anyhow::Context;
use crate::{
context::RequestContext,
page_cache::PAGE_SZ,
virtual_file::owned_buffers_io::{
self,
write::{Buffer, OwnedAsyncWriter},
},
};
const TAIL_SZ: usize = 64 * 1024;
/// See module-level comment.
pub struct RW<W: OwnedAsyncWriter> {
buffered_writer: owned_buffers_io::write::BufferedWriter<
zero_padded::Buffer<TAIL_SZ>,
owned_buffers_io::util::size_tracking_writer::Writer<W>,
>,
}
pub enum ReadResult<'a, W> {
NeedsReadFromWriter { writer: &'a W },
ServedFromZeroPaddedMutableTail { buffer: &'a [u8; PAGE_SZ] },
}
impl<W> RW<W>
where
W: OwnedAsyncWriter,
{
pub fn new(writer: W) -> Self {
let bytes_flushed_tracker =
owned_buffers_io::util::size_tracking_writer::Writer::new(writer);
let buffered_writer = owned_buffers_io::write::BufferedWriter::new(
bytes_flushed_tracker,
zero_padded::Buffer::default(),
);
Self { buffered_writer }
}
pub(crate) fn as_writer(&self) -> &W {
self.buffered_writer.as_inner().as_inner()
}
pub async fn write_all_borrowed(
&mut self,
buf: &[u8],
ctx: &RequestContext,
) -> std::io::Result<usize> {
self.buffered_writer.write_buffered_borrowed(buf, ctx).await
}
pub fn bytes_written(&self) -> u32 {
let flushed_offset = self.buffered_writer.as_inner().bytes_written();
let flushed_offset = u32::try_from(flushed_offset).with_context(|| format!("buffered_writer.write_buffered_borrowed() disallows sizes larger than u32::MAX: {flushed_offset}")).unwrap();
let buffer: &zero_padded::Buffer<TAIL_SZ> = self.buffered_writer.inspect_buffer();
let buffer_pending = u32::try_from(buffer.pending()).expect("TAIL_SZ is < u32::MAX");
flushed_offset.checked_add(buffer_pending).with_context(|| format!("buffered_writer.write_buffered_borrowed() disallows sizes larger than u32::MAX: {flushed_offset} + {buffer_pending}")).unwrap()
}
/// Get a slice of all blocks that [`Self::read_blk`] would return as [`ReadResult::ServedFromZeroPaddedMutableTail`].
pub fn get_tail_zero_padded(&self) -> &[u8] {
let buffer: &zero_padded::Buffer<TAIL_SZ> = self.buffered_writer.inspect_buffer();
let buffer_written_up_to = buffer.pending();
// pad to next page boundary
let read_up_to = if buffer_written_up_to % PAGE_SZ == 0 {
buffer_written_up_to
} else {
buffer_written_up_to
.checked_add(PAGE_SZ - (buffer_written_up_to % PAGE_SZ))
.unwrap()
};
&buffer.as_zero_padded_slice()[0..read_up_to]
}
pub(crate) async fn read_blk(&self, blknum: u32) -> Result<ReadResult<'_, W>, std::io::Error> {
let flushed_offset =
u32::try_from(self.buffered_writer.as_inner().bytes_written()).expect("");
let buffer: &zero_padded::Buffer<TAIL_SZ> = self.buffered_writer.inspect_buffer();
let buffered_offset = flushed_offset + u32::try_from(buffer.pending()).unwrap();
let page_sz = u32::try_from(PAGE_SZ).unwrap();
let read_offset = blknum.checked_mul(page_sz).unwrap();
// The trailing page ("block") might only be partially filled,
// yet the blob_io code relies on us to return a full PAGE_SZed slice anyway.
// Moreover, it has to be zero-padded, because when we still had
// a write-back page cache, it provided pre-zeroed pages, and blob_io came to rely on it.
// DeltaLayer probably has the same issue, not sure why it needs no special treatment.
// => check here that the read doesn't go beyond this potentially trailing
// => the zero-padding is done in the `else` branch below
let blocks_written = if buffered_offset % page_sz == 0 {
buffered_offset / page_sz
} else {
(buffered_offset / page_sz) + 1
};
if blknum >= blocks_written {
return Err(std::io::Error::new(std::io::ErrorKind::Other, anyhow::anyhow!("read past end of ephemeral_file: read=0x{read_offset:x} buffered=0x{buffered_offset:x} flushed=0x{flushed_offset}")));
}
// assertions for the `if-else` below
assert_eq!(
flushed_offset % (u32::try_from(TAIL_SZ).unwrap()), 0,
"we only use write_buffered_borrowed to write to the buffered writer, so it's guaranteed that flushes happen buffer.cap()-sized chunks"
);
assert_eq!(
flushed_offset % page_sz,
0,
"the logic below can't handle if the page is spread across the flushed part and the buffer"
);
if read_offset < flushed_offset {
assert!(read_offset + page_sz <= flushed_offset);
Ok(ReadResult::NeedsReadFromWriter {
writer: self.as_writer(),
})
} else {
let read_offset_in_buffer = read_offset
.checked_sub(flushed_offset)
.expect("would have taken `if` branch instead of this one");
let read_offset_in_buffer = usize::try_from(read_offset_in_buffer).unwrap();
let zero_padded_slice = buffer.as_zero_padded_slice();
let page = &zero_padded_slice[read_offset_in_buffer..(read_offset_in_buffer + PAGE_SZ)];
Ok(ReadResult::ServedFromZeroPaddedMutableTail {
buffer: page
.try_into()
.expect("the slice above got it as page-size slice"),
})
}
}
}

View File

@@ -1,110 +0,0 @@
//! A [`crate::virtual_file::owned_buffers_io::write::Buffer`] whose
//! unwritten range is guaranteed to be zero-initialized.
//! This is used by [`crate::tenant::ephemeral_file::zero_padded_read_write::RW::read_blk`]
//! to serve page-sized reads of the trailing page when the trailing page has only been partially filled.
use std::mem::MaybeUninit;
use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
/// See module-level comment.
pub struct Buffer<const N: usize> {
allocation: Box<[u8; N]>,
written: usize,
}
impl<const N: usize> Default for Buffer<N> {
fn default() -> Self {
Self {
allocation: Box::new(
// SAFETY: zeroed memory is a valid [u8; N]
unsafe { MaybeUninit::zeroed().assume_init() },
),
written: 0,
}
}
}
impl<const N: usize> Buffer<N> {
#[inline(always)]
fn invariants(&self) {
// don't check by default, unoptimized is too expensive even for debug mode
if false {
debug_assert!(self.written <= N, "{}", self.written);
debug_assert!(self.allocation[self.written..N].iter().all(|v| *v == 0));
}
}
pub fn as_zero_padded_slice(&self) -> &[u8; N] {
&self.allocation
}
}
impl<const N: usize> crate::virtual_file::owned_buffers_io::write::Buffer for Buffer<N> {
type IoBuf = Self;
fn cap(&self) -> usize {
self.allocation.len()
}
fn extend_from_slice(&mut self, other: &[u8]) {
self.invariants();
let remaining = self.allocation.len() - self.written;
if other.len() > remaining {
panic!("calling extend_from_slice() with insufficient remaining capacity");
}
self.allocation[self.written..(self.written + other.len())].copy_from_slice(other);
self.written += other.len();
self.invariants();
}
fn pending(&self) -> usize {
self.written
}
fn flush(self) -> FullSlice<Self> {
self.invariants();
let written = self.written;
FullSlice::must_new(tokio_epoll_uring::BoundedBuf::slice(self, 0..written))
}
fn reuse_after_flush(iobuf: Self::IoBuf) -> Self {
let Self {
mut allocation,
written,
} = iobuf;
allocation[0..written].fill(0);
let new = Self {
allocation,
written: 0,
};
new.invariants();
new
}
}
/// We have this trait impl so that the `flush` method in the `Buffer` impl above can produce a
/// [`tokio_epoll_uring::BoundedBuf::slice`] of the [`Self::written`] range of the data.
///
/// Remember that bytes_init is generally _not_ a tracker of the amount
/// of valid data in the io buffer; we use `Slice` for that.
/// The `IoBuf` is _only_ for keeping track of uninitialized memory, a bit like MaybeUninit.
///
/// SAFETY:
///
/// The [`Self::allocation`] is stable becauses boxes are stable.
/// The memory is zero-initialized, so, bytes_init is always N.
unsafe impl<const N: usize> tokio_epoll_uring::IoBuf for Buffer<N> {
fn stable_ptr(&self) -> *const u8 {
self.allocation.as_ptr()
}
fn bytes_init(&self) -> usize {
// Yes, N, not self.written; Read the full comment of this impl block!
N
}
fn bytes_total(&self) -> usize {
N
}
}

View File

@@ -6,9 +6,7 @@
//!
use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, Value};
use crate::tenant::ephemeral_file::page_caching::PageBuf;
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::PageReconstructError;
@@ -24,6 +22,7 @@ use pageserver_api::shard::TenantShardId;
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex, OnceLock};
use std::time::Instant;
use tokio_epoll_uring::BoundedBuf;
use tracing::*;
use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
// avoid binding to Write (conflicts with std::io::Write)
@@ -318,40 +317,45 @@ impl InMemoryLayer {
}
}
const DIO_CHUNK_SIZE: usize = 512;
// Plan which parts of which pages need to be appended to which value_buf
struct PageReadDestination<'a> {
struct ChunkReadDestination<'a> {
value_read: &'a ValueRead,
offset_in_page: u32,
offset_in_chunk: u32,
len: u32,
}
// use of BTreeMap's sorted iterator is critical to esnure value_buf is filled in order
let mut page_reads: BTreeMap<u32, Vec<PageReadDestination>> = BTreeMap::new();
let mut chunk_reads: BTreeMap<u32, Vec<ChunkReadDestination>> = BTreeMap::new();
for value_read in reads.iter().flat_map(|(_, v)| v.iter()) {
let ValueRead { pos, len, .. } = value_read;
let mut remaining = usize::try_from(*len).unwrap();
let mut page_no = *pos / (PAGE_SZ as u32);
let mut offset_in_page = usize::try_from(*pos % (PAGE_SZ as u32)).unwrap();
let mut chunk_no = *pos / (DIO_CHUNK_SIZE as u32);
let mut offset_in_chunk = usize::try_from(*pos % (DIO_CHUNK_SIZE as u32)).unwrap();
while remaining > 0 {
let remaining_in_page = std::cmp::min(remaining, PAGE_SZ - offset_in_page);
page_reads
.entry(page_no)
let remaining_in_chunk = std::cmp::min(remaining, DIO_CHUNK_SIZE - offset_in_chunk);
chunk_reads
.entry(chunk_no)
.or_default()
.push(PageReadDestination {
.push(ChunkReadDestination {
value_read,
offset_in_page: offset_in_page as u32,
len: remaining_in_page as u32,
offset_in_chunk: offset_in_chunk as u32,
len: remaining_in_chunk as u32,
});
offset_in_page = 0;
page_no += 1;
remaining -= remaining_in_page;
offset_in_chunk = 0;
chunk_no += 1;
remaining -= remaining_in_chunk;
}
}
// TODO: merge adjacent chunk reads (merging pass on the BTreeMap iterator)
// Execute reads and fill the destination
// TODO: prefetch
let mut page_buf = PageBuf::from(Box::new([0u8; PAGE_SZ]));
for (page_no, dsts) in page_reads.into_iter() {
let all_done = dsts.iter().all(|PageReadDestination { value_read, .. }| {
let get_chunk_buf = || Vec::with_capacity(DIO_CHUNK_SIZE);
let mut chunk_buf = get_chunk_buf();
for (chunk_no, dsts) in chunk_reads.into_iter() {
let all_done = dsts.iter().all(|ChunkReadDestination { value_read, .. }| {
let value_buf = value_read.value_buf.lock().unwrap();
let Ok(buf) = &*value_buf else {
return true; // on Err() there's no need to read more
@@ -361,34 +365,42 @@ impl InMemoryLayer {
if all_done {
continue;
}
let read_result = match inner.file.read_page(page_no, page_buf, &ctx).await {
Ok(read_result) => read_result,
let (tmp, nread) = match inner
.file
.read_at_to_end(
chunk_no * DIO_CHUNK_SIZE as u32,
chunk_buf.slice_full(),
&ctx,
)
.await
{
Ok(t) => t,
Err(e) => {
let e = Arc::new(e);
for PageReadDestination { value_read, .. } in dsts {
for ChunkReadDestination { value_read, .. } in dsts {
*value_read.value_buf.lock().unwrap() = Err(Arc::clone(&e));
// this will make later reads short-circuit, see top of loop body
}
page_buf = PageBuf::from(Box::new([0u8; PAGE_SZ])); // TODO: change read_page API to return the buffer
chunk_buf = get_chunk_buf(); // TODO: change API to return the buffer back on error
continue;
}
};
let page_contents = read_result.contents();
for PageReadDestination {
chunk_buf = tmp.into_inner();
let contents = &chunk_buf[..nread];
for ChunkReadDestination {
value_read,
offset_in_page,
offset_in_chunk,
len,
} in dsts
{
if let Ok(buf) = &mut *value_read.value_buf.lock().unwrap() {
buf.extend_from_slice(
&page_contents[offset_in_page as usize..(offset_in_page + len) as usize],
&contents[offset_in_chunk as usize..(offset_in_chunk + len) as usize],
);
}
}
page_buf = read_result.into_page_buf();
}
drop(page_buf);
drop(chunk_buf);
// Process results into the reconstruct state
'next_key: for (key, value_reads) in reads {
@@ -651,19 +663,6 @@ impl InMemoryLayer {
match l0_flush_global_state {
l0_flush::Inner::Direct { .. } => {
let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
assert_eq!(
file_contents.len() % PAGE_SZ,
0,
"needed by BlockReaderRef::Slice"
);
assert_eq!(file_contents.len(), {
let written = usize::try_from(inner.file.len()).unwrap();
if written % PAGE_SZ == 0 {
written
} else {
written.checked_add(PAGE_SZ - (written % PAGE_SZ)).unwrap()
}
});
let file_contents = Bytes::from(file_contents);
@@ -675,7 +674,8 @@ impl InMemoryLayer {
len,
will_init,
} = entry;
let buf = file_contents.slice(*pos as usize..(*pos + *len) as usize);
let buf =
Bytes::slice(&file_contents, *pos as usize..(*pos + *len) as usize);
let (_buf, res) = delta_layer_writer
.put_value_bytes(
Key::from_compact(*key),