From fe99bb9548b25d6dbe1b5f4b44f1f72c0499711e Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Fri, 4 Oct 2024 11:16:53 -0400 Subject: [PATCH] use aligned buffer trait for read path Signed-off-by: Yuchen Liang --- libs/pageserver_api/src/models.rs | 12 ++++--- pageserver/src/tenant/block_io.rs | 6 ++-- pageserver/src/tenant/ephemeral_file.rs | 32 ++++++++++++------- .../tenant/storage_layer/inmemory_layer.rs | 4 +-- .../inmemory_layer/vectored_dio_read.rs | 31 +++++++++++------- pageserver/src/virtual_file.rs | 32 ++++++++++++------- pageserver/src/virtual_file/dio.rs | 28 ++++++++++++++++ .../owned_buffers_io/io_buf_aligned.rs | 6 ++-- 8 files changed, 106 insertions(+), 45 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 3f045a35c2..44546c5fc8 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1015,12 +1015,14 @@ pub mod virtual_file { } impl IoMode { + #[cfg(target_os = "linux")] pub const fn preferred() -> Self { - if cfg!(target_os = "linux") { - Self::Direct - } else { - Self::Buffered - } + Self::Direct + } + + #[cfg(target_os = "macos")] + pub const fn preferred() -> Self { + Self::Buffered } } diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 3afa3a86b9..f82ea48fdd 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -5,6 +5,8 @@ use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner}; use crate::context::RequestContext; use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ}; +#[cfg(test)] +use crate::virtual_file::dio::IoBufferMut; use crate::virtual_file::VirtualFile; use bytes::Bytes; use std::ops::Deref; @@ -40,7 +42,7 @@ pub enum BlockLease<'a> { #[cfg(test)] Arc(std::sync::Arc<[u8; PAGE_SZ]>), #[cfg(test)] - Vec(Vec), + IoBufferMut(IoBufferMut), } impl From> for BlockLease<'static> { @@ -67,7 +69,7 @@ impl<'a> Deref for BlockLease<'a> { #[cfg(test)] BlockLease::Arc(v) => v.deref(), #[cfg(test)] - BlockLease::Vec(v) => { + BlockLease::IoBufferMut(v) => { TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ") } } diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index a62a47f9a7..180865df8f 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -6,6 +6,8 @@ use crate::config::PageServerConf; use crate::context::RequestContext; use crate::page_cache; use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File; +use crate::virtual_file::dio::IoBufferMut; +use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut; use crate::virtual_file::owned_buffers_io::slice::SliceMutExt; use crate::virtual_file::owned_buffers_io::util::size_tracking_writer; use crate::virtual_file::owned_buffers_io::write::Buffer; @@ -107,15 +109,16 @@ impl EphemeralFile { self.page_cache_file_id } - pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result, io::Error> { + pub(crate) async fn load_to_buf(&self, ctx: &RequestContext) -> Result { let size = self.len().into_usize(); - let vec = Vec::with_capacity(size); - let (slice, nread) = self.read_exact_at_eof_ok(0, vec.slice_full(), ctx).await?; + let align = virtual_file::get_io_buffer_alignment(); + let buf = IoBufferMut::with_capacity_aligned(size, align); + let (slice, nread) = self.read_exact_at_eof_ok(0, buf.slice_full(), ctx).await?; assert_eq!(nread, size); - let vec = slice.into_inner(); - assert_eq!(vec.len(), nread); - assert_eq!(vec.capacity(), size, "we shouldn't be reallocating"); - Ok(vec) + let buf = slice.into_inner(); + assert_eq!(buf.len(), nread); + assert_eq!(buf.capacity(), size, "we shouldn't be reallocating"); + Ok(buf) } /// Returns the offset at which the first byte of the input was written, for use @@ -158,7 +161,7 @@ impl EphemeralFile { } impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile { - async fn read_exact_at_eof_ok<'a, 'b, B: tokio_epoll_uring::IoBufMut + Send>( + async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>( &'b self, start: u64, dst: tokio_epoll_uring::Slice, @@ -343,9 +346,10 @@ mod tests { } assert!(file.len() as usize == write_nbytes); + let align = virtual_file::get_io_buffer_alignment(); for i in 0..write_nbytes { assert_eq!(value_offsets[i], i.into_u64()); - let buf = Vec::with_capacity(1); + let buf = IoBufferMut::with_capacity_aligned(1, align); let (buf_slice, nread) = file .read_exact_at_eof_ok(i.into_u64(), buf.slice_full(), &ctx) .await @@ -385,7 +389,7 @@ mod tests { // assert the state is as this test expects it to be assert_eq!( - &file.load_to_vec(&ctx).await.unwrap(), + &file.load_to_buf(&ctx).await.unwrap()[..], &content[0..cap + cap / 2] ); let md = file @@ -440,13 +444,17 @@ mod tests { let (buf, nread) = file .read_exact_at_eof_ok( start.into_u64(), - Vec::with_capacity(len).slice_full(), + IoBufferMut::with_capacity_aligned( + len, + virtual_file::get_io_buffer_alignment(), + ) + .slice_full(), ctx, ) .await .unwrap(); assert_eq!(nread, len); - assert_eq!(&buf.into_inner(), &content[start..(start + len)]); + assert_eq!(&buf.into_inner()[..], &content[start..(start + len)]); } }; diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index e487bee1f2..b27387812a 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -809,9 +809,9 @@ impl InMemoryLayer { match l0_flush_global_state { l0_flush::Inner::Direct { .. } => { - let file_contents: Vec = inner.file.load_to_vec(ctx).await?; + let file_contents = inner.file.load_to_buf(ctx).await?; - let file_contents = Bytes::from(file_contents); + let file_contents = Bytes::copy_from_slice(&file_contents[..]); for (key, vec_map) in inner.index.iter() { // Write all page versions diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs index 0683e15659..97d4962b73 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs @@ -9,6 +9,7 @@ use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice}; use crate::{ assert_u64_eq_usize::{U64IsUsize, UsizeIsU64}, context::RequestContext, + virtual_file::{self, dio::IoBufferMut, owned_buffers_io::io_buf_aligned::IoBufAlignedMut}, }; /// The file interface we require. At runtime, this is a [`crate::tenant::ephemeral_file::EphemeralFile`]. @@ -24,7 +25,7 @@ pub trait File: Send { /// [`std::io::ErrorKind::UnexpectedEof`] error if the file is shorter than `start+dst.len()`. /// /// No guarantees are made about the remaining bytes in `dst` in case of a short read. - async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>( + async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>( &'b self, start: u64, dst: Slice, @@ -227,7 +228,12 @@ where // Execute physical reads and fill the logical read buffers // TODO: pipelined reads; prefetch; - let get_io_buffer = |nchunks| Vec::with_capacity(nchunks * DIO_CHUNK_SIZE); + let get_io_buffer = |nchunks| { + IoBufferMut::with_capacity_aligned( + nchunks * DIO_CHUNK_SIZE, + virtual_file::get_io_buffer_alignment(), + ) + }; for PhysicalRead { start_chunk_no, nchunks, @@ -459,7 +465,10 @@ mod tests { let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); let file = InMemoryFile::new_random(10); let test_read = |pos, len| { - let buf = vec![0; len]; + let buf = IoBufferMut::with_capacity_aligned_zeroed( + len, + virtual_file::get_io_buffer_alignment(), + ); let fut = file.read_exact_at_eof_ok(pos, buf.slice_full(), &ctx); use futures::FutureExt; let (slice, nread) = fut @@ -470,9 +479,9 @@ mod tests { buf.truncate(nread); buf }; - assert_eq!(test_read(0, 1), &file.content[0..1]); - assert_eq!(test_read(1, 2), &file.content[1..3]); - assert_eq!(test_read(9, 2), &file.content[9..]); + assert_eq!(&test_read(0, 1), &file.content[0..1]); + assert_eq!(&test_read(1, 2), &file.content[1..3]); + assert_eq!(&test_read(9, 2), &file.content[9..]); assert!(test_read(10, 2).is_empty()); assert!(test_read(11, 2).is_empty()); } @@ -609,7 +618,7 @@ mod tests { } impl<'x> File for RecorderFile<'x> { - async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>( + async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>( &'b self, start: u64, dst: Slice, @@ -782,7 +791,7 @@ mod tests { 2048, 1024 => Err("foo".to_owned()), }; - let buf = Vec::with_capacity(512); + let buf = IoBufferMut::with_capacity_aligned(512, 512); let (buf, nread) = mock_file .read_exact_at_eof_ok(0, buf.slice_full(), &ctx) .await @@ -790,7 +799,7 @@ mod tests { assert_eq!(nread, 512); assert_eq!(&buf.into_inner()[..nread], &[0; 512]); - let buf = Vec::with_capacity(512); + let buf = IoBufferMut::with_capacity_aligned(512, 512); let (buf, nread) = mock_file .read_exact_at_eof_ok(512, buf.slice_full(), &ctx) .await @@ -798,7 +807,7 @@ mod tests { assert_eq!(nread, 512); assert_eq!(&buf.into_inner()[..nread], &[1; 512]); - let buf = Vec::with_capacity(512); + let buf = IoBufferMut::with_capacity_aligned(512, 512); let (buf, nread) = mock_file .read_exact_at_eof_ok(1024, buf.slice_full(), &ctx) .await @@ -806,7 +815,7 @@ mod tests { assert_eq!(nread, 10); assert_eq!(&buf.into_inner()[..nread], &[2; 10]); - let buf = Vec::with_capacity(1024); + let buf = IoBufferMut::with_capacity_aligned(1024, 512); let err = mock_file .read_exact_at_eof_ok(2048, buf.slice_full(), &ctx) .await diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index fdc3bd5bbb..57e12b894a 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -17,7 +17,10 @@ use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC use crate::page_cache::{PageWriteGuard, PAGE_SZ}; use crate::tenant::TENANTS_SEGMENT_NAME; use camino::{Utf8Path, Utf8PathBuf}; +#[cfg(test)] +use dio::IoBufferMut; use once_cell::sync::OnceCell; +use owned_buffers_io::io_buf_aligned::IoBufAlignedMut; use owned_buffers_io::io_buf_ext::FullSlice; use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT; use pageserver_api::shard::TenantShardId; @@ -203,7 +206,7 @@ impl VirtualFile { ctx: &RequestContext, ) -> Result, Error> where - Buf: IoBufMut + Send, + Buf: IoBufAlignedMut + Send, { self.inner().read_exact_at(slice, offset, ctx).await } @@ -778,7 +781,7 @@ impl VirtualFileInner { ctx: &RequestContext, ) -> Result, Error> where - Buf: IoBufMut + Send, + Buf: IoBufAlignedMut + Send, { let assert_we_return_original_bounds = if cfg!(debug_assertions) { Some((slice.stable_ptr() as usize, slice.bytes_total())) @@ -1229,12 +1232,15 @@ impl VirtualFileInner { ctx: &RequestContext, ) -> Result, std::io::Error> { use crate::page_cache::PAGE_SZ; - let slice = Vec::with_capacity(PAGE_SZ).slice_full(); + let align = get_io_buffer_alignment(); + let slice = IoBufferMut::with_capacity_aligned(PAGE_SZ, align).slice_full(); assert_eq!(slice.bytes_total(), PAGE_SZ); let slice = self .read_exact_at(slice, blknum as u64 * (PAGE_SZ as u64), ctx) .await?; - Ok(crate::tenant::block_io::BlockLease::Vec(slice.into_inner())) + Ok(crate::tenant::block_io::BlockLease::IoBufferMut( + slice.into_inner(), + )) } async fn read_to_end(&mut self, buf: &mut Vec, ctx: &RequestContext) -> Result<(), Error> { @@ -1420,6 +1426,7 @@ mod tests { use crate::task_mgr::TaskKind; use super::*; + use dio::IoBufferMut; use owned_buffers_io::io_buf_ext::IoBufExt; use owned_buffers_io::slice::SliceMutExt; use rand::seq::SliceRandom; @@ -1443,10 +1450,10 @@ mod tests { impl MaybeVirtualFile { async fn read_exact_at( &self, - mut slice: tokio_epoll_uring::Slice>, + mut slice: tokio_epoll_uring::Slice, offset: u64, ctx: &RequestContext, - ) -> Result>, Error> { + ) -> Result, Error> { match self { MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await, MaybeVirtualFile::File(file) => { @@ -1514,11 +1521,13 @@ mod tests { len: usize, ctx: &RequestContext, ) -> Result { - let slice = Vec::with_capacity(len).slice_full(); + let slice = IoBufferMut::with_capacity_aligned(len, 512).slice_full(); assert_eq!(slice.bytes_total(), len); let slice = self.read_exact_at(slice, pos, ctx).await?; - let vec = slice.into_inner(); - assert_eq!(vec.len(), len); + let buf = slice.into_inner(); + assert_eq!(buf.len(), len); + let mut vec = Vec::with_capacity(buf.len()); + vec.extend_from_slice(&buf); Ok(String::from_utf8(vec).unwrap()) } } @@ -1707,6 +1716,7 @@ mod tests { const VIRTUAL_FILES: usize = 100; const THREADS: usize = 100; const SAMPLE: [u8; SIZE] = [0xADu8; SIZE]; + let align = super::get_io_buffer_alignment(); let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency"); @@ -1743,7 +1753,7 @@ mod tests { let files = files.clone(); let ctx = ctx.detached_child(TaskKind::UnitTest, DownloadBehavior::Error); let hdl = rt.spawn(async move { - let mut buf = vec![0u8; SIZE]; + let mut buf = IoBufferMut::with_capacity_aligned_zeroed(SIZE, align); let mut rng = rand::rngs::OsRng; for _ in 1..1000 { let f = &files[rng.gen_range(0..files.len())]; @@ -1752,7 +1762,7 @@ mod tests { .await .unwrap() .into_inner(); - assert!(buf == SAMPLE); + assert!(&buf == SAMPLE.as_slice()); } }); hdls.push(hdl); diff --git a/pageserver/src/virtual_file/dio.rs b/pageserver/src/virtual_file/dio.rs index 7baf968bbb..3bac7be057 100644 --- a/pageserver/src/virtual_file/dio.rs +++ b/pageserver/src/virtual_file/dio.rs @@ -11,12 +11,14 @@ use std::{ use bytes::buf::UninitSlice; +#[derive(Debug)] struct IoBufferPtr(*mut u8); // SAFETY: We gurantees no one besides `IoBufferPtr` itself has the raw pointer. unsafe impl Send for IoBufferPtr {} /// An aligned buffer type used for I/O. +#[derive(Debug)] pub struct IoBufferMut { ptr: IoBufferPtr, capacity: usize, @@ -141,6 +143,14 @@ impl IoBufferMut { } } + /// Shortens the buffer, keeping the first len bytes. + pub fn truncate(&mut self, len: usize) { + if len > self.len { + return; + } + self.len = len; + } + fn reserve_inner(&mut self, additional: usize) { let Some(required_cap) = self.len().checked_add(additional) else { capacity_overflow() @@ -210,6 +220,24 @@ impl Drop for IoBufferMut { } } +impl AsRef<[u8]> for IoBufferMut { + fn as_ref(&self) -> &[u8] { + self.as_slice() + } +} + +impl AsMut<[u8]> for IoBufferMut { + fn as_mut(&mut self) -> &mut [u8] { + self.as_mut_slice() + } +} + +impl PartialEq<[u8]> for IoBufferMut { + fn eq(&self, other: &[u8]) -> bool { + self.as_slice().eq(other) + } +} + impl Deref for IoBufferMut { type Target = [u8]; diff --git a/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs b/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs index 19f60123c4..417103c0e5 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs @@ -2,8 +2,10 @@ use tokio_epoll_uring::IoBufMut; -use crate::virtual_file::dio::IoBufferMut; +use crate::virtual_file::{dio::IoBufferMut, PageWriteGuardBuf}; -pub(crate) trait IoBufAlignedMut: IoBufMut {} +pub trait IoBufAlignedMut: IoBufMut {} impl IoBufAlignedMut for IoBufferMut {} + +impl IoBufAlignedMut for PageWriteGuardBuf {}