From f28dd95022f08fa8dbe15af58e29ca63e372bd8c Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Mon, 7 Oct 2024 16:05:43 -0400 Subject: [PATCH] use aligned buffer for image and delta layers Signed-off-by: Yuchen Liang --- .../src/tenant/storage_layer/delta_layer.rs | 15 +++++++-------- .../src/tenant/storage_layer/image_layer.rs | 19 +++++++++---------- pageserver/src/tenant/vectored_blob_io.rs | 9 +++++---- 3 files changed, 21 insertions(+), 22 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 8be7d7876f..42436699a7 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -44,11 +44,11 @@ use crate::tenant::vectored_blob_io::{ }; use crate::tenant::PageReconstructError; use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; +use crate::virtual_file::IoBufferMut; use crate::virtual_file::{self, MaybeFatalIo, VirtualFile}; use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; use anyhow::{anyhow, bail, ensure, Context, Result}; -use bytes::BytesMut; use camino::{Utf8Path, Utf8PathBuf}; use futures::StreamExt; use itertools::Itertools; @@ -1003,7 +1003,7 @@ impl DeltaLayerInner { .0 .into(); let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes); - let mut buf = Some(BytesMut::with_capacity(buf_size)); + let mut buf = Some(IoBufferMut::with_capacity(buf_size)); // Note that reads are processed in reverse order (from highest key+lsn). // This is the order that `ReconstructState` requires such that it can @@ -1030,7 +1030,7 @@ impl DeltaLayerInner { // We have "lost" the buffer since the lower level IO api // doesn't return the buffer on error. Allocate a new one. - buf = Some(BytesMut::with_capacity(buf_size)); + buf = Some(IoBufferMut::with_capacity(buf_size)); continue; } @@ -1204,7 +1204,7 @@ impl DeltaLayerInner { .map(|x| x.0.get()) .unwrap_or(8192); - let mut buffer = Some(BytesMut::with_capacity(max_read_size)); + let mut buffer = Some(IoBufferMut::with_capacity(max_read_size)); // FIXME: buffering of DeltaLayerWriter let mut per_blob_copy = Vec::new(); @@ -1562,12 +1562,11 @@ impl<'a> DeltaLayerIterator<'a> { let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file); let mut next_batch = std::collections::VecDeque::new(); let buf_size = plan.size(); - let buf = BytesMut::with_capacity(buf_size); + let buf = IoBufferMut::with_capacity(buf_size); let blobs_buf = vectored_blob_reader .read_blobs(&plan, buf, self.ctx) .await?; - let frozen_buf = blobs_buf.buf.freeze(); - let view = BufView::new_bytes(frozen_buf); + let view = BufView::new_slice(&blobs_buf.buf); for meta in blobs_buf.blobs.iter() { let blob_read = meta.read(&view).await?; let value = Value::des(&blob_read)?; @@ -1942,7 +1941,7 @@ pub(crate) mod test { &vectored_reads, constants::MAX_VECTORED_READ_BYTES, ); - let mut buf = Some(BytesMut::with_capacity(buf_size)); + let mut buf = Some(IoBufferMut::with_capacity(buf_size)); for read in vectored_reads { let blobs_buf = vectored_blob_reader diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index de8155f455..5773ae467f 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -41,10 +41,11 @@ use crate::tenant::vectored_blob_io::{ }; use crate::tenant::PageReconstructError; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; +use crate::virtual_file::IoBufferMut; use crate::virtual_file::{self, MaybeFatalIo, VirtualFile}; use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; use anyhow::{anyhow, bail, ensure, Context, Result}; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use camino::{Utf8Path, Utf8PathBuf}; use hex; use itertools::Itertools; @@ -547,10 +548,10 @@ impl ImageLayerInner { for read in plan.into_iter() { let buf_size = read.size(); - let buf = BytesMut::with_capacity(buf_size); + let buf = IoBufferMut::with_capacity(buf_size); let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?; - let frozen_buf = blobs_buf.buf.freeze(); - let view = BufView::new_bytes(frozen_buf); + + let view = BufView::new_slice(&blobs_buf.buf); for meta in blobs_buf.blobs.iter() { let img_buf = meta.read(&view).await?; @@ -609,13 +610,12 @@ impl ImageLayerInner { } } - let buf = BytesMut::with_capacity(buf_size); + let buf = IoBufferMut::with_capacity(buf_size); let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await; match res { Ok(blobs_buf) => { - let frozen_buf = blobs_buf.buf.freeze(); - let view = BufView::new_bytes(frozen_buf); + let view = BufView::new_slice(&blobs_buf.buf); for meta in blobs_buf.blobs.iter() { let img_buf = meta.read(&view).await; @@ -1051,12 +1051,11 @@ impl<'a> ImageLayerIterator<'a> { let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file); let mut next_batch = std::collections::VecDeque::new(); let buf_size = plan.size(); - let buf = BytesMut::with_capacity(buf_size); + let buf = IoBufferMut::with_capacity(buf_size); let blobs_buf = vectored_blob_reader .read_blobs(&plan, buf, self.ctx) .await?; - let frozen_buf = blobs_buf.buf.freeze(); - let view = BufView::new_bytes(frozen_buf); + let view = BufView::new_slice(&blobs_buf.buf); for meta in blobs_buf.blobs.iter() { let img_buf = meta.read(&view).await?; next_batch.push_back(( diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index 792c769b4f..6257035986 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -18,7 +18,7 @@ use std::collections::BTreeMap; use std::ops::Deref; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use pageserver_api::key::Key; use tokio::io::AsyncWriteExt; use tokio_epoll_uring::BoundedBuf; @@ -27,6 +27,7 @@ use utils::vec_map::VecMap; use crate::context::RequestContext; use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK}; +use crate::virtual_file::IoBufferMut; use crate::virtual_file::{self, VirtualFile}; /// Metadata bundled with the start and end offset of a blob. @@ -158,7 +159,7 @@ impl std::fmt::Display for VectoredBlob { /// Return type of [`VectoredBlobReader::read_blobs`] pub struct VectoredBlobsBuf { /// Buffer for all blobs in this read - pub buf: BytesMut, + pub buf: IoBufferMut, /// Offsets into the buffer and metadata for all blobs in this read pub blobs: Vec, } @@ -446,7 +447,7 @@ impl<'a> VectoredBlobReader<'a> { pub async fn read_blobs( &self, read: &VectoredRead, - buf: BytesMut, + buf: IoBufferMut, ctx: &RequestContext, ) -> Result { assert!(read.size() > 0); @@ -921,7 +922,7 @@ mod tests { // Multiply by two (compressed data might need more space), and add a few bytes for the header let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16; - let mut buf = BytesMut::with_capacity(reserved_bytes); + let mut buf = IoBufferMut::with_capacity(reserved_bytes); let vectored_blob_reader = VectoredBlobReader::new(&file); let meta = BlobMeta {