From ce7cd361003b3769131e67191c482f7bb74edf49 Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Tue, 12 Nov 2024 01:17:52 +0000 Subject: [PATCH] add IoBufAligned marker Signed-off-by: Yuchen Liang --- .../tenant/remote_timeline_client/download.rs | 20 +++++++++---------- pageserver/src/virtual_file.rs | 12 +++++------ .../owned_buffers_io/aligned_buffer/buffer.rs | 10 +++++++++- .../owned_buffers_io/io_buf_aligned.rs | 8 ++++++-- .../virtual_file/owned_buffers_io/write.rs | 20 +++++++++++-------- .../owned_buffers_io/write/flush.rs | 10 ++++++---- 6 files changed, 49 insertions(+), 31 deletions(-) diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index c39a95d452..e9230c58b6 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -27,9 +27,7 @@ use crate::span::{ use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path}; use crate::tenant::storage_layer::LayerName; use crate::tenant::Generation; -#[cfg_attr(target_os = "macos", allow(unused_imports))] -use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; -use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile}; +use crate::virtual_file::{on_fatal_io_error, IoBufferMut, MaybeFatalIo, VirtualFile}; use crate::TEMP_FILE_SUFFIX; use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath}; use utils::crashsafe::path_with_suffix_extension; @@ -205,7 +203,6 @@ async fn download_object<'a>( #[cfg(target_os = "linux")] crate::virtual_file::io_engine::IoEngine::TokioEpollUring => { use crate::virtual_file::owned_buffers_io; - use bytes::BytesMut; async { let destination_file = Arc::new( VirtualFile::create(dst_path, ctx) @@ -225,11 +222,12 @@ async fn download_object<'a>( // TODO: use vectored write (writev) once supported by tokio-epoll-uring. // There's chunks_vectored() on the stream. let (bytes_amount, destination_file) = async { - let mut buffered = owned_buffers_io::write::BufferedWriter::::new( - destination_file, - || BytesMut::with_capacity(super::BUFFER_SIZE), - ctx, - ); + let mut buffered = + owned_buffers_io::write::BufferedWriter::::new( + destination_file, + || IoBufferMut::with_capacity(super::BUFFER_SIZE), + ctx, + ); while let Some(res) = futures::StreamExt::next(&mut download.download_stream).await { @@ -237,7 +235,9 @@ async fn download_object<'a>( Ok(chunk) => chunk, Err(e) => return Err(e), }; - buffered.write_buffered(chunk.slice_len(), ctx).await?; + // TODO(yuchen): might have performance issue when using borrowed version? + // Problem: input is Bytes, does not satisify IO alignment requirement. + buffered.write_buffered_borrowed(&chunk, ctx).await?; } let inner = buffered.flush_and_into_inner(ctx).await?; Ok(inner) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 40e0e97b0c..5a4f387a33 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -20,7 +20,7 @@ use camino::{Utf8Path, Utf8PathBuf}; use once_cell::sync::OnceCell; use owned_buffers_io::aligned_buffer::buffer::AlignedBuffer; use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlign}; -use owned_buffers_io::io_buf_aligned::IoBufAlignedMut; +use owned_buffers_io::io_buf_aligned::{IoBufAligned, IoBufAlignedMut}; use owned_buffers_io::io_buf_ext::FullSlice; use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT; use pageserver_api::shard::TenantShardId; @@ -212,7 +212,7 @@ impl VirtualFile { self.inner.read_exact_at_page(page, offset, ctx).await } - pub async fn write_all_at( + pub async fn write_all_at( &self, buf: FullSlice, offset: u64, @@ -1295,7 +1295,7 @@ impl Drop for VirtualFileInner { } impl OwnedAsyncWriter for VirtualFile { - async fn write_all_at( + async fn write_all_at( &self, buf: FullSlice, offset: u64, @@ -1417,7 +1417,7 @@ mod tests { } } } - async fn write_all_at( + async fn write_all_at( &self, buf: FullSlice, offset: u64, @@ -1619,10 +1619,10 @@ mod tests { ) .await?; file_b - .write_all_at(b"BAR".to_vec().slice_len(), 3, &ctx) + .write_all_at(IoBuffer::from(b"BAR").slice_len(), 3, &ctx) .await?; file_b - .write_all_at(b"FOO".to_vec().slice_len(), 0, &ctx) + .write_all_at(IoBuffer::from(b"FOO").slice_len(), 0, &ctx) .await?; assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA"); diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer.rs index 4474303b4f..ff5f2cd163 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer.rs @@ -3,7 +3,7 @@ use std::{ sync::Arc, }; -use super::{alignment::Alignment, raw::RawAlignedBuffer, AlignedBufferMut}; +use super::{alignment::Alignment, raw::RawAlignedBuffer, AlignedBufferMut, ConstAlign}; /// An shared, immutable aligned buffer type. #[derive(Clone, Debug)] @@ -114,6 +114,14 @@ impl PartialEq<[u8]> for AlignedBuffer { } } +impl From<&[u8; N]> for AlignedBuffer> { + fn from(value: &[u8; N]) -> Self { + let mut buf = AlignedBufferMut::with_capacity(N); + buf.extend_from_slice(value); + buf.freeze() + } +} + /// SAFETY: the underlying buffer references a stable memory region. unsafe impl tokio_epoll_uring::IoBuf for AlignedBuffer { fn stable_ptr(&self) -> *const u8 { diff --git a/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs b/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs index dba695196e..6ef0168548 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs @@ -1,9 +1,13 @@ -use tokio_epoll_uring::IoBufMut; +use tokio_epoll_uring::{IoBuf, IoBufMut}; -use crate::virtual_file::{IoBufferMut, PageWriteGuardBuf}; +use crate::virtual_file::{IoBuffer, IoBufferMut, PageWriteGuardBuf}; pub trait IoBufAlignedMut: IoBufMut {} +pub trait IoBufAligned: IoBuf {} + impl IoBufAlignedMut for IoBufferMut {} +impl IoBufAligned for IoBuffer {} + impl IoBufAlignedMut for PageWriteGuardBuf {} diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index 429138bc76..d29c7af316 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -10,12 +10,15 @@ use crate::{ virtual_file::{IoBuffer, IoBufferMut}, }; -use super::io_buf_ext::{FullSlice, IoBufExt}; +use super::{ + io_buf_aligned::IoBufAligned, + io_buf_ext::{FullSlice, IoBufExt}, +}; /// A trait for doing owned-buffer write IO. /// Think [`tokio::io::AsyncWrite`] but with owned buffers. pub trait OwnedAsyncWriter { - fn write_all_at( + fn write_all_at( &self, buf: FullSlice, offset: u64, @@ -53,7 +56,7 @@ pub struct BufferedWriter { impl BufferedWriter where B: Buffer + Send + 'static, - Buf: IoBuf + Send + Sync + Clone, + Buf: IoBufAligned + Send + Sync + Clone, W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug, { pub fn new(writer: Arc, buf_new: impl Fn() -> B, ctx: &RequestContext) -> Self { @@ -110,8 +113,8 @@ where } /// Guarantees that if Ok() is returned, all bytes in `chunk` have been accepted. - #[cfg_attr(target_os = "macos", allow(dead_code))] - pub async fn write_buffered( + #[allow(dead_code)] + pub async fn write_buffered( &mut self, chunk: FullSlice, ctx: &RequestContext, @@ -259,7 +262,6 @@ impl Buffer for IoBufferMut { } fn extend_from_slice(&mut self, other: &[u8]) { - self.reserve(other.len()); IoBufferMut::extend_from_slice(self, other); } @@ -307,7 +309,7 @@ mod tests { } impl OwnedAsyncWriter for RecorderWriter { - async fn write_all_at( + async fn write_all_at( &self, buf: FullSlice, offset: u64, @@ -327,8 +329,10 @@ mod tests { macro_rules! write { ($writer:ident, $data:literal) => {{ + let mut buf = crate::virtual_file::IoBufferMut::with_capacity(2); + buf.extend_from_slice($data); $writer - .write_buffered(::bytes::Bytes::from_static($data).slice_len(), &test_ctx()) + .write_buffered(buf.freeze().slice_len(), &test_ctx()) .await?; }}; } diff --git a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs index a9c1404712..49ad954725 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs @@ -1,9 +1,11 @@ use std::sync::Arc; use tokio::sync::mpsc; -use tokio_epoll_uring::IoBuf; -use crate::{context::RequestContext, virtual_file::owned_buffers_io::io_buf_ext::FullSlice}; +use crate::{ + context::RequestContext, + virtual_file::owned_buffers_io::{io_buf_aligned::IoBufAligned, io_buf_ext::FullSlice}, +}; use super::{Buffer, OwnedAsyncWriter}; @@ -68,7 +70,7 @@ pub struct FlushBackgroundTask { impl FlushBackgroundTask where - Buf: IoBuf + Send + Sync, + Buf: IoBufAligned + Send + Sync, W: OwnedAsyncWriter + Sync + 'static, { fn new( @@ -105,7 +107,7 @@ where impl FlushHandle where - Buf: IoBuf + Send + Sync + Clone, + Buf: IoBufAligned + Send + Sync + Clone, W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug, { /// Spawns a new background flush task and obtains a handle.