From cf5b0c9e439cd35a8a32be61b50c3f5b542b6427 Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Tue, 10 Dec 2024 18:18:22 +0000 Subject: [PATCH] use v2 file API and pad summary blk Signed-off-by: Yuchen Liang --- libs/pageserver_api/src/models.rs | 2 +- pageserver/src/tenant/blob_io.rs | 64 ------------------- pageserver/src/tenant/block_io.rs | 9 ++- pageserver/src/tenant/disk_btree.rs | 13 ++-- .../src/tenant/storage_layer/delta_layer.rs | 26 ++++---- .../src/tenant/storage_layer/image_layer.rs | 27 ++++---- .../aligned_buffer/buffer_mut.rs | 11 ++++ 7 files changed, 50 insertions(+), 102 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 5488f7b2c2..30c1dd2466 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1084,7 +1084,7 @@ pub mod virtual_file { impl IoMode { pub const fn preferred() -> Self { - Self::Buffered + Self::Direct } } diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index e8c25b5f8e..1904c3fee0 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -210,70 +210,6 @@ impl BlobWriter { (src_buf, res) } - // /// Internal, possibly buffered, write function - // async fn write_all_old( - // &mut self, - // src_buf: FullSlice, - // ctx: &RequestContext, - // ) -> (FullSlice, Result<(), Error>) { - // let src_buf = src_buf.into_raw_slice(); - // let src_buf_bounds = src_buf.bounds(); - // let restore = move |src_buf_slice: Slice<_>| { - // FullSlice::must_new(Slice::from_buf_bounds( - // src_buf_slice.into_inner(), - // src_buf_bounds, - // )) - // }; - - // if !BUFFERED { - // assert!(self.buf.is_empty()); - // return self - // .write_all_unbuffered(FullSlice::must_new(src_buf), ctx) - // .await; - // } - // let remaining = Self::CAPACITY - self.buf.len(); - // let src_buf_len = src_buf.bytes_init(); - // if src_buf_len == 0 { - // return (restore(src_buf), Ok(())); - // } - // let mut src_buf = src_buf.slice(0..src_buf_len); - // // First try to copy as much as we can into the buffer - // if remaining > 0 { - // let copied = self.write_into_buffer(&src_buf); - // src_buf = src_buf.slice(copied..); - // } - // // Then, if the buffer is full, flush it out - // if self.buf.len() == Self::CAPACITY { - // if let Err(e) = self.flush_buffer(ctx).await { - // return (restore(src_buf), Err(e)); - // } - // } - // // Finally, write the tail of src_buf: - // // If it wholly fits into the buffer without - // // completely filling it, then put it there. - // // If not, write it out directly. - // let src_buf = if !src_buf.is_empty() { - // assert_eq!(self.buf.len(), 0); - // if src_buf.len() < Self::CAPACITY { - // let copied = self.write_into_buffer(&src_buf); - // // We just verified above that src_buf fits into our internal buffer. - // assert_eq!(copied, src_buf.len()); - // restore(src_buf) - // } else { - // let (src_buf, res) = self - // .write_all_unbuffered(FullSlice::must_new(src_buf), ctx) - // .await; - // if let Err(e) = res { - // return (src_buf, Err(e)); - // } - // src_buf - // } - // } else { - // restore(src_buf) - // }; - // (src_buf, Ok(())) - // } - /// Write a blob of data. Returns the offset that it was written to, /// which can be used to retrieve the data later. pub async fn write_blob( diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 2bd7f2d619..5fcb91a57d 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -7,8 +7,7 @@ use crate::context::RequestContext; use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ}; #[cfg(test)] use crate::virtual_file::IoBufferMut; -use crate::virtual_file::VirtualFile; -use bytes::Bytes; +use crate::virtual_file::{IoBuffer, VirtualFile}; use std::ops::Deref; /// This is implemented by anything that can read 8 kB (PAGE_SZ) @@ -249,17 +248,17 @@ pub trait BlockWriter { /// 'buf' must be of size PAGE_SZ. Returns the block number the page was /// written to. /// - fn write_blk(&mut self, buf: Bytes) -> Result; + fn write_blk(&mut self, buf: IoBuffer) -> Result; } /// /// A simple in-memory buffer of blocks. /// pub struct BlockBuf { - pub blocks: Vec, + pub blocks: Vec, } impl BlockWriter for BlockBuf { - fn write_blk(&mut self, buf: Bytes) -> Result { + fn write_blk(&mut self, buf: IoBuffer) -> Result { assert!(buf.len() == PAGE_SZ); let blknum = self.blocks.len(); self.blocks.push(buf); diff --git a/pageserver/src/tenant/disk_btree.rs b/pageserver/src/tenant/disk_btree.rs index b302cbc975..49f7bd9a65 100644 --- a/pageserver/src/tenant/disk_btree.rs +++ b/pageserver/src/tenant/disk_btree.rs @@ -20,7 +20,7 @@ //! use async_stream::try_stream; use byteorder::{ReadBytesExt, BE}; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::BufMut; use either::Either; use futures::{Stream, StreamExt}; use hex; @@ -38,6 +38,7 @@ use crate::{ context::{DownloadBehavior, RequestContext}, task_mgr::TaskKind, tenant::block_io::{BlockReader, BlockWriter}, + virtual_file::{owned_buffers_io::write::Buffer, IoBuffer, IoBufferMut}, }; // The maximum size of a value stored in the B-tree. 5 bytes is enough currently. @@ -793,12 +794,12 @@ impl BuildNode { /// /// Serialize the node to on-disk format. /// - fn pack(&self) -> Bytes { + fn pack(&self) -> IoBuffer { assert!(self.keys.len() == self.num_children as usize * self.suffix_len); assert!(self.values.len() == self.num_children as usize * VALUE_SZ); assert!(self.num_children > 0); - let mut buf = BytesMut::new(); + let mut buf = IoBufferMut::with_capacity(PAGE_SZ); buf.put_u16(self.num_children); buf.put_u8(self.level); @@ -811,7 +812,7 @@ impl BuildNode { assert!(buf.len() == self.size); assert!(buf.len() <= PAGE_SZ); - buf.resize(PAGE_SZ, 0); + buf.extend_with(0, PAGE_SZ - buf.len()); buf.freeze() } @@ -841,7 +842,7 @@ pub(crate) mod tests { #[derive(Clone, Default)] pub(crate) struct TestDisk { - blocks: Vec, + blocks: Vec, } impl TestDisk { fn new() -> Self { @@ -859,7 +860,7 @@ pub(crate) mod tests { } } impl BlockWriter for &mut TestDisk { - fn write_blk(&mut self, buf: Bytes) -> io::Result { + fn write_blk(&mut self, buf: IoBuffer) -> io::Result { let blknum = self.blocks.len(); self.blocks.push(buf); Ok(blknum as u32) diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 44a0ded44b..b1976f6fa3 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -43,6 +43,7 @@ use crate::tenant::vectored_blob_io::{ }; use crate::tenant::PageReconstructError; use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; +use crate::virtual_file::owned_buffers_io::write::Buffer; use crate::virtual_file::IoBufferMut; use crate::virtual_file::{self, MaybeFatalIo, VirtualFile}; use crate::TEMP_FILE_SUFFIX; @@ -62,7 +63,6 @@ use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; use std::collections::VecDeque; use std::fs::File; -use std::io::SeekFrom; use std::ops::Range; use std::os::unix::fs::FileExt; use std::str::FromStr; @@ -420,7 +420,7 @@ impl DeltaLayerWriterInner { let path = DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range); - let file = Arc::new(VirtualFile::create(&path, ctx).await?); + let file = Arc::new(VirtualFile::create_v2(&path, ctx).await?); // Start at PAGE_SZ, make room for the header block let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, ctx)?; @@ -533,15 +533,15 @@ impl DeltaLayerWriterInner { ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32; - let mut file = self.blob_writer.into_inner(ctx).await?; + let file = self.blob_writer.into_inner(ctx).await?; // Write out the index let (index_root_blk, block_buf) = self.tree.finish()?; - file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64)) - .await?; + let mut offset = index_start_blk as u64 * PAGE_SZ as u64; for buf in block_buf.blocks { - let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; + let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await; res?; + offset += PAGE_SZ as u64; } assert!(self.lsn_range.start < self.lsn_range.end); // Fill in the summary on blk 0 @@ -556,11 +556,11 @@ impl DeltaLayerWriterInner { index_root_blk, }; - let mut buf = Vec::with_capacity(PAGE_SZ); + let mut buf = IoBufferMut::with_capacity(PAGE_SZ); // TODO: could use smallvec here but it's a pain with Slice Summary::ser_into(&summary, &mut buf)?; - file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; + buf.extend_with(0, buf.capacity() - buf.len()); + let (_buf, res) = file.write_all_at(buf.freeze().slice_len(), 0, ctx).await; res?; let metadata = file @@ -756,7 +756,7 @@ impl DeltaLayer { where F: Fn(Summary) -> Summary, { - let mut file = VirtualFile::open_with_options( + let file = VirtualFile::open_with_options_v2( path, virtual_file::OpenOptions::new().read(true).write(true), ctx, @@ -773,11 +773,11 @@ impl DeltaLayer { let new_summary = rewrite(actual_summary); - let mut buf = Vec::with_capacity(PAGE_SZ); + let mut buf = IoBufferMut::with_capacity(PAGE_SZ); // TODO: could use smallvec here, but it's a pain with Slice Summary::ser_into(&new_summary, &mut buf).context("serialize")?; - file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; + buf.extend_with(0, buf.capacity() - buf.len()); + let (_buf, res) = file.write_all_at(buf.freeze().slice_len(), 0, ctx).await; res?; Ok(()) } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index a42637324a..da25624568 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -40,6 +40,7 @@ use crate::tenant::vectored_blob_io::{ }; use crate::tenant::PageReconstructError; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; +use crate::virtual_file::owned_buffers_io::write::Buffer; use crate::virtual_file::IoBufferMut; use crate::virtual_file::{self, MaybeFatalIo, VirtualFile}; use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; @@ -58,7 +59,6 @@ use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; use std::collections::VecDeque; use std::fs::File; -use std::io::SeekFrom; use std::ops::Range; use std::os::unix::prelude::FileExt; use std::str::FromStr; @@ -349,7 +349,7 @@ impl ImageLayer { where F: Fn(Summary) -> Summary, { - let mut file = VirtualFile::open_with_options( + let file = VirtualFile::open_with_options_v2( path, virtual_file::OpenOptions::new().read(true).write(true), ctx, @@ -366,11 +366,11 @@ impl ImageLayer { let new_summary = rewrite(actual_summary); - let mut buf = Vec::with_capacity(PAGE_SZ); + let mut buf = IoBufferMut::with_capacity(PAGE_SZ); // TODO: could use smallvec here but it's a pain with Slice Summary::ser_into(&new_summary, &mut buf).context("serialize")?; - file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; + buf.extend_with(0, buf.capacity() - buf.len()); + let (_buf, res) = file.write_all_at(buf.freeze().slice_len(), 0, ctx).await; res?; Ok(()) } @@ -759,7 +759,7 @@ impl ImageLayerWriterInner { trace!("creating image layer {}", path); let file = { Arc::new( - VirtualFile::open_with_options( + VirtualFile::open_with_options_v2( &path, virtual_file::OpenOptions::new() .write(true) @@ -877,15 +877,16 @@ impl ImageLayerWriterInner { crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen); crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size); - let mut file = self.blob_writer.into_inner(ctx).await?; + let file = self.blob_writer.into_inner(ctx).await?; // Write out the index - file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64)) - .await?; + // TODO(yuchen): should we just replace BlockBuf::blocks with one big buffer? + let mut offset = index_start_blk as u64 * PAGE_SZ as u64; let (index_root_blk, block_buf) = self.tree.finish()?; for buf in block_buf.blocks { - let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; + let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await; res?; + offset += PAGE_SZ as u64; } let final_key_range = if let Some(end_key) = end_key { @@ -906,11 +907,11 @@ impl ImageLayerWriterInner { index_root_blk, }; - let mut buf = Vec::with_capacity(PAGE_SZ); + let mut buf = IoBufferMut::with_capacity(PAGE_SZ); // TODO: could use smallvec here but it's a pain with Slice Summary::ser_into(&summary, &mut buf)?; - file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; + buf.extend_with(0, buf.capacity() - buf.len()); + let (_buf, res) = file.write_all_at(buf.freeze().slice_len(), 0, ctx).await; res?; let metadata = file diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs index d2f5e206bb..be07d5539f 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs @@ -280,6 +280,17 @@ unsafe impl tokio_epoll_uring::IoBufMut for AlignedBufferMut { } } +impl std::io::Write for AlignedBufferMut { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.extend_from_slice(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + #[cfg(test)] mod tests {