mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 11:40:38 +00:00
use aligned buffer for inmemory layer
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
This commit is contained in:
@@ -9,7 +9,7 @@ use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
|
||||
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
|
||||
use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
|
||||
use crate::virtual_file::owned_buffers_io::write::Buffer;
|
||||
use crate::virtual_file::{self, owned_buffers_io, VirtualFile};
|
||||
use crate::virtual_file::{self, owned_buffers_io, IoBufferMut, VirtualFile};
|
||||
use bytes::BytesMut;
|
||||
use camino::Utf8PathBuf;
|
||||
use num_traits::Num;
|
||||
@@ -107,15 +107,18 @@ impl EphemeralFile {
|
||||
self.page_cache_file_id
|
||||
}
|
||||
|
||||
pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
|
||||
pub(crate) async fn load_to_io_buf(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<IoBufferMut, io::Error> {
|
||||
let size = self.len().into_usize();
|
||||
let vec = Vec::with_capacity(size);
|
||||
let (slice, nread) = self.read_exact_at_eof_ok(0, vec.slice_full(), ctx).await?;
|
||||
let buf = IoBufferMut::with_capacity(size);
|
||||
let (slice, nread) = self.read_exact_at_eof_ok(0, buf.slice_full(), ctx).await?;
|
||||
assert_eq!(nread, size);
|
||||
let vec = slice.into_inner();
|
||||
assert_eq!(vec.len(), nread);
|
||||
assert_eq!(vec.capacity(), size, "we shouldn't be reallocating");
|
||||
Ok(vec)
|
||||
let buf = slice.into_inner();
|
||||
assert_eq!(buf.len(), nread);
|
||||
assert_eq!(buf.capacity(), size, "we shouldn't be reallocating");
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
/// Returns the offset at which the first byte of the input was written, for use
|
||||
@@ -385,7 +388,7 @@ mod tests {
|
||||
|
||||
// assert the state is as this test expects it to be
|
||||
assert_eq!(
|
||||
&file.load_to_vec(&ctx).await.unwrap(),
|
||||
&file.load_to_io_buf(&ctx).await.unwrap(),
|
||||
&content[0..cap + cap / 2]
|
||||
);
|
||||
let md = file
|
||||
|
||||
@@ -809,9 +809,9 @@ impl InMemoryLayer {
|
||||
|
||||
match l0_flush_global_state {
|
||||
l0_flush::Inner::Direct { .. } => {
|
||||
let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
|
||||
|
||||
let file_contents = Bytes::from(file_contents);
|
||||
let file_contents = inner.file.load_to_io_buf(ctx).await?;
|
||||
// TODO(yuchen): see ways to avoid this copy.
|
||||
let file_contents = Bytes::copy_from_slice(&file_contents);
|
||||
|
||||
for (key, vec_map) in inner.index.iter() {
|
||||
// Write all page versions
|
||||
|
||||
@@ -11,12 +11,14 @@ use std::{
|
||||
|
||||
use bytes::buf::UninitSlice;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct IoBufferPtr(*mut u8);
|
||||
|
||||
// SAFETY: We gurantees no one besides `IoBufferPtr` itself has the raw pointer.
|
||||
unsafe impl Send for IoBufferPtr {}
|
||||
|
||||
/// An aligned buffer type used for I/O.
|
||||
#[derive(Debug)]
|
||||
pub struct AlignedBufferMut<const ALIGN: usize> {
|
||||
ptr: IoBufferPtr,
|
||||
capacity: usize,
|
||||
@@ -252,6 +254,24 @@ impl<const ALIGN: usize> DerefMut for AlignedBufferMut<ALIGN> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<const ALIGN: usize> AsRef<[u8]> for AlignedBufferMut<ALIGN> {
|
||||
fn as_ref(&self) -> &[u8] {
|
||||
self.as_slice()
|
||||
}
|
||||
}
|
||||
|
||||
impl<const ALIGN: usize> AsMut<[u8]> for AlignedBufferMut<ALIGN> {
|
||||
fn as_mut(&mut self) -> &mut [u8] {
|
||||
self.as_mut_slice()
|
||||
}
|
||||
}
|
||||
|
||||
impl<const ALIGN: usize> PartialEq<[u8]> for AlignedBufferMut<ALIGN> {
|
||||
fn eq(&self, other: &[u8]) -> bool {
|
||||
self.as_slice().eq(other)
|
||||
}
|
||||
}
|
||||
|
||||
/// SAFETY: When advancing the internal cursor, the caller needs to make sure the bytes advcanced past have been initialized.
|
||||
unsafe impl<const ALIGN: usize> bytes::BufMut for AlignedBufferMut<ALIGN> {
|
||||
#[inline]
|
||||
|
||||
Reference in New Issue
Block a user