fix IoBufferMut::extend_from_slice

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
This commit is contained in:
Yuchen Liang
2024-11-11 23:49:52 +00:00
parent 7b34e73c15
commit b0d7fc7564
4 changed files with 52 additions and 18 deletions

View File

@@ -10,7 +10,6 @@ use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
use crate::virtual_file::owned_buffers_io::write::Buffer;
use crate::virtual_file::{self, owned_buffers_io, IoBufferMut, VirtualFile};
use bytes::BytesMut;
use camino::Utf8PathBuf;
use num_traits::Num;
use pageserver_api::shard::TenantShardId;
@@ -27,7 +26,7 @@ pub struct EphemeralFile {
_timeline_id: TimelineId,
page_cache_file_id: page_cache::FileId,
bytes_written: u64,
buffered_writer: owned_buffers_io::write::BufferedWriter<BytesMut, VirtualFile>,
buffered_writer: owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile>,
/// Gate guard is held on as long as we need to do operations in the path (delete on drop)
_gate_guard: utils::sync::gate::GateGuard,
}
@@ -73,7 +72,7 @@ impl EphemeralFile {
bytes_written: 0,
buffered_writer: owned_buffers_io::write::BufferedWriter::new(
file,
|| BytesMut::with_capacity(TAIL_SZ),
|| IoBufferMut::with_capacity(TAIL_SZ),
ctx,
),
_gate_guard: gate_guard,

View File

@@ -6,7 +6,7 @@ use std::{
use super::{alignment::Alignment, raw::RawAlignedBuffer, AlignedBufferMut};
/// An shared, immutable aligned buffer type.
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct AlignedBuffer<A: Alignment> {
/// Shared raw buffer.
raw: Arc<RawAlignedBuffer<A>>,

View File

@@ -1,4 +1,7 @@
use std::ops::{Deref, DerefMut};
use std::{
mem::MaybeUninit,
ops::{Deref, DerefMut},
};
use super::{
alignment::{Alignment, ConstAlign},
@@ -132,6 +135,34 @@ impl<A: Alignment> AlignedBufferMut<A> {
let len = self.len();
AlignedBuffer::from_raw(self.raw, 0..len)
}
#[inline]
pub fn extend_from_slice(&mut self, extend: &[u8]) {
let cnt = extend.len();
self.reserve(cnt);
unsafe {
let dst = self.spare_capacity_mut();
// Reserved above
debug_assert!(dst.len() >= cnt);
core::ptr::copy_nonoverlapping(extend.as_ptr(), dst.as_mut_ptr().cast(), cnt);
}
unsafe {
bytes::BufMut::advance_mut(self, cnt);
}
}
#[inline]
fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<u8>] {
unsafe {
let ptr = self.as_mut_ptr().add(self.len());
let len = self.capacity() - self.len();
core::slice::from_raw_parts_mut(ptr.cast(), len)
}
}
}
impl<A: Alignment> Deref for AlignedBufferMut<A> {

View File

@@ -3,7 +3,7 @@ use std::sync::Arc;
use bytes::BytesMut;
use flush::FlushHandle;
use tokio_epoll_uring::{BoundedBufMut, IoBuf};
use tokio_epoll_uring::IoBuf;
use crate::{
context::RequestContext,
@@ -131,10 +131,13 @@ where
.pending(),
0
);
let chunk = self
.writer
.write_all_at(FullSlice::must_new(chunk), self.bytes_amount, ctx)
.await?;
let chunk = OwnedAsyncWriter::write_all_at(
self.writer.as_ref(),
FullSlice::must_new(chunk),
self.bytes_amount,
ctx,
)
.await?;
self.bytes_amount += u64::try_from(chunk_len).unwrap();
return Ok((chunk_len, chunk));
}
@@ -257,7 +260,7 @@ impl Buffer for IoBufferMut {
fn extend_from_slice(&mut self, other: &[u8]) {
self.reserve(other.len());
BoundedBufMut::put_slice(self, other);
IoBufferMut::extend_from_slice(self, other);
}
fn pending(&self) -> usize {
@@ -281,8 +284,6 @@ impl Buffer for IoBufferMut {
mod tests {
use std::sync::Mutex;
use bytes::BytesMut;
use super::*;
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::TaskKind;
@@ -336,7 +337,7 @@ mod tests {
async fn test_buffered_writes_only() -> std::io::Result<()> {
let recorder = Arc::new(RecorderWriter::default());
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let mut writer = BufferedWriter::new(recorder, || BytesMut::with_capacity(2), &ctx);
let mut writer = BufferedWriter::new(recorder, || IoBufferMut::with_capacity(2), &ctx);
write!(writer, b"a");
write!(writer, b"b");
write!(writer, b"c");
@@ -354,7 +355,7 @@ mod tests {
async fn test_passthrough_writes_only() -> std::io::Result<()> {
let recorder = Arc::new(RecorderWriter::default());
let ctx = test_ctx();
let mut writer = BufferedWriter::new(recorder, || BytesMut::with_capacity(2), &ctx);
let mut writer = BufferedWriter::new(recorder, || IoBufferMut::with_capacity(2), &ctx);
write!(writer, b"abc");
write!(writer, b"de");
write!(writer, b"");
@@ -371,7 +372,7 @@ mod tests {
async fn test_passthrough_write_with_nonempty_buffer() -> std::io::Result<()> {
let recorder = Arc::new(RecorderWriter::default());
let ctx = test_ctx();
let mut writer = BufferedWriter::new(recorder, || BytesMut::with_capacity(2), &ctx);
let mut writer = BufferedWriter::new(recorder, || IoBufferMut::with_capacity(2), &ctx);
write!(writer, b"a");
write!(writer, b"bc");
write!(writer, b"d");
@@ -389,8 +390,11 @@ mod tests {
let ctx = test_ctx();
let ctx = &ctx;
let recorder = Arc::new(RecorderWriter::default());
let mut writer =
BufferedWriter::<_, RecorderWriter>::new(recorder, || BytesMut::with_capacity(2), ctx);
let mut writer = BufferedWriter::<_, RecorderWriter>::new(
recorder,
|| IoBufferMut::with_capacity(2),
ctx,
);
writer.write_buffered_borrowed(b"abc", ctx).await?;
writer.write_buffered_borrowed(b"d", ctx).await?;