From b0d7fc75641ffc4571a055818864cb26dd64f869 Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Mon, 11 Nov 2024 23:49:52 +0000 Subject: [PATCH] fix IoBufferMut::extend_from_slice Signed-off-by: Yuchen Liang --- pageserver/src/tenant/ephemeral_file.rs | 5 ++- .../owned_buffers_io/aligned_buffer/buffer.rs | 2 +- .../aligned_buffer/buffer_mut.rs | 33 ++++++++++++++++++- .../virtual_file/owned_buffers_io/write.rs | 30 +++++++++-------- 4 files changed, 52 insertions(+), 18 deletions(-) diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 7f4bf6b9aa..9979d1980f 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -10,7 +10,6 @@ use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut; use crate::virtual_file::owned_buffers_io::slice::SliceMutExt; use crate::virtual_file::owned_buffers_io::write::Buffer; use crate::virtual_file::{self, owned_buffers_io, IoBufferMut, VirtualFile}; -use bytes::BytesMut; use camino::Utf8PathBuf; use num_traits::Num; use pageserver_api::shard::TenantShardId; @@ -27,7 +26,7 @@ pub struct EphemeralFile { _timeline_id: TimelineId, page_cache_file_id: page_cache::FileId, bytes_written: u64, - buffered_writer: owned_buffers_io::write::BufferedWriter, + buffered_writer: owned_buffers_io::write::BufferedWriter, /// Gate guard is held on as long as we need to do operations in the path (delete on drop) _gate_guard: utils::sync::gate::GateGuard, } @@ -73,7 +72,7 @@ impl EphemeralFile { bytes_written: 0, buffered_writer: owned_buffers_io::write::BufferedWriter::new( file, - || BytesMut::with_capacity(TAIL_SZ), + || IoBufferMut::with_capacity(TAIL_SZ), ctx, ), _gate_guard: gate_guard, diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer.rs index eab36e8163..4474303b4f 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer.rs @@ -6,7 +6,7 @@ use std::{ use super::{alignment::Alignment, raw::RawAlignedBuffer, AlignedBufferMut}; /// An shared, immutable aligned buffer type. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct AlignedBuffer { /// Shared raw buffer. raw: Arc>, diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs index 849bf2bb07..08eafcb077 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs @@ -1,4 +1,7 @@ -use std::ops::{Deref, DerefMut}; +use std::{ + mem::MaybeUninit, + ops::{Deref, DerefMut}, +}; use super::{ alignment::{Alignment, ConstAlign}, @@ -132,6 +135,34 @@ impl AlignedBufferMut { let len = self.len(); AlignedBuffer::from_raw(self.raw, 0..len) } + + #[inline] + pub fn extend_from_slice(&mut self, extend: &[u8]) { + let cnt = extend.len(); + self.reserve(cnt); + + unsafe { + let dst = self.spare_capacity_mut(); + // Reserved above + debug_assert!(dst.len() >= cnt); + + core::ptr::copy_nonoverlapping(extend.as_ptr(), dst.as_mut_ptr().cast(), cnt); + } + + unsafe { + bytes::BufMut::advance_mut(self, cnt); + } + } + + #[inline] + fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit] { + unsafe { + let ptr = self.as_mut_ptr().add(self.len()); + let len = self.capacity() - self.len(); + + core::slice::from_raw_parts_mut(ptr.cast(), len) + } + } } impl Deref for AlignedBufferMut { diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index f0399681f1..429138bc76 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use bytes::BytesMut; use flush::FlushHandle; -use tokio_epoll_uring::{BoundedBufMut, IoBuf}; +use tokio_epoll_uring::IoBuf; use crate::{ context::RequestContext, @@ -131,10 +131,13 @@ where .pending(), 0 ); - let chunk = self - .writer - .write_all_at(FullSlice::must_new(chunk), self.bytes_amount, ctx) - .await?; + let chunk = OwnedAsyncWriter::write_all_at( + self.writer.as_ref(), + FullSlice::must_new(chunk), + self.bytes_amount, + ctx, + ) + .await?; self.bytes_amount += u64::try_from(chunk_len).unwrap(); return Ok((chunk_len, chunk)); } @@ -257,7 +260,7 @@ impl Buffer for IoBufferMut { fn extend_from_slice(&mut self, other: &[u8]) { self.reserve(other.len()); - BoundedBufMut::put_slice(self, other); + IoBufferMut::extend_from_slice(self, other); } fn pending(&self) -> usize { @@ -281,8 +284,6 @@ impl Buffer for IoBufferMut { mod tests { use std::sync::Mutex; - use bytes::BytesMut; - use super::*; use crate::context::{DownloadBehavior, RequestContext}; use crate::task_mgr::TaskKind; @@ -336,7 +337,7 @@ mod tests { async fn test_buffered_writes_only() -> std::io::Result<()> { let recorder = Arc::new(RecorderWriter::default()); let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); - let mut writer = BufferedWriter::new(recorder, || BytesMut::with_capacity(2), &ctx); + let mut writer = BufferedWriter::new(recorder, || IoBufferMut::with_capacity(2), &ctx); write!(writer, b"a"); write!(writer, b"b"); write!(writer, b"c"); @@ -354,7 +355,7 @@ mod tests { async fn test_passthrough_writes_only() -> std::io::Result<()> { let recorder = Arc::new(RecorderWriter::default()); let ctx = test_ctx(); - let mut writer = BufferedWriter::new(recorder, || BytesMut::with_capacity(2), &ctx); + let mut writer = BufferedWriter::new(recorder, || IoBufferMut::with_capacity(2), &ctx); write!(writer, b"abc"); write!(writer, b"de"); write!(writer, b""); @@ -371,7 +372,7 @@ mod tests { async fn test_passthrough_write_with_nonempty_buffer() -> std::io::Result<()> { let recorder = Arc::new(RecorderWriter::default()); let ctx = test_ctx(); - let mut writer = BufferedWriter::new(recorder, || BytesMut::with_capacity(2), &ctx); + let mut writer = BufferedWriter::new(recorder, || IoBufferMut::with_capacity(2), &ctx); write!(writer, b"a"); write!(writer, b"bc"); write!(writer, b"d"); @@ -389,8 +390,11 @@ mod tests { let ctx = test_ctx(); let ctx = &ctx; let recorder = Arc::new(RecorderWriter::default()); - let mut writer = - BufferedWriter::<_, RecorderWriter>::new(recorder, || BytesMut::with_capacity(2), ctx); + let mut writer = BufferedWriter::<_, RecorderWriter>::new( + recorder, + || IoBufferMut::with_capacity(2), + ctx, + ); writer.write_buffered_borrowed(b"abc", ctx).await?; writer.write_buffered_borrowed(b"d", ctx).await?;