add IoBufAligned marker

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
This commit is contained in:
Yuchen Liang
2024-11-12 01:17:52 +00:00
parent b0d7fc7564
commit ce7cd36100
6 changed files with 49 additions and 31 deletions

View File

@@ -27,9 +27,7 @@ use crate::span::{
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerName;
use crate::tenant::Generation;
#[cfg_attr(target_os = "macos", allow(unused_imports))]
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
use crate::virtual_file::{on_fatal_io_error, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::TEMP_FILE_SUFFIX;
use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath};
use utils::crashsafe::path_with_suffix_extension;
@@ -205,7 +203,6 @@ async fn download_object<'a>(
#[cfg(target_os = "linux")]
crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
use crate::virtual_file::owned_buffers_io;
use bytes::BytesMut;
async {
let destination_file = Arc::new(
VirtualFile::create(dst_path, ctx)
@@ -225,11 +222,12 @@ async fn download_object<'a>(
// TODO: use vectored write (writev) once supported by tokio-epoll-uring.
// There's chunks_vectored() on the stream.
let (bytes_amount, destination_file) = async {
let mut buffered = owned_buffers_io::write::BufferedWriter::<BytesMut, _>::new(
destination_file,
|| BytesMut::with_capacity(super::BUFFER_SIZE),
ctx,
);
let mut buffered =
owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
destination_file,
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
ctx,
);
while let Some(res) =
futures::StreamExt::next(&mut download.download_stream).await
{
@@ -237,7 +235,9 @@ async fn download_object<'a>(
Ok(chunk) => chunk,
Err(e) => return Err(e),
};
buffered.write_buffered(chunk.slice_len(), ctx).await?;
// TODO(yuchen): might have performance issue when using borrowed version?
// Problem: input is Bytes, does not satisify IO alignment requirement.
buffered.write_buffered_borrowed(&chunk, ctx).await?;
}
let inner = buffered.flush_and_into_inner(ctx).await?;
Ok(inner)

View File

@@ -20,7 +20,7 @@ use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use owned_buffers_io::aligned_buffer::buffer::AlignedBuffer;
use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlign};
use owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
use owned_buffers_io::io_buf_aligned::{IoBufAligned, IoBufAlignedMut};
use owned_buffers_io::io_buf_ext::FullSlice;
use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
use pageserver_api::shard::TenantShardId;
@@ -212,7 +212,7 @@ impl VirtualFile {
self.inner.read_exact_at_page(page, offset, ctx).await
}
pub async fn write_all_at<Buf: IoBuf + Send>(
pub async fn write_all_at<Buf: IoBufAligned + Send>(
&self,
buf: FullSlice<Buf>,
offset: u64,
@@ -1295,7 +1295,7 @@ impl Drop for VirtualFileInner {
}
impl OwnedAsyncWriter for VirtualFile {
async fn write_all_at<Buf: IoBuf + Send>(
async fn write_all_at<Buf: IoBufAligned + Send>(
&self,
buf: FullSlice<Buf>,
offset: u64,
@@ -1417,7 +1417,7 @@ mod tests {
}
}
}
async fn write_all_at<Buf: IoBuf + Send>(
async fn write_all_at<Buf: IoBufAligned + Send>(
&self,
buf: FullSlice<Buf>,
offset: u64,
@@ -1619,10 +1619,10 @@ mod tests {
)
.await?;
file_b
.write_all_at(b"BAR".to_vec().slice_len(), 3, &ctx)
.write_all_at(IoBuffer::from(b"BAR").slice_len(), 3, &ctx)
.await?;
file_b
.write_all_at(b"FOO".to_vec().slice_len(), 0, &ctx)
.write_all_at(IoBuffer::from(b"FOO").slice_len(), 0, &ctx)
.await?;
assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA");

View File

@@ -3,7 +3,7 @@ use std::{
sync::Arc,
};
use super::{alignment::Alignment, raw::RawAlignedBuffer, AlignedBufferMut};
use super::{alignment::Alignment, raw::RawAlignedBuffer, AlignedBufferMut, ConstAlign};
/// An shared, immutable aligned buffer type.
#[derive(Clone, Debug)]
@@ -114,6 +114,14 @@ impl<A: Alignment> PartialEq<[u8]> for AlignedBuffer<A> {
}
}
impl<const A: usize, const N: usize> From<&[u8; N]> for AlignedBuffer<ConstAlign<A>> {
fn from(value: &[u8; N]) -> Self {
let mut buf = AlignedBufferMut::with_capacity(N);
buf.extend_from_slice(value);
buf.freeze()
}
}
/// SAFETY: the underlying buffer references a stable memory region.
unsafe impl<A: Alignment> tokio_epoll_uring::IoBuf for AlignedBuffer<A> {
fn stable_ptr(&self) -> *const u8 {

View File

@@ -1,9 +1,13 @@
use tokio_epoll_uring::IoBufMut;
use tokio_epoll_uring::{IoBuf, IoBufMut};
use crate::virtual_file::{IoBufferMut, PageWriteGuardBuf};
use crate::virtual_file::{IoBuffer, IoBufferMut, PageWriteGuardBuf};
pub trait IoBufAlignedMut: IoBufMut {}
pub trait IoBufAligned: IoBuf {}
impl IoBufAlignedMut for IoBufferMut {}
impl IoBufAligned for IoBuffer {}
impl IoBufAlignedMut for PageWriteGuardBuf {}

View File

@@ -10,12 +10,15 @@ use crate::{
virtual_file::{IoBuffer, IoBufferMut},
};
use super::io_buf_ext::{FullSlice, IoBufExt};
use super::{
io_buf_aligned::IoBufAligned,
io_buf_ext::{FullSlice, IoBufExt},
};
/// A trait for doing owned-buffer write IO.
/// Think [`tokio::io::AsyncWrite`] but with owned buffers.
pub trait OwnedAsyncWriter {
fn write_all_at<Buf: IoBuf + Send>(
fn write_all_at<Buf: IoBufAligned + Send>(
&self,
buf: FullSlice<Buf>,
offset: u64,
@@ -53,7 +56,7 @@ pub struct BufferedWriter<B: Buffer, W> {
impl<B, Buf, W> BufferedWriter<B, W>
where
B: Buffer<IoBuf = Buf> + Send + 'static,
Buf: IoBuf + Send + Sync + Clone,
Buf: IoBufAligned + Send + Sync + Clone,
W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
{
pub fn new(writer: Arc<W>, buf_new: impl Fn() -> B, ctx: &RequestContext) -> Self {
@@ -110,8 +113,8 @@ where
}
/// Guarantees that if Ok() is returned, all bytes in `chunk` have been accepted.
#[cfg_attr(target_os = "macos", allow(dead_code))]
pub async fn write_buffered<S: IoBuf + Send>(
#[allow(dead_code)]
pub async fn write_buffered<S: IoBufAligned + Send>(
&mut self,
chunk: FullSlice<S>,
ctx: &RequestContext,
@@ -259,7 +262,6 @@ impl Buffer for IoBufferMut {
}
fn extend_from_slice(&mut self, other: &[u8]) {
self.reserve(other.len());
IoBufferMut::extend_from_slice(self, other);
}
@@ -307,7 +309,7 @@ mod tests {
}
impl OwnedAsyncWriter for RecorderWriter {
async fn write_all_at<Buf: IoBuf + Send>(
async fn write_all_at<Buf: IoBufAligned + Send>(
&self,
buf: FullSlice<Buf>,
offset: u64,
@@ -327,8 +329,10 @@ mod tests {
macro_rules! write {
($writer:ident, $data:literal) => {{
let mut buf = crate::virtual_file::IoBufferMut::with_capacity(2);
buf.extend_from_slice($data);
$writer
.write_buffered(::bytes::Bytes::from_static($data).slice_len(), &test_ctx())
.write_buffered(buf.freeze().slice_len(), &test_ctx())
.await?;
}};
}

View File

@@ -1,9 +1,11 @@
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_epoll_uring::IoBuf;
use crate::{context::RequestContext, virtual_file::owned_buffers_io::io_buf_ext::FullSlice};
use crate::{
context::RequestContext,
virtual_file::owned_buffers_io::{io_buf_aligned::IoBufAligned, io_buf_ext::FullSlice},
};
use super::{Buffer, OwnedAsyncWriter};
@@ -68,7 +70,7 @@ pub struct FlushBackgroundTask<Buf, W> {
impl<Buf, W> FlushBackgroundTask<Buf, W>
where
Buf: IoBuf + Send + Sync,
Buf: IoBufAligned + Send + Sync,
W: OwnedAsyncWriter + Sync + 'static,
{
fn new(
@@ -105,7 +107,7 @@ where
impl<Buf, W> FlushHandle<Buf, W>
where
Buf: IoBuf + Send + Sync + Clone,
Buf: IoBufAligned + Send + Sync + Clone,
W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
{
/// Spawns a new background flush task and obtains a handle.