Compare commits

...

41 Commits

Author SHA1 Message Date
Christian Schwarz
9281f54527 fix doc comments 2024-04-17 13:35:52 +00:00
Christian Schwarz
717b7cb73e safety comment 2024-04-17 13:35:45 +00:00
Christian Schwarz
99056dde13 clippy nitpickery 2024-04-17 13:26:59 +00:00
Christian Schwarz
78233dc969 Merge remote-tracking branch 'origin/main' into problame/write-path-larger-buffers 2024-04-17 13:19:46 +00:00
Christian Schwarz
5e14362cf7 Revert the page_cache_priming_writer changes
It would require plumbing through the RequestContext, and its utility is
somewhat dubious anyway.
2024-03-28 14:18:40 +00:00
Christian Schwarz
fcd1ccfea7 drop the parametrization changes, not part of the feature PR 2024-03-28 12:44:56 +00:00
Christian Schwarz
27564dde78 Merge remote-tracking branch 'origin/main' into problame/write-path-larger-buffers
Conflicts:
	control_plane/src/local_env.rs
	control_plane/src/pageserver.rs
the above have been preliminaried to `main` since
	pageserver/src/virtual_file.rs
	pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs
	pageserver/src/virtual_file/owned_buffers_io/write.rs
strategy: pick ours (likely breaks the build, will fix subsequently)
2024-03-28 12:43:02 +00:00
Christian Schwarz
261f116a2d local benchmark run
test_bulk_insert[neon-release-pg14-std-fs].wal_written: 345 MB
test_bulk_insert[neon-release-pg14-std-fs].wal_recovery: 8.381 s

test_bulk_insert[neon-release-pg14-tokio-epoll-uring].wal_written: 345 MB
test_bulk_insert[neon-release-pg14-tokio-epoll-uring].wal_recovery: 11.760 s
2024-03-14 17:29:04 +00:00
Christian Schwarz
79a6cda45d page_cache_priming_writer: done 2024-03-14 17:19:36 +00:00
Christian Schwarz
f0a67c4071 WIP: page_cache_priming_writer: clippy 2024-03-14 17:19:36 +00:00
Christian Schwarz
a1f4eb2815 WIP: page_cache_priming_writer: bugfixes 2024-03-14 17:15:30 +00:00
Christian Schwarz
eb1ccd7988 WIP: page_cache_priming_writer: plumb through RequestContext for previous commit, yet more churn -,- 2024-03-14 17:15:21 +00:00
Christian Schwarz
f1f0452722 WIP: page_cache_priming_writer (is it really worth it?) 2024-03-14 15:46:41 +00:00
Christian Schwarz
40200b7521 figure out why & when exactly zeroes past write offset are required & assert it 2024-03-14 12:04:17 +00:00
Christian Schwarz
91bd729be2 junk up owned_buffers_io from previous commit to deal with EphemeralFile speciality of reading zeroes past end-of-file
This makes it diverge semantically from what's in the tokio-epoll-uring
download PR :(
2024-03-13 18:23:12 +00:00
Christian Schwarz
4b84f23cea larger buffers for the write path
The OwnedAsyncWrite stuff is based on the code in
tokio-epoll-uring on-demand download PR (#6992), which hasn't merged
yet.
2024-03-13 18:23:08 +00:00
Christian Schwarz
644c5e243d Revert "experiment(repeat, without preceding reverts) demonstrate that std-fs performs better because it hits the page cache"
This reverts commit d66ccbae5e.
2024-03-13 15:28:24 +00:00
Christian Schwarz
d66ccbae5e experiment(repeat, without preceding reverts) demonstrate that std-fs performs better because it hits the page cache
... by forcing each write system call to go to disk

test_bulk_insert[neon-release-pg14-tokio-epoll-uring].wal_written: 346 MB
test_bulk_insert[neon-release-pg14-tokio-epoll-uring].wal_recovery: 93.417 s

test_bulk_insert[neon-release-pg14-std-fs].wal_written: 346 MB
test_bulk_insert[neon-release-pg14-std-fs].wal_recovery: 86.009 s

=> ~8% instead of 2x difference
2024-03-13 15:27:41 +00:00
Christian Schwarz
578a2d5d5f Revert "experiment: for create_delta_layer, use global io_engine, but inside a spawn_blocking single-threaded runtime"
This reverts commit 72a8e090dd.
2024-03-13 15:09:11 +00:00
Christian Schwarz
c9d1f51a93 Revert "experiment: for create_delta_layer _write path_, use StdFs io engine in a spawn_blocking thread single-threaded runtime"
This reverts commit 4a8e7f8716.
2024-03-13 15:09:06 +00:00
Christian Schwarz
1339834297 Revert "experiment: for EphemeralFile write path, use StdFs io engine"
This reverts commit c8c04c0db8.
2024-03-13 15:09:00 +00:00
Christian Schwarz
746fc530c5 "experiment: demonstrate that std-fs performs better because it hits the page cache"
This reverts commit 2edbc07733.
2024-03-13 15:08:43 +00:00
Christian Schwarz
94311052cd previous commit's numbers were with all the preceding experiments 2024-03-13 15:08:11 +00:00
Christian Schwarz
2edbc07733 experiment: demonstrate that std-fs performs better because it hits the page cache
... by forcing each write system call to go to disk

test_bulk_insert[neon-release-pg14-tokio-epoll-uring].wal_written: 346 MB
test_bulk_insert[neon-release-pg14-tokio-epoll-uring].wal_recovery: 92.559 s

test_bulk_insert[neon-release-pg14-std-fs].wal_written: 346 MB
test_bulk_insert[neon-release-pg14-std-fs].wal_recovery: 81.998 s

=> 10%ish worse instead of 2x
2024-03-13 15:07:46 +00:00
Christian Schwarz
c8c04c0db8 experiment: for EphemeralFile write path, use StdFs io engine
together with previous commits, this brings us back down to
pre-regression

test_bulk_insert[neon-release-pg14-tokio-epoll-uring].wal_written: 345 MB
test_bulk_insert[neon-release-pg14-tokio-epoll-uring].wal_recovery: 9.991 s
2024-03-13 14:09:21 +00:00
Christian Schwarz
4a8e7f8716 experiment: for create_delta_layer _write path_, use StdFs io engine in a spawn_blocking thread single-threaded runtime
builds on top of the previous commit

test_bulk_insert[neon-release-pg14-tokio-epoll-uring].wal_written: 345 MB
test_bulk_insert[neon-release-pg14-tokio-epoll-uring].wal_recovery: 13.153 s
2024-03-13 14:09:16 +00:00
Christian Schwarz
72a8e090dd experiment: for create_delta_layer, use global io_engine, but inside a spawn_blocking single-threaded runtime
This makes things worse with tokio-epoll-uring

test_bulk_insert[neon-release-pg14-tokio-epoll-uring].wal_written: 345 MB
test_bulk_insert[neon-release-pg14-tokio-epoll-uring].wal_recovery: 19.574 s

This is a partial revert of 3da410c8fe
2024-03-13 14:09:12 +00:00
Christian Schwarz
e0ea465aed Revert "experiment: Revert "tokio-epoll-uring: use it on the layer-creating code paths (#6378)""
This reverts commit d3c157eeee.
2024-03-13 12:45:53 +00:00
Christian Schwarz
d3c157eeee experiment: Revert "tokio-epoll-uring: use it on the layer-creating code paths (#6378)"
Unchanged

test_bulk_insert[neon-release-pg14-tokio-epoll-uring].wal_written: 345 MB
test_bulk_insert[neon-release-pg14-tokio-epoll-uring].wal_recovery: 9.194 s

This reverts commit 3da410c8fe.
2024-03-13 12:45:40 +00:00
Christian Schwarz
c600355802 Revert "experiment: StdFs for EphemeralFile writes isn't the bottleneck"
This reverts commit 57241c1c5a.
2024-03-13 12:36:21 +00:00
Christian Schwarz
57241c1c5a experiment: StdFs for EphemeralFile writes isn't the bottleneck
With this

test_bulk_insert[neon-release-pg14-tokio-epoll-uring].wal_written: 345 MB
test_bulk_insert[neon-release-pg14-tokio-epoll-uring].wal_recovery: 16.053 s

down from

test_bulk_insert[neon-release-pg14-tokio-epoll-uring].wal_written: 345 MB
test_bulk_insert[neon-release-pg14-tokio-epoll-uring].wal_recovery: 17.669 s

but the regression is from baseline

test_bulk_insert[neon-release-pg14-std-fs].wal_written: 345 MB
test_bulk_insert[neon-release-pg14-std-fs].wal_recovery: 9.335 s
2024-03-13 11:41:17 +00:00
Christian Schwarz
b9c30dbd6b fix the parametrization 2024-03-12 20:24:45 +00:00
Christian Schwarz
35f8735a27 wip 2024-03-12 20:13:44 +00:00
Christian Schwarz
095130c1b3 DO NOT MERGE: always parametrize 2024-03-12 20:09:14 +00:00
Christian Schwarz
5a0277476d Revert "make changes preparing next commit"
This reverts commit e85a631ddb.
2024-03-12 20:09:14 +00:00
Christian Schwarz
6348833bdc expose that virtual_file_io_engine and get_vectored_impl were never set 2024-03-12 20:09:14 +00:00
Christian Schwarz
dbabd4e4ea Revert "expose that pageserver_virtual_file_io_engine test param was never used (same for get_vectored_impl)"
This reverts commit 5b8888ce6b.
2024-03-12 20:09:14 +00:00
Christian Schwarz
5b8888ce6b expose that pageserver_virtual_file_io_engine test param was never used (same for get_vectored_impl) 2024-03-12 19:56:02 +00:00
Christian Schwarz
e85a631ddb make changes preparing next commit 2024-03-12 19:56:02 +00:00
Christian Schwarz
95deea4f39 Revert "Revert "tokio-epoll-uring: use it on the layer-creating code paths (#6378)""
This reverts commit 9876045444.
2024-03-12 18:53:16 +00:00
Christian Schwarz
9876045444 Revert "tokio-epoll-uring: use it on the layer-creating code paths (#6378)"
This reverts commit 3da410c8fe.
2024-03-12 18:53:11 +00:00
6 changed files with 296 additions and 168 deletions

View File

@@ -121,7 +121,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
self.offset
}
const CAPACITY: usize = if BUFFERED { PAGE_SZ } else { 0 };
const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };
/// Writes the given buffer directly to the underlying `VirtualFile`.
/// You need to make sure that the internal buffer is empty, otherwise

View File

@@ -5,14 +5,12 @@ use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
use crate::virtual_file::{self, VirtualFile};
use bytes::BytesMut;
use crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter;
use crate::virtual_file::{self, owned_buffers_io, VirtualFile};
use camino::Utf8PathBuf;
use pageserver_api::shard::TenantShardId;
use std::cmp::min;
use std::io::{self, ErrorKind};
use std::ops::DerefMut;
use std::io;
use std::sync::atomic::AtomicU64;
use tracing::*;
use utils::id::TimelineId;
@@ -22,18 +20,25 @@ pub struct EphemeralFile {
_tenant_shard_id: TenantShardId,
_timeline_id: TimelineId,
file: VirtualFile,
len: u64,
/// An ephemeral file is append-only.
/// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
/// The other pages, which can no longer be modified, are accessed through the page cache.
/// We sandwich the buffered writer between two size-tracking writers.
/// This allows us to "elegantly" track in-memory bytes vs flushed bytes,
/// enabling [`Self::read_blk`] to determine whether to read from the
/// buffered writer's buffer, versus going to the VirtualFile.
///
/// None <=> IO is ongoing.
/// Size is fixed to PAGE_SZ at creation time and must not be changed.
mutable_tail: Option<BytesMut>,
/// TODO: longer-term, we probably wand to get rid of this in favor
/// of a double-buffering scheme. See this commit's commit message
/// and git history for what we had before this sandwich, it might be useful.
file: owned_buffers_io::util::size_tracking_writer::Writer<
owned_buffers_io::write::BufferedWriter<
{ Self::TAIL_SZ },
owned_buffers_io::util::size_tracking_writer::Writer<VirtualFile>,
>,
>,
}
impl EphemeralFile {
const TAIL_SZ: usize = 64 * 1024;
pub async fn create(
conf: &PageServerConf,
tenant_shard_id: TenantShardId,
@@ -57,19 +62,20 @@ impl EphemeralFile {
.create(true),
)
.await?;
let file = owned_buffers_io::util::size_tracking_writer::Writer::new(file);
let file = owned_buffers_io::write::BufferedWriter::new(file);
let file = owned_buffers_io::util::size_tracking_writer::Writer::new(file);
Ok(EphemeralFile {
page_cache_file_id: page_cache::next_file_id(),
_tenant_shard_id: tenant_shard_id,
_timeline_id: timeline_id,
file,
len: 0,
mutable_tail: Some(BytesMut::zeroed(PAGE_SZ)),
})
}
pub(crate) fn len(&self) -> u64 {
self.len
self.file.bytes_written()
}
pub(crate) fn id(&self) -> page_cache::FileId {
@@ -81,8 +87,22 @@ impl EphemeralFile {
blknum: u32,
ctx: &RequestContext,
) -> Result<BlockLease, io::Error> {
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
if flushed_blknums.contains(&(blknum as u64)) {
let buffered_offset = self.file.bytes_written();
let flushed_offset = self.file.as_inner().as_inner().bytes_written();
assert!(buffered_offset >= flushed_offset);
let read_offset = (blknum as u64) * (PAGE_SZ as u64);
assert_eq!(
flushed_offset % (PAGE_SZ as u64),
0,
"we need this in the logic below, because it assumes the page isn't spread across flushed part and in-memory buffer"
);
if read_offset < flushed_offset {
assert!(
read_offset + (PAGE_SZ as u64) <= flushed_offset,
"this impl can't deal with pages spread across flushed & buffered part"
);
let cache = page_cache::get();
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
@@ -93,7 +113,9 @@ impl EphemeralFile {
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum, self.file.path, e,
blknum,
self.file.as_inner().as_inner().as_inner().path,
e,
),
)
})? {
@@ -103,6 +125,9 @@ impl EphemeralFile {
page_cache::ReadBufResult::NotFound(write_guard) => {
let write_guard = self
.file
.as_inner()
.as_inner()
.as_inner()
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
.await?;
let read_guard = write_guard.mark_valid();
@@ -110,13 +135,31 @@ impl EphemeralFile {
}
};
} else {
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
let read_until_offset = read_offset + (PAGE_SZ as u64);
if !(0..buffered_offset).contains(&read_until_offset) {
// The blob_io code relies on the reader allowing reads past
// the end of what was written, up to end of the current PAGE_SZ chunk.
// This is a relict of the past where we would get a pre-zeroed page from the page cache.
//
// DeltaLayer probably has the same issue, not sure why it needs no special treatment.
let nbytes_past_end = read_until_offset.checked_sub(buffered_offset).unwrap();
if nbytes_past_end >= (PAGE_SZ as u64) {
// TODO: treat this as error. Pre-existing issue before this patch.
panic!(
"return IO error: read past end of file: read=0x{read_offset:x} buffered=0x{buffered_offset:x} flushed=0x{flushed_offset}"
)
}
}
let buffer: &[u8; Self::TAIL_SZ] = self.file.as_inner().inspect_buffer();
let read_offset_in_buffer = read_offset
.checked_sub(flushed_offset)
.expect("would have taken `if` branch instead of this one");
let read_offset_in_buffer = usize::try_from(read_offset_in_buffer).unwrap();
let page = &buffer[read_offset_in_buffer..(read_offset_in_buffer + PAGE_SZ)];
Ok(BlockLease::EphemeralFileMutableTail(
self.mutable_tail
.as_deref()
.expect("we're not doing IO, it must be Some()")
.try_into()
.expect("we ensure that it's always PAGE_SZ"),
page.try_into()
.expect("the slice above got it as page-size slice"),
))
}
}
@@ -124,139 +167,26 @@ impl EphemeralFile {
pub(crate) async fn write_blob(
&mut self,
srcbuf: &[u8],
ctx: &RequestContext,
_ctx: &RequestContext,
) -> Result<u64, io::Error> {
struct Writer<'a> {
ephemeral_file: &'a mut EphemeralFile,
/// The block to which the next [`push_bytes`] will write.
blknum: u32,
/// The offset inside the block identified by [`blknum`] to which [`push_bytes`] will write.
off: usize,
}
impl<'a> Writer<'a> {
fn new(ephemeral_file: &'a mut EphemeralFile) -> io::Result<Writer<'a>> {
Ok(Writer {
blknum: (ephemeral_file.len / PAGE_SZ as u64) as u32,
off: (ephemeral_file.len % PAGE_SZ as u64) as usize,
ephemeral_file,
})
}
#[inline(always)]
async fn push_bytes(
&mut self,
src: &[u8],
ctx: &RequestContext,
) -> Result<(), io::Error> {
let mut src_remaining = src;
while !src_remaining.is_empty() {
let dst_remaining = &mut self
.ephemeral_file
.mutable_tail
.as_deref_mut()
.expect("IO is not yet ongoing")[self.off..];
let n = min(dst_remaining.len(), src_remaining.len());
dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
self.off += n;
src_remaining = &src_remaining[n..];
if self.off == PAGE_SZ {
let mutable_tail = std::mem::take(&mut self.ephemeral_file.mutable_tail)
.expect("IO is not yet ongoing");
let (mutable_tail, res) = self
.ephemeral_file
.file
.write_all_at(mutable_tail, self.blknum as u64 * PAGE_SZ as u64)
.await;
// TODO: If we panic before we can put the mutable_tail back, subsequent calls will fail.
// I.e., the IO isn't retryable if we panic.
self.ephemeral_file.mutable_tail = Some(mutable_tail);
match res {
Ok(_) => {
// Pre-warm the page cache with what we just wrote.
// This isn't necessary for coherency/correctness, but it's how we've always done it.
let cache = page_cache::get();
match cache
.read_immutable_buf(
self.ephemeral_file.page_cache_file_id,
self.blknum,
ctx,
)
.await
{
Ok(page_cache::ReadBufResult::Found(_guard)) => {
// This function takes &mut self, so, it shouldn't be possible to reach this point.
unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
}
Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
buf.copy_from_slice(
self.ephemeral_file
.mutable_tail
.as_deref()
.expect("IO is not ongoing"),
);
let _ = write_guard.mark_valid();
// pre-warm successful
}
Err(e) => {
error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
// fail gracefully, it's not the end of the world if we can't pre-warm the cache here
}
}
// Zero the buffer for re-use.
// Zeroing is critical for correcntess because the write_blob code below
// and similarly read_blk expect zeroed pages.
self.ephemeral_file
.mutable_tail
.as_deref_mut()
.expect("IO is not ongoing")
.fill(0);
// This block is done, move to next one.
self.blknum += 1;
self.off = 0;
}
Err(e) => {
return Err(std::io::Error::new(
ErrorKind::Other,
// order error before path because path is long and error is short
format!(
"ephemeral_file: write_blob: write-back full tail blk #{}: {:#}: {}",
self.blknum,
e,
self.ephemeral_file.file.path,
),
));
}
}
}
}
Ok(())
}
}
let pos = self.len;
let mut writer = Writer::new(self)?;
let pos = self.file.bytes_written();
// Write the length field
if srcbuf.len() < 0x80 {
// short one-byte length header
let len_buf = [srcbuf.len() as u8];
writer.push_bytes(&len_buf, ctx).await?;
self.file.write_all_borrowed(&len_buf).await?;
} else {
let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
len_buf[0] |= 0x80;
writer.push_bytes(&len_buf, ctx).await?;
self.file.write_all_borrowed(&len_buf).await?;
}
// Write the payload
writer.push_bytes(srcbuf, ctx).await?;
self.file.write_all_borrowed(srcbuf).await?;
if srcbuf.len() < 0x80 {
self.len += 1;
} else {
self.len += 4;
}
self.len += srcbuf.len() as u64;
// TODO: bring back pre-warming of page cache, using another sandwich layer
Ok(pos)
}
@@ -277,7 +207,7 @@ impl Drop for EphemeralFile {
// We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
// unlink the file
let res = std::fs::remove_file(&self.file.path);
let res = std::fs::remove_file(&self.file.as_inner().as_inner().as_inner().path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {
// just never log the not found errors, we cannot do anything for them; on detach
@@ -286,7 +216,8 @@ impl Drop for EphemeralFile {
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
error!(
"could not remove ephemeral file '{}': {}",
self.file.path, e
self.file.as_inner().as_inner().as_inner().path,
e
);
}
}

View File

@@ -36,7 +36,8 @@ pub(crate) use io_engine::IoEngineKind;
pub(crate) use metadata::Metadata;
pub(crate) use open_options::*;
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
use self::owned_buffers_io::write::OwnedAsyncWriter;
pub(crate) mod owned_buffers_io {
//! Abstractions for IO with owned buffers.
//!
@@ -1083,6 +1084,23 @@ impl Drop for VirtualFile {
}
}
impl OwnedAsyncWriter for VirtualFile {
#[inline(always)]
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
) -> std::io::Result<(usize, B::Buf)> {
let (buf, res) = VirtualFile::write_all(self, buf).await;
res.map(move |v| (v, buf))
}
#[inline(always)]
async fn write_all_borrowed(&mut self, _buf: &[u8]) -> std::io::Result<usize> {
// TODO: ensure this through the type system
panic!("this should not happen");
}
}
impl OpenFiles {
fn new(num_slots: usize) -> OpenFiles {
let mut slots = Box::new(Vec::with_capacity(num_slots));

View File

@@ -1,34 +1,49 @@
use crate::virtual_file::{owned_buffers_io::write::OwnedAsyncWriter, VirtualFile};
use crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter;
use tokio_epoll_uring::{BoundedBuf, IoBuf};
pub struct Writer {
dst: VirtualFile,
pub struct Writer<W> {
dst: W,
bytes_amount: u64,
}
impl Writer {
pub fn new(dst: VirtualFile) -> Self {
impl<W> Writer<W> {
pub fn new(dst: W) -> Self {
Self {
dst,
bytes_amount: 0,
}
}
pub fn bytes_written(&self) -> u64 {
self.bytes_amount
}
pub fn as_inner(&self) -> &W {
&self.dst
}
/// Returns the wrapped `VirtualFile` object as well as the number
/// of bytes that were written to it through this object.
pub fn into_inner(self) -> (u64, VirtualFile) {
pub fn into_inner(self) -> (u64, W) {
(self.bytes_amount, self.dst)
}
}
impl OwnedAsyncWriter for Writer {
impl<W> OwnedAsyncWriter for Writer<W>
where
W: OwnedAsyncWriter,
{
#[inline(always)]
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
) -> std::io::Result<(usize, B::Buf)> {
let (buf, res) = self.dst.write_all(buf).await;
let nwritten = res?;
let (nwritten, buf) = self.dst.write_all(buf).await?;
self.bytes_amount += u64::try_from(nwritten).unwrap();
Ok((nwritten, buf))
}
#[inline(always)]
async fn write_all_borrowed(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let nwritten = self.dst.write_all_borrowed(buf).await?;
self.bytes_amount += u64::try_from(nwritten).unwrap();
Ok(nwritten)
}
}

View File

@@ -1,4 +1,3 @@
use bytes::BytesMut;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
/// A trait for doing owned-buffer write IO.
@@ -8,6 +7,7 @@ pub trait OwnedAsyncWriter {
&mut self,
buf: B,
) -> std::io::Result<(usize, B::Buf)>;
async fn write_all_borrowed(&mut self, buf: &[u8]) -> std::io::Result<usize>;
}
/// A wrapper aorund an [`OwnedAsyncWriter`] that batches smaller writers
@@ -32,9 +32,11 @@ pub struct BufferedWriter<const BUFFER_SIZE: usize, W> {
// - while IO is ongoing => goes back to Some() once the IO completed successfully
// - after an IO error => stays `None` forever
// In these exceptional cases, it's `None`.
buf: Option<BytesMut>,
buf: Option<zero_initialized_buffer::Buf<BUFFER_SIZE>>,
}
mod zero_initialized_buffer;
impl<const BUFFER_SIZE: usize, W> BufferedWriter<BUFFER_SIZE, W>
where
W: OwnedAsyncWriter,
@@ -42,10 +44,23 @@ where
pub fn new(writer: W) -> Self {
Self {
writer,
buf: Some(BytesMut::with_capacity(BUFFER_SIZE)),
buf: Some(zero_initialized_buffer::Buf::default()),
}
}
pub fn as_inner(&self) -> &W {
&self.writer
}
/// panics if used after an error
pub fn inspect_buffer(&self) -> &[u8; BUFFER_SIZE] {
self.buf
.as_ref()
// TODO: can this happen on the EphemeralFile read path?
.expect("must not use after an error")
.as_zero_padded_slice()
}
pub async fn flush_and_into_inner(mut self) -> std::io::Result<W> {
self.flush().await?;
let Self { buf, writer } = self;
@@ -53,10 +68,11 @@ where
Ok(writer)
}
pub async fn write_buffered<B: IoBuf>(&mut self, chunk: Slice<B>) -> std::io::Result<()>
pub async fn write_buffered<B: IoBuf>(&mut self, chunk: Slice<B>) -> std::io::Result<(usize, B)>
where
B: IoBuf + Send,
{
let chunk_len = chunk.len();
// avoid memcpy for the middle of the chunk
if chunk.len() >= BUFFER_SIZE {
self.flush().await?;
@@ -68,15 +84,33 @@ where
.len(),
0
);
let chunk_len = chunk.len();
let (nwritten, chunk) = self.writer.write_all(chunk).await?;
assert_eq!(nwritten, chunk_len);
drop(chunk);
return Ok(());
return Ok((nwritten, chunk));
}
// in-memory copy the < BUFFER_SIZED tail of the chunk
assert!(chunk.len() < BUFFER_SIZE);
let mut chunk = &chunk[..];
let mut slice = &chunk[..];
while !slice.is_empty() {
let buf = self.buf.as_mut().expect("must not use after an error");
let need = BUFFER_SIZE - buf.len();
let have = slice.len();
let n = std::cmp::min(need, have);
buf.extend_from_slice(&slice[..n]);
slice = &slice[n..];
if buf.len() >= BUFFER_SIZE {
assert_eq!(buf.len(), BUFFER_SIZE);
self.flush().await?;
}
}
assert!(slice.is_empty(), "by now we should have drained the chunk");
Ok((chunk_len, chunk.into_inner()))
}
/// Always goes through the internal buffer.
/// Guaranteed to never invoke [`OwnedAsyncWriter::write_all_borrowed`] on the underlying.
pub async fn write_all_borrowed(&mut self, mut chunk: &[u8]) -> std::io::Result<usize> {
let chunk_len = chunk.len();
while !chunk.is_empty() {
let buf = self.buf.as_mut().expect("must not use after an error");
let need = BUFFER_SIZE - buf.len();
@@ -89,8 +123,7 @@ where
self.flush().await?;
}
}
assert!(chunk.is_empty(), "by now we should have drained the chunk");
Ok(())
Ok(chunk_len)
}
async fn flush(&mut self) -> std::io::Result<()> {
@@ -108,6 +141,27 @@ where
}
}
impl<const BUFFER_SIZE: usize, W: OwnedAsyncWriter> OwnedAsyncWriter
for BufferedWriter<BUFFER_SIZE, W>
{
#[inline(always)]
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
) -> std::io::Result<(usize, B::Buf)> {
let nbytes = buf.bytes_init();
if nbytes == 0 {
return Ok((0, Slice::into_inner(buf.slice_full())));
}
let slice = buf.slice(0..nbytes);
BufferedWriter::write_buffered(self, slice).await
}
#[inline(always)]
async fn write_all_borrowed(&mut self, buf: &[u8]) -> std::io::Result<usize> {
BufferedWriter::write_all_borrowed(self, buf).await
}
}
impl OwnedAsyncWriter for Vec<u8> {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
@@ -121,6 +175,11 @@ impl OwnedAsyncWriter for Vec<u8> {
self.extend_from_slice(&buf[..]);
Ok((buf.len(), Slice::into_inner(buf)))
}
async fn write_all_borrowed(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.extend_from_slice(buf);
Ok(buf.len())
}
}
#[cfg(test)]
@@ -145,6 +204,11 @@ mod tests {
self.writes.push(Vec::from(&buf[..]));
Ok((buf.len(), Slice::into_inner(buf)))
}
async fn write_all_borrowed(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.writes.push(Vec::from(buf));
Ok(buf.len())
}
}
macro_rules! write {
@@ -203,4 +267,31 @@ mod tests {
);
Ok(())
}
#[tokio::test]
async fn test_write_all_borrowed_always_goes_through_buffer() -> std::io::Result<()> {
let recorder = RecorderWriter::default();
let mut writer = BufferedWriter::<2, _>::new(recorder);
writer.write_all_borrowed(b"abc").await?;
writer.write_all_borrowed(b"d").await?;
writer.write_all_borrowed(b"e").await?;
writer.write_all_borrowed(b"fg").await?;
writer.write_all_borrowed(b"hi").await?;
writer.write_all_borrowed(b"j").await?;
writer.write_all_borrowed(b"klmno").await?;
let recorder = writer.flush_and_into_inner().await?;
assert_eq!(
recorder.writes,
{
let expect: &[&[u8]] = &[b"ab", b"cd", b"ef", b"gh", b"ij", b"kl", b"mn", b"o"];
expect
}
.iter()
.map(|v| v[..].to_vec())
.collect::<Vec<_>>()
);
Ok(())
}
}

View File

@@ -0,0 +1,73 @@
use std::mem::MaybeUninit;
pub struct Buf<const N: usize> {
allocation: Box<[u8; N]>,
written: usize,
}
impl<const N: usize> Default for Buf<N> {
fn default() -> Self {
Self {
allocation: Box::new(
// SAFETY: zeroed memory is a valid [u8; N]
unsafe { MaybeUninit::zeroed().assume_init() },
),
written: 0,
}
}
}
impl<const N: usize> Buf<N> {
#[inline(always)]
fn invariants(&self) {
debug_assert!(self.written <= N, "{}", self.written);
}
pub fn as_zero_padded_slice(&self) -> &[u8; N] {
&self.allocation
}
/// panics if there's not enough capacity left
pub fn extend_from_slice(&mut self, buf: &[u8]) {
self.invariants();
let can = N - self.written;
let want = buf.len();
assert!(want <= can, "{:x} {:x}", want, can);
self.allocation[self.written..(self.written + want)].copy_from_slice(buf);
self.written += want;
self.invariants();
}
pub fn len(&self) -> usize {
self.written
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn clear(&mut self) {
self.invariants();
self.written = 0;
self.allocation[..].fill(0);
self.invariants();
}
}
/// SAFETY:
///
/// The [`Self::allocation`] is stable becauses boxes are stable.
///
unsafe impl<const N: usize> tokio_epoll_uring::IoBuf for Buf<N> {
fn stable_ptr(&self) -> *const u8 {
self.allocation.as_ptr()
}
fn bytes_init(&self) -> usize {
self.written
}
fn bytes_total(&self) -> usize {
self.written // ?
}
}