mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 15:10:44 +00:00
Compare commits
4 Commits
problame/w
...
problame/w
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8b10701a0f | ||
|
|
b937432a04 | ||
|
|
a6d414d6e6 | ||
|
|
0f7f743f37 |
@@ -3,42 +3,28 @@
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::{self, PAGE_SZ};
|
||||
use crate::page_cache;
|
||||
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
|
||||
use crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter;
|
||||
use crate::virtual_file::{self, owned_buffers_io, VirtualFile};
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
|
||||
use std::io;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use tracing::*;
|
||||
use utils::id::TimelineId;
|
||||
|
||||
pub struct EphemeralFile {
|
||||
page_cache_file_id: page_cache::FileId,
|
||||
|
||||
_tenant_shard_id: TenantShardId,
|
||||
_timeline_id: TimelineId,
|
||||
/// We sandwich the buffered writer between two size-tracking writers.
|
||||
/// This allows us to "elegantly" track in-memory bytes vs flushed bytes,
|
||||
/// enabling [`Self::read_blk`] to determine whether to read from the
|
||||
/// buffered writer's buffer, versus going to the VirtualFile.
|
||||
///
|
||||
/// TODO: longer-term, we probably wand to get rid of this in favor
|
||||
/// of a double-buffering scheme. See this commit's commit message
|
||||
/// and git history for what we had before this sandwich, it might be useful.
|
||||
file: owned_buffers_io::util::size_tracking_writer::Writer<
|
||||
owned_buffers_io::write::BufferedWriter<
|
||||
{ Self::TAIL_SZ },
|
||||
owned_buffers_io::util::size_tracking_writer::Writer<VirtualFile>,
|
||||
>,
|
||||
>,
|
||||
|
||||
rw: page_caching::RW,
|
||||
}
|
||||
|
||||
impl EphemeralFile {
|
||||
const TAIL_SZ: usize = 64 * 1024;
|
||||
mod page_caching;
|
||||
mod zero_padded_buffer;
|
||||
mod zero_padded_read_write;
|
||||
|
||||
impl EphemeralFile {
|
||||
pub async fn create(
|
||||
conf: &PageServerConf,
|
||||
tenant_shard_id: TenantShardId,
|
||||
@@ -62,24 +48,20 @@ impl EphemeralFile {
|
||||
.create(true),
|
||||
)
|
||||
.await?;
|
||||
let file = owned_buffers_io::util::size_tracking_writer::Writer::new(file);
|
||||
let file = owned_buffers_io::write::BufferedWriter::new(file);
|
||||
let file = owned_buffers_io::util::size_tracking_writer::Writer::new(file);
|
||||
|
||||
Ok(EphemeralFile {
|
||||
page_cache_file_id: page_cache::next_file_id(),
|
||||
_tenant_shard_id: tenant_shard_id,
|
||||
_timeline_id: timeline_id,
|
||||
file,
|
||||
rw: page_caching::RW::new(file),
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn len(&self) -> u64 {
|
||||
self.file.bytes_written()
|
||||
self.rw.bytes_written()
|
||||
}
|
||||
|
||||
pub(crate) fn id(&self) -> page_cache::FileId {
|
||||
self.page_cache_file_id
|
||||
pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
|
||||
self.rw.page_cache_file_id()
|
||||
}
|
||||
|
||||
pub(crate) async fn read_blk(
|
||||
@@ -87,81 +69,7 @@ impl EphemeralFile {
|
||||
blknum: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockLease, io::Error> {
|
||||
let buffered_offset = self.file.bytes_written();
|
||||
let flushed_offset = self.file.as_inner().as_inner().bytes_written();
|
||||
assert!(buffered_offset >= flushed_offset);
|
||||
let read_offset = (blknum as u64) * (PAGE_SZ as u64);
|
||||
|
||||
assert_eq!(
|
||||
flushed_offset % (PAGE_SZ as u64),
|
||||
0,
|
||||
"we need this in the logic below, because it assumes the page isn't spread across flushed part and in-memory buffer"
|
||||
);
|
||||
|
||||
if read_offset < flushed_offset {
|
||||
assert!(
|
||||
read_offset + (PAGE_SZ as u64) <= flushed_offset,
|
||||
"this impl can't deal with pages spread across flushed & buffered part"
|
||||
);
|
||||
let cache = page_cache::get();
|
||||
match cache
|
||||
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
// order path before error because error is anyhow::Error => might have many contexts
|
||||
format!(
|
||||
"ephemeral file: read immutable page #{}: {}: {:#}",
|
||||
blknum,
|
||||
self.file.as_inner().as_inner().as_inner().path,
|
||||
e,
|
||||
),
|
||||
)
|
||||
})? {
|
||||
page_cache::ReadBufResult::Found(guard) => {
|
||||
return Ok(BlockLease::PageReadGuard(guard))
|
||||
}
|
||||
page_cache::ReadBufResult::NotFound(write_guard) => {
|
||||
let write_guard = self
|
||||
.file
|
||||
.as_inner()
|
||||
.as_inner()
|
||||
.as_inner()
|
||||
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
|
||||
.await?;
|
||||
let read_guard = write_guard.mark_valid();
|
||||
return Ok(BlockLease::PageReadGuard(read_guard));
|
||||
}
|
||||
};
|
||||
} else {
|
||||
let read_until_offset = read_offset + (PAGE_SZ as u64);
|
||||
if !(0..buffered_offset).contains(&read_until_offset) {
|
||||
// The blob_io code relies on the reader allowing reads past
|
||||
// the end of what was written, up to end of the current PAGE_SZ chunk.
|
||||
// This is a relict of the past where we would get a pre-zeroed page from the page cache.
|
||||
//
|
||||
// DeltaLayer probably has the same issue, not sure why it needs no special treatment.
|
||||
let nbytes_past_end = read_until_offset.checked_sub(buffered_offset).unwrap();
|
||||
if nbytes_past_end >= (PAGE_SZ as u64) {
|
||||
// TODO: treat this as error. Pre-existing issue before this patch.
|
||||
panic!(
|
||||
"return IO error: read past end of file: read=0x{read_offset:x} buffered=0x{buffered_offset:x} flushed=0x{flushed_offset}"
|
||||
)
|
||||
}
|
||||
}
|
||||
let buffer: &[u8; Self::TAIL_SZ] = self.file.as_inner().inspect_buffer();
|
||||
let read_offset_in_buffer = read_offset
|
||||
.checked_sub(flushed_offset)
|
||||
.expect("would have taken `if` branch instead of this one");
|
||||
|
||||
let read_offset_in_buffer = usize::try_from(read_offset_in_buffer).unwrap();
|
||||
let page = &buffer[read_offset_in_buffer..(read_offset_in_buffer + PAGE_SZ)];
|
||||
Ok(BlockLease::EphemeralFileMutableTail(
|
||||
page.try_into()
|
||||
.expect("the slice above got it as page-size slice"),
|
||||
))
|
||||
}
|
||||
self.rw.read_blk(blknum, ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn write_blob(
|
||||
@@ -169,24 +77,22 @@ impl EphemeralFile {
|
||||
srcbuf: &[u8],
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<u64, io::Error> {
|
||||
let pos = self.file.bytes_written();
|
||||
let pos = self.rw.bytes_written();
|
||||
|
||||
// Write the length field
|
||||
if srcbuf.len() < 0x80 {
|
||||
// short one-byte length header
|
||||
let len_buf = [srcbuf.len() as u8];
|
||||
|
||||
self.file.write_all_borrowed(&len_buf).await?;
|
||||
self.rw.write_all_borrowed(&len_buf).await?;
|
||||
} else {
|
||||
let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
|
||||
len_buf[0] |= 0x80;
|
||||
self.file.write_all_borrowed(&len_buf).await?;
|
||||
self.rw.write_all_borrowed(&len_buf).await?;
|
||||
}
|
||||
|
||||
// Write the payload
|
||||
self.file.write_all_borrowed(srcbuf).await?;
|
||||
|
||||
// TODO: bring back pre-warming of page cache, using another sandwich layer
|
||||
self.rw.write_all_borrowed(srcbuf).await?;
|
||||
|
||||
Ok(pos)
|
||||
}
|
||||
@@ -201,29 +107,6 @@ pub fn is_ephemeral_file(filename: &str) -> bool {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for EphemeralFile {
|
||||
fn drop(&mut self) {
|
||||
// There might still be pages in the [`crate::page_cache`] for this file.
|
||||
// We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
|
||||
|
||||
// unlink the file
|
||||
let res = std::fs::remove_file(&self.file.as_inner().as_inner().as_inner().path);
|
||||
if let Err(e) = res {
|
||||
if e.kind() != std::io::ErrorKind::NotFound {
|
||||
// just never log the not found errors, we cannot do anything for them; on detach
|
||||
// the tenant directory is already gone.
|
||||
//
|
||||
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
|
||||
error!(
|
||||
"could not remove ephemeral file '{}': {}",
|
||||
self.file.as_inner().as_inner().as_inner().path,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl BlockReader for EphemeralFile {
|
||||
fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
|
||||
BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
|
||||
|
||||
105
pageserver/src/tenant/ephemeral_file/page_caching.rs
Normal file
105
pageserver/src/tenant/ephemeral_file/page_caching.rs
Normal file
@@ -0,0 +1,105 @@
|
||||
//! Wrapper around [`super::zero_padded_read_write::RW`] that uses the
|
||||
//! [`crate::page_cache`] to serve reads that need to go to the underlying [`VirtualFile`].
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::{self, PAGE_SZ};
|
||||
use crate::tenant::block_io::BlockLease;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
|
||||
use std::io;
|
||||
use tracing::*;
|
||||
|
||||
use super::zero_padded_read_write;
|
||||
|
||||
/// See module-level comment.
|
||||
pub struct RW {
|
||||
page_cache_file_id: page_cache::FileId,
|
||||
rw: super::zero_padded_read_write::RW,
|
||||
}
|
||||
|
||||
impl RW {
|
||||
pub fn new(file: VirtualFile) -> Self {
|
||||
Self {
|
||||
page_cache_file_id: page_cache::next_file_id(),
|
||||
rw: super::zero_padded_read_write::RW::new(file),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn page_cache_file_id(&self) -> page_cache::FileId {
|
||||
self.page_cache_file_id
|
||||
}
|
||||
|
||||
pub(crate) async fn write_all_borrowed(&mut self, srcbuf: &[u8]) -> Result<usize, io::Error> {
|
||||
// It doesn't make sense to proactively fill the page cache on the Pageserver write path
|
||||
// because Compute is unlikely to access recently written data.
|
||||
self.rw.write_all_borrowed(srcbuf).await
|
||||
}
|
||||
|
||||
pub(crate) fn bytes_written(&self) -> u64 {
|
||||
self.rw.bytes_written()
|
||||
}
|
||||
|
||||
pub(crate) async fn read_blk(
|
||||
&self,
|
||||
blknum: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockLease, io::Error> {
|
||||
match self.rw.read_blk(blknum).await? {
|
||||
zero_padded_read_write::ReadResult::NeedsReadFromVirtualFile { virtual_file } => {
|
||||
let cache = page_cache::get();
|
||||
match cache
|
||||
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
// order path before error because error is anyhow::Error => might have many contexts
|
||||
format!(
|
||||
"ephemeral file: read immutable page #{}: {}: {:#}",
|
||||
blknum,
|
||||
self.rw.as_inner_virtual_file().path,
|
||||
e,
|
||||
),
|
||||
)
|
||||
})? {
|
||||
page_cache::ReadBufResult::Found(guard) => {
|
||||
return Ok(BlockLease::PageReadGuard(guard))
|
||||
}
|
||||
page_cache::ReadBufResult::NotFound(write_guard) => {
|
||||
let write_guard = virtual_file
|
||||
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
|
||||
.await?;
|
||||
let read_guard = write_guard.mark_valid();
|
||||
return Ok(BlockLease::PageReadGuard(read_guard));
|
||||
}
|
||||
}
|
||||
}
|
||||
zero_padded_read_write::ReadResult::ServedFromZeroPaddedMutableTail { buffer } => {
|
||||
Ok(BlockLease::EphemeralFileMutableTail(buffer))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RW {
|
||||
fn drop(&mut self) {
|
||||
// There might still be pages in the [`crate::page_cache`] for this file.
|
||||
// We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
|
||||
|
||||
// unlink the file
|
||||
let res = std::fs::remove_file(&self.rw.as_inner_virtual_file().path);
|
||||
if let Err(e) = res {
|
||||
if e.kind() != std::io::ErrorKind::NotFound {
|
||||
// just never log the not found errors, we cannot do anything for them; on detach
|
||||
// the tenant directory is already gone.
|
||||
//
|
||||
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
|
||||
error!(
|
||||
"could not remove ephemeral file '{}': {}",
|
||||
self.rw.as_inner_virtual_file().path,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
124
pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs
Normal file
124
pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs
Normal file
@@ -0,0 +1,124 @@
|
||||
//! The heart of how [`super::EphemeralFile`] does its reads and writes.
|
||||
//!
|
||||
//! # Writes
|
||||
//!
|
||||
//! [`super::EphemeralFile`] writes small, borrowed buffers using [`RW::write_all_borrowed`].
|
||||
//! The [`RW`] batches these into [`RW::TAIL_SZ`] bigger writes, using [`owned_buffers_io::write::BufferedWriter`].
|
||||
//!
|
||||
//! # Reads
|
||||
//!
|
||||
//! [`super::EphemeralFile`] always reads full [`PAGE_SZ`]ed blocks using [`RW::read_blk`].
|
||||
//!
|
||||
//! The [`RW`] serves these reads either from the buffered writer's in-memory buffer
|
||||
//! or redirects the caller to read from the underlying [`VirtualFile`]` if they have already
|
||||
//! been flushed.
|
||||
//!
|
||||
//! The current caller is [`super::page_caching::RW`]. In case it gets redirected to read from
|
||||
//! [`VirtualFile`], it consults the [`crate::page_cache`] first.
|
||||
|
||||
mod zero_padded_buffer;
|
||||
|
||||
use crate::{
|
||||
page_cache::PAGE_SZ,
|
||||
tenant::ephemeral_file::zero_padded_buffer,
|
||||
virtual_file::{
|
||||
owned_buffers_io::{self, write::Buffer},
|
||||
VirtualFile,
|
||||
},
|
||||
};
|
||||
|
||||
/// See module-level comment.
|
||||
pub struct RW {
|
||||
buffered_writer: owned_buffers_io::write::BufferedWriter<
|
||||
zero_padded_buffer::Buf<{ Self::TAIL_SZ }>,
|
||||
owned_buffers_io::util::size_tracking_writer::Writer<VirtualFile>,
|
||||
>,
|
||||
}
|
||||
|
||||
pub enum ReadResult<'a> {
|
||||
NeedsReadFromVirtualFile { virtual_file: &'a VirtualFile },
|
||||
ServedFromZeroPaddedMutableTail { buffer: &'a [u8; PAGE_SZ] },
|
||||
}
|
||||
|
||||
impl RW {
|
||||
const TAIL_SZ: usize = 64 * 1024;
|
||||
|
||||
pub fn new(file: VirtualFile) -> Self {
|
||||
let bytes_flushed_tracker = owned_buffers_io::util::size_tracking_writer::Writer::new(file);
|
||||
let buffered_writer = owned_buffers_io::write::BufferedWriter::new(
|
||||
bytes_flushed_tracker,
|
||||
zero_padded_buffer::Buf::default(),
|
||||
);
|
||||
Self { buffered_writer }
|
||||
}
|
||||
|
||||
pub(crate) fn as_inner_virtual_file(&self) -> &VirtualFile {
|
||||
self.buffered_writer.as_inner().as_inner()
|
||||
}
|
||||
|
||||
pub async fn write_all_borrowed(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
self.buffered_writer.write_buffered_borrowed(buf).await
|
||||
}
|
||||
|
||||
pub fn bytes_written(&self) -> u64 {
|
||||
let flushed_offset = self.buffered_writer.as_inner().bytes_written();
|
||||
let buffer: &zero_padded_buffer::Buf<{ Self::TAIL_SZ }> =
|
||||
self.buffered_writer.inspect_buffer();
|
||||
flushed_offset + u64::try_from(buffer.pending()).unwrap()
|
||||
}
|
||||
|
||||
pub(crate) async fn read_blk(&self, blknum: u32) -> Result<ReadResult, std::io::Error> {
|
||||
let flushed_offset = self.buffered_writer.as_inner().bytes_written();
|
||||
let buffer: &zero_padded_buffer::Buf<{ Self::TAIL_SZ }> =
|
||||
self.buffered_writer.inspect_buffer();
|
||||
let buffered_offset = flushed_offset + u64::try_from(buffer.pending()).unwrap();
|
||||
let read_offset = (blknum as u64) * (PAGE_SZ as u64);
|
||||
|
||||
assert_eq!(
|
||||
flushed_offset % (Self::TAIL_SZ as u64), 0,
|
||||
"we only use write_buffered_borrowed to write to the buffered writer, so it's guaranteed that flushes happen buffer.cap()-sized chunks"
|
||||
);
|
||||
assert_eq!(
|
||||
flushed_offset % (PAGE_SZ as u64),
|
||||
0,
|
||||
"the logic below can't handle if the page is spread across the flushed part and the buffer"
|
||||
);
|
||||
|
||||
if read_offset < flushed_offset {
|
||||
assert!(
|
||||
read_offset + (PAGE_SZ as u64) <= flushed_offset,
|
||||
"this impl can't deal with pages spread across flushed & buffered part"
|
||||
);
|
||||
Ok(ReadResult::NeedsReadFromVirtualFile {
|
||||
virtual_file: self.as_inner_virtual_file(),
|
||||
})
|
||||
} else {
|
||||
let read_until_offset = read_offset + (PAGE_SZ as u64);
|
||||
if !(0..buffered_offset).contains(&read_until_offset) {
|
||||
// The blob_io code relies on the reader allowing reads past
|
||||
// the end of what was written, up to end of the current PAGE_SZ chunk.
|
||||
// This is a relict of the past where we would get a pre-zeroed page from the page cache.
|
||||
//
|
||||
// DeltaLayer probably has the same issue, not sure why it needs no special treatment.
|
||||
let nbytes_past_end = read_until_offset.checked_sub(buffered_offset).unwrap();
|
||||
if nbytes_past_end >= (PAGE_SZ as u64) {
|
||||
// TODO: treat this as error. Pre-existing issue before this patch.
|
||||
panic!(
|
||||
"return IO error: read past end of file: read=0x{read_offset:x} buffered=0x{buffered_offset:x} flushed=0x{flushed_offset}"
|
||||
)
|
||||
}
|
||||
}
|
||||
let read_offset_in_buffer = read_offset
|
||||
.checked_sub(flushed_offset)
|
||||
.expect("would have taken `if` branch instead of this one");
|
||||
let read_offset_in_buffer = usize::try_from(read_offset_in_buffer).unwrap();
|
||||
let zero_padded_slice = buffer.as_zero_padded_slice();
|
||||
let page = &zero_padded_slice[read_offset_in_buffer..(read_offset_in_buffer + PAGE_SZ)];
|
||||
Ok(ReadResult::ServedFromZeroPaddedMutableTail {
|
||||
buffer: page
|
||||
.try_into()
|
||||
.expect("the slice above got it as page-size slice"),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,98 @@
|
||||
use std::mem::MaybeUninit;
|
||||
|
||||
pub struct Buf<const N: usize> {
|
||||
allocation: Box<[u8; N]>,
|
||||
written: usize,
|
||||
}
|
||||
|
||||
impl<const N: usize> Default for Buf<N> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
allocation: Box::new(
|
||||
// SAFETY: zeroed memory is a valid [u8; N]
|
||||
unsafe { MaybeUninit::zeroed().assume_init() },
|
||||
),
|
||||
written: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<const N: usize> Buf<N> {
|
||||
#[inline(always)]
|
||||
fn invariants(&self) {
|
||||
// don't check by default, unoptimized is too expensive even for debug mode
|
||||
if false {
|
||||
debug_assert!(self.written <= N, "{}", self.written);
|
||||
debug_assert!(self.allocation[self.written..N].iter().all(|v| *v == 0));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn as_zero_padded_slice(&self) -> &[u8; N] {
|
||||
&self.allocation
|
||||
}
|
||||
}
|
||||
|
||||
impl<const N: usize> crate::virtual_file::owned_buffers_io::write::Buffer for Buf<N> {
|
||||
type IoBuf = Self;
|
||||
|
||||
fn cap(&self) -> usize {
|
||||
self.allocation.len()
|
||||
}
|
||||
|
||||
fn extend_from_slice(&mut self, other: &[u8]) {
|
||||
self.invariants();
|
||||
let remaining = self.cap() - other.len();
|
||||
if other.len() > remaining {
|
||||
panic!("calling extend_from_slice() with insufficient remaining capacity");
|
||||
}
|
||||
self.allocation[self.written..(self.written + other.len())].copy_from_slice(other);
|
||||
self.written += other.len();
|
||||
self.invariants();
|
||||
}
|
||||
|
||||
fn pending(&self) -> usize {
|
||||
self.written
|
||||
}
|
||||
|
||||
fn flush(self) -> tokio_epoll_uring::Slice<Self> {
|
||||
self.invariants();
|
||||
let written = self.written;
|
||||
tokio_epoll_uring::BoundedBuf::slice(self, 0..written)
|
||||
}
|
||||
|
||||
fn reuse_after_flush(iobuf: Self::IoBuf) -> Self {
|
||||
let Self {
|
||||
mut allocation,
|
||||
written,
|
||||
} = iobuf;
|
||||
allocation[0..written].fill(0);
|
||||
let new = Self {
|
||||
allocation,
|
||||
written: 0,
|
||||
};
|
||||
new.invariants();
|
||||
new
|
||||
}
|
||||
}
|
||||
|
||||
/// We have this implementation so that [`Buf<N>`] can be used with [`crate::virtual_file::owned_buffers_io::write::BufferedWriter`].
|
||||
///
|
||||
///
|
||||
/// SAFETY:
|
||||
///
|
||||
/// The [`Self::allocation`] is stable becauses boxes are stable.
|
||||
/// The memory is zero-initialized, so, bytes_init is always N.
|
||||
///
|
||||
unsafe impl<const N: usize> tokio_epoll_uring::IoBuf for Buf<N> {
|
||||
fn stable_ptr(&self) -> *const u8 {
|
||||
self.allocation.as_ptr()
|
||||
}
|
||||
|
||||
fn bytes_init(&self) -> usize {
|
||||
N
|
||||
}
|
||||
|
||||
fn bytes_total(&self) -> usize {
|
||||
N
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,7 @@ use std::collections::HashSet;
|
||||
use std::future::Future;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use bytes::BytesMut;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use tokio::fs::{self, File, OpenOptions};
|
||||
@@ -194,10 +195,10 @@ async fn download_object<'a>(
|
||||
// There's chunks_vectored() on the stream.
|
||||
let (bytes_amount, destination_file) = async {
|
||||
let size_tracking = size_tracking_writer::Writer::new(destination_file);
|
||||
let mut buffered = owned_buffers_io::write::BufferedWriter::<
|
||||
{ super::BUFFER_SIZE },
|
||||
_,
|
||||
>::new(size_tracking);
|
||||
let mut buffered = owned_buffers_io::write::BufferedWriter::<BytesMut, _>::new(
|
||||
size_tracking,
|
||||
BytesMut::with_capacity(super::BUFFER_SIZE),
|
||||
);
|
||||
while let Some(res) =
|
||||
futures::StreamExt::next(&mut download.download_stream).await
|
||||
{
|
||||
|
||||
@@ -454,7 +454,7 @@ impl InMemoryLayer {
|
||||
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
|
||||
|
||||
let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id).await?;
|
||||
let key = InMemoryLayerFileId(file.id());
|
||||
let key = InMemoryLayerFileId(file.page_cache_file_id());
|
||||
|
||||
Ok(InMemoryLayer {
|
||||
file_id: key,
|
||||
|
||||
@@ -1093,12 +1093,6 @@ impl OwnedAsyncWriter for VirtualFile {
|
||||
let (buf, res) = VirtualFile::write_all(self, buf).await;
|
||||
res.map(move |v| (v, buf))
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
async fn write_all_borrowed(&mut self, _buf: &[u8]) -> std::io::Result<usize> {
|
||||
// TODO: ensure this through the type system
|
||||
panic!("this should not happen");
|
||||
}
|
||||
}
|
||||
|
||||
impl OpenFiles {
|
||||
|
||||
@@ -39,11 +39,4 @@ where
|
||||
self.bytes_amount += u64::try_from(nwritten).unwrap();
|
||||
Ok((nwritten, buf))
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
async fn write_all_borrowed(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
let nwritten = self.dst.write_all_borrowed(buf).await?;
|
||||
self.bytes_amount += u64::try_from(nwritten).unwrap();
|
||||
Ok(nwritten)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use bytes::BytesMut;
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
|
||||
|
||||
/// A trait for doing owned-buffer write IO.
|
||||
@@ -7,17 +8,16 @@ pub trait OwnedAsyncWriter {
|
||||
&mut self,
|
||||
buf: B,
|
||||
) -> std::io::Result<(usize, B::Buf)>;
|
||||
async fn write_all_borrowed(&mut self, buf: &[u8]) -> std::io::Result<usize>;
|
||||
}
|
||||
|
||||
/// A wrapper aorund an [`OwnedAsyncWriter`] that batches smaller writers
|
||||
/// into `BUFFER_SIZE`-sized writes.
|
||||
/// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
|
||||
/// small writes into larger writes of size [`Buffer::cap`].
|
||||
///
|
||||
/// # Passthrough Of Large Writers
|
||||
///
|
||||
/// Buffered writes larger than the `BUFFER_SIZE` cause the internal
|
||||
/// buffer to be flushed, even if it is not full yet. Then, the large
|
||||
/// buffered write is passed through to the unerlying [`OwnedAsyncWriter`].
|
||||
/// Calls to [`BufferedWriter::write_buffered`] that are larger than [`Buffer::cap`]
|
||||
/// cause the internal buffer to be flushed prematurely so that the large
|
||||
/// buffered write is passed through to the underlying [`OwnedAsyncWriter`].
|
||||
///
|
||||
/// This pass-through is generally beneficial for throughput, but if
|
||||
/// the storage backend of the [`OwnedAsyncWriter`] is a shared resource,
|
||||
@@ -25,26 +25,25 @@ pub trait OwnedAsyncWriter {
|
||||
///
|
||||
/// In such cases, a different implementation that always buffers in memory
|
||||
/// may be preferable.
|
||||
pub struct BufferedWriter<const BUFFER_SIZE: usize, W> {
|
||||
pub struct BufferedWriter<B, W> {
|
||||
writer: W,
|
||||
// invariant: always remains Some(buf)
|
||||
// with buf.capacity() == BUFFER_SIZE except
|
||||
// - while IO is ongoing => goes back to Some() once the IO completed successfully
|
||||
// - after an IO error => stays `None` forever
|
||||
// In these exceptional cases, it's `None`.
|
||||
buf: Option<zero_initialized_buffer::Buf<BUFFER_SIZE>>,
|
||||
/// invariant: always remains Some(buf) except
|
||||
/// - while IO is ongoing => goes back to Some() once the IO completed successfully
|
||||
/// - after an IO error => stays `None` forever
|
||||
/// In these exceptional cases, it's `None`.
|
||||
buf: Option<B>,
|
||||
}
|
||||
|
||||
mod zero_initialized_buffer;
|
||||
|
||||
impl<const BUFFER_SIZE: usize, W> BufferedWriter<BUFFER_SIZE, W>
|
||||
impl<B, Buf, W> BufferedWriter<B, W>
|
||||
where
|
||||
B: Buffer<IoBuf = Buf> + Send,
|
||||
Buf: IoBuf + Send,
|
||||
W: OwnedAsyncWriter,
|
||||
{
|
||||
pub fn new(writer: W) -> Self {
|
||||
pub fn new(writer: W, buf: B) -> Self {
|
||||
Self {
|
||||
writer,
|
||||
buf: Some(zero_initialized_buffer::Buf::default()),
|
||||
buf: Some(buf),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,13 +51,9 @@ where
|
||||
&self.writer
|
||||
}
|
||||
|
||||
/// panics if used after an error
|
||||
pub fn inspect_buffer(&self) -> &[u8; BUFFER_SIZE] {
|
||||
self.buf
|
||||
.as_ref()
|
||||
// TODO: can this happen on the EphemeralFile read path?
|
||||
.expect("must not use after an error")
|
||||
.as_zero_padded_slice()
|
||||
/// Panics if used after any of the write paths returned an error
|
||||
pub fn inspect_buffer(&self) -> &B {
|
||||
self.buf()
|
||||
}
|
||||
|
||||
pub async fn flush_and_into_inner(mut self) -> std::io::Result<W> {
|
||||
@@ -68,20 +63,27 @@ where
|
||||
Ok(writer)
|
||||
}
|
||||
|
||||
pub async fn write_buffered<B: IoBuf>(&mut self, chunk: Slice<B>) -> std::io::Result<(usize, B)>
|
||||
#[inline(always)]
|
||||
fn buf(&self) -> &B {
|
||||
self.buf
|
||||
.as_ref()
|
||||
.expect("must not use after we returned an error")
|
||||
}
|
||||
|
||||
pub async fn write_buffered<S: IoBuf>(&mut self, chunk: Slice<S>) -> std::io::Result<(usize, S)>
|
||||
where
|
||||
B: IoBuf + Send,
|
||||
S: IoBuf + Send,
|
||||
{
|
||||
let chunk_len = chunk.len();
|
||||
// avoid memcpy for the middle of the chunk
|
||||
if chunk.len() >= BUFFER_SIZE {
|
||||
if chunk.len() >= self.buf().cap() {
|
||||
self.flush().await?;
|
||||
// do a big write, bypassing `buf`
|
||||
assert_eq!(
|
||||
self.buf
|
||||
.as_ref()
|
||||
.expect("must not use after an error")
|
||||
.len(),
|
||||
.pending(),
|
||||
0
|
||||
);
|
||||
let (nwritten, chunk) = self.writer.write_all(chunk).await?;
|
||||
@@ -89,17 +91,17 @@ where
|
||||
return Ok((nwritten, chunk));
|
||||
}
|
||||
// in-memory copy the < BUFFER_SIZED tail of the chunk
|
||||
assert!(chunk.len() < BUFFER_SIZE);
|
||||
assert!(chunk.len() < self.buf().cap());
|
||||
let mut slice = &chunk[..];
|
||||
while !slice.is_empty() {
|
||||
let buf = self.buf.as_mut().expect("must not use after an error");
|
||||
let need = BUFFER_SIZE - buf.len();
|
||||
let need = buf.cap() - buf.pending();
|
||||
let have = slice.len();
|
||||
let n = std::cmp::min(need, have);
|
||||
buf.extend_from_slice(&slice[..n]);
|
||||
slice = &slice[n..];
|
||||
if buf.len() >= BUFFER_SIZE {
|
||||
assert_eq!(buf.len(), BUFFER_SIZE);
|
||||
if buf.pending() >= buf.cap() {
|
||||
assert_eq!(buf.pending(), buf.cap());
|
||||
self.flush().await?;
|
||||
}
|
||||
}
|
||||
@@ -107,19 +109,22 @@ where
|
||||
Ok((chunk_len, chunk.into_inner()))
|
||||
}
|
||||
|
||||
/// Always goes through the internal buffer.
|
||||
/// Guaranteed to never invoke [`OwnedAsyncWriter::write_all_borrowed`] on the underlying.
|
||||
pub async fn write_all_borrowed(&mut self, mut chunk: &[u8]) -> std::io::Result<usize> {
|
||||
/// Strictly less performant variant of [`Self::write_buffered`] that allows writing borrowed data.
|
||||
///
|
||||
/// It is less performant because we always have to copy the borrowed data into the internal buffer
|
||||
/// before we can do the IO. The [`Self::write_buffered`] can avoid this, which is more performant
|
||||
/// for large writes.
|
||||
pub async fn write_buffered_borrowed(&mut self, mut chunk: &[u8]) -> std::io::Result<usize> {
|
||||
let chunk_len = chunk.len();
|
||||
while !chunk.is_empty() {
|
||||
let buf = self.buf.as_mut().expect("must not use after an error");
|
||||
let need = BUFFER_SIZE - buf.len();
|
||||
let need = buf.cap() - buf.pending();
|
||||
let have = chunk.len();
|
||||
let n = std::cmp::min(need, have);
|
||||
buf.extend_from_slice(&chunk[..n]);
|
||||
chunk = &chunk[n..];
|
||||
if buf.len() >= BUFFER_SIZE {
|
||||
assert_eq!(buf.len(), BUFFER_SIZE);
|
||||
if buf.pending() >= buf.cap() {
|
||||
assert_eq!(buf.pending(), buf.cap());
|
||||
self.flush().await?;
|
||||
}
|
||||
}
|
||||
@@ -128,37 +133,70 @@ where
|
||||
|
||||
async fn flush(&mut self) -> std::io::Result<()> {
|
||||
let buf = self.buf.take().expect("must not use after an error");
|
||||
if buf.is_empty() {
|
||||
let buf_len = buf.pending();
|
||||
if buf_len == 0 {
|
||||
self.buf = Some(buf);
|
||||
return std::io::Result::Ok(());
|
||||
return Ok(());
|
||||
}
|
||||
let buf_len = buf.len();
|
||||
let (nwritten, mut buf) = self.writer.write_all(buf).await?;
|
||||
let (nwritten, io_buf) = self.writer.write_all(buf.flush()).await?;
|
||||
assert_eq!(nwritten, buf_len);
|
||||
buf.clear();
|
||||
self.buf = Some(buf);
|
||||
self.buf = Some(Buffer::reuse_after_flush(io_buf));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<const BUFFER_SIZE: usize, W: OwnedAsyncWriter> OwnedAsyncWriter
|
||||
for BufferedWriter<BUFFER_SIZE, W>
|
||||
{
|
||||
/// A [`Buffer`] is used by [`BufferedWriter`] to batch smaller writes into larger ones.
|
||||
pub trait Buffer {
|
||||
type IoBuf: IoBuf;
|
||||
|
||||
/// Capacity of the buffer. Must not change over the lifetime `self`.`
|
||||
fn cap(&self) -> usize;
|
||||
|
||||
/// Add data to the buffer.
|
||||
/// Panics if there is not enough room to accomodate `other`'s content, i.e.,
|
||||
/// panics if `other.len() > self.cap() - self.pending()`.
|
||||
fn extend_from_slice(&mut self, other: &[u8]);
|
||||
|
||||
/// Number of bytes in the buffer.
|
||||
fn pending(&self) -> usize;
|
||||
|
||||
/// Turns `self` into a [`tokio_epoll_uring::Slice`] of the pending data
|
||||
/// so we can use [`tokio_epoll_uring`] to write it to disk.
|
||||
fn flush(self) -> Slice<Self::IoBuf>;
|
||||
|
||||
/// After the write to disk is done and we have gotten back the slice,
|
||||
/// [`BufferedWriter`] uses this method to re-use the io buffer.
|
||||
fn reuse_after_flush(iobuf: Self::IoBuf) -> Self;
|
||||
}
|
||||
|
||||
impl Buffer for BytesMut {
|
||||
type IoBuf = BytesMut;
|
||||
|
||||
#[inline(always)]
|
||||
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
buf: B,
|
||||
) -> std::io::Result<(usize, B::Buf)> {
|
||||
let nbytes = buf.bytes_init();
|
||||
if nbytes == 0 {
|
||||
return Ok((0, Slice::into_inner(buf.slice_full())));
|
||||
}
|
||||
let slice = buf.slice(0..nbytes);
|
||||
BufferedWriter::write_buffered(self, slice).await
|
||||
fn cap(&self) -> usize {
|
||||
self.capacity()
|
||||
}
|
||||
|
||||
fn extend_from_slice(&mut self, other: &[u8]) {
|
||||
BytesMut::extend_from_slice(self, other)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
async fn write_all_borrowed(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
BufferedWriter::write_all_borrowed(self, buf).await
|
||||
fn pending(&self) -> usize {
|
||||
self.len()
|
||||
}
|
||||
|
||||
fn flush(self) -> Slice<BytesMut> {
|
||||
if self.is_empty() {
|
||||
return self.slice_full();
|
||||
}
|
||||
let len = self.len();
|
||||
self.slice(0..len)
|
||||
}
|
||||
|
||||
fn reuse_after_flush(mut iobuf: BytesMut) -> Self {
|
||||
iobuf.clear();
|
||||
iobuf
|
||||
}
|
||||
}
|
||||
|
||||
@@ -175,15 +213,12 @@ impl OwnedAsyncWriter for Vec<u8> {
|
||||
self.extend_from_slice(&buf[..]);
|
||||
Ok((buf.len(), Slice::into_inner(buf)))
|
||||
}
|
||||
|
||||
async fn write_all_borrowed(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
self.extend_from_slice(buf);
|
||||
Ok(buf.len())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use bytes::BytesMut;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[derive(Default)]
|
||||
@@ -204,11 +239,6 @@ mod tests {
|
||||
self.writes.push(Vec::from(&buf[..]));
|
||||
Ok((buf.len(), Slice::into_inner(buf)))
|
||||
}
|
||||
|
||||
async fn write_all_borrowed(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
self.writes.push(Vec::from(buf));
|
||||
Ok(buf.len())
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! write {
|
||||
@@ -222,7 +252,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_buffered_writes_only() -> std::io::Result<()> {
|
||||
let recorder = RecorderWriter::default();
|
||||
let mut writer = BufferedWriter::<2, _>::new(recorder);
|
||||
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
|
||||
write!(writer, b"a");
|
||||
write!(writer, b"b");
|
||||
write!(writer, b"c");
|
||||
@@ -239,7 +269,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_passthrough_writes_only() -> std::io::Result<()> {
|
||||
let recorder = RecorderWriter::default();
|
||||
let mut writer = BufferedWriter::<2, _>::new(recorder);
|
||||
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
|
||||
write!(writer, b"abc");
|
||||
write!(writer, b"de");
|
||||
write!(writer, b"");
|
||||
@@ -255,7 +285,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_passthrough_write_with_nonempty_buffer() -> std::io::Result<()> {
|
||||
let recorder = RecorderWriter::default();
|
||||
let mut writer = BufferedWriter::<2, _>::new(recorder);
|
||||
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
|
||||
write!(writer, b"a");
|
||||
write!(writer, b"bc");
|
||||
write!(writer, b"d");
|
||||
@@ -271,15 +301,15 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_write_all_borrowed_always_goes_through_buffer() -> std::io::Result<()> {
|
||||
let recorder = RecorderWriter::default();
|
||||
let mut writer = BufferedWriter::<2, _>::new(recorder);
|
||||
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
|
||||
|
||||
writer.write_all_borrowed(b"abc").await?;
|
||||
writer.write_all_borrowed(b"d").await?;
|
||||
writer.write_all_borrowed(b"e").await?;
|
||||
writer.write_all_borrowed(b"fg").await?;
|
||||
writer.write_all_borrowed(b"hi").await?;
|
||||
writer.write_all_borrowed(b"j").await?;
|
||||
writer.write_all_borrowed(b"klmno").await?;
|
||||
writer.write_buffered_borrowed(b"abc").await?;
|
||||
writer.write_buffered_borrowed(b"d").await?;
|
||||
writer.write_buffered_borrowed(b"e").await?;
|
||||
writer.write_buffered_borrowed(b"fg").await?;
|
||||
writer.write_buffered_borrowed(b"hi").await?;
|
||||
writer.write_buffered_borrowed(b"j").await?;
|
||||
writer.write_buffered_borrowed(b"klmno").await?;
|
||||
|
||||
let recorder = writer.flush_and_into_inner().await?;
|
||||
assert_eq!(
|
||||
|
||||
@@ -1,73 +0,0 @@
|
||||
use std::mem::MaybeUninit;
|
||||
|
||||
pub struct Buf<const N: usize> {
|
||||
allocation: Box<[u8; N]>,
|
||||
written: usize,
|
||||
}
|
||||
|
||||
impl<const N: usize> Default for Buf<N> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
allocation: Box::new(
|
||||
// SAFETY: zeroed memory is a valid [u8; N]
|
||||
unsafe { MaybeUninit::zeroed().assume_init() },
|
||||
),
|
||||
written: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<const N: usize> Buf<N> {
|
||||
#[inline(always)]
|
||||
fn invariants(&self) {
|
||||
debug_assert!(self.written <= N, "{}", self.written);
|
||||
}
|
||||
|
||||
pub fn as_zero_padded_slice(&self) -> &[u8; N] {
|
||||
&self.allocation
|
||||
}
|
||||
|
||||
/// panics if there's not enough capacity left
|
||||
pub fn extend_from_slice(&mut self, buf: &[u8]) {
|
||||
self.invariants();
|
||||
let can = N - self.written;
|
||||
let want = buf.len();
|
||||
assert!(want <= can, "{:x} {:x}", want, can);
|
||||
self.allocation[self.written..(self.written + want)].copy_from_slice(buf);
|
||||
self.written += want;
|
||||
self.invariants();
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.written
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.len() == 0
|
||||
}
|
||||
|
||||
pub fn clear(&mut self) {
|
||||
self.invariants();
|
||||
self.written = 0;
|
||||
self.allocation[..].fill(0);
|
||||
self.invariants();
|
||||
}
|
||||
}
|
||||
|
||||
/// SAFETY:
|
||||
///
|
||||
/// The [`Self::allocation`] is stable becauses boxes are stable.
|
||||
///
|
||||
unsafe impl<const N: usize> tokio_epoll_uring::IoBuf for Buf<N> {
|
||||
fn stable_ptr(&self) -> *const u8 {
|
||||
self.allocation.as_ptr()
|
||||
}
|
||||
|
||||
fn bytes_init(&self) -> usize {
|
||||
self.written
|
||||
}
|
||||
|
||||
fn bytes_total(&self) -> usize {
|
||||
self.written // ?
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user