use v2 file API and pad summary blk

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
This commit is contained in:
Yuchen Liang
2024-12-10 18:18:22 +00:00
parent 7aa8111af9
commit cf5b0c9e43
7 changed files with 50 additions and 102 deletions

View File

@@ -1084,7 +1084,7 @@ pub mod virtual_file {
impl IoMode {
pub const fn preferred() -> Self {
Self::Buffered
Self::Direct
}
}

View File

@@ -210,70 +210,6 @@ impl BlobWriter {
(src_buf, res)
}
// /// Internal, possibly buffered, write function
// async fn write_all_old<Buf: IoBuf + Send>(
// &mut self,
// src_buf: FullSlice<Buf>,
// ctx: &RequestContext,
// ) -> (FullSlice<Buf>, Result<(), Error>) {
// let src_buf = src_buf.into_raw_slice();
// let src_buf_bounds = src_buf.bounds();
// let restore = move |src_buf_slice: Slice<_>| {
// FullSlice::must_new(Slice::from_buf_bounds(
// src_buf_slice.into_inner(),
// src_buf_bounds,
// ))
// };
// if !BUFFERED {
// assert!(self.buf.is_empty());
// return self
// .write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
// .await;
// }
// let remaining = Self::CAPACITY - self.buf.len();
// let src_buf_len = src_buf.bytes_init();
// if src_buf_len == 0 {
// return (restore(src_buf), Ok(()));
// }
// let mut src_buf = src_buf.slice(0..src_buf_len);
// // First try to copy as much as we can into the buffer
// if remaining > 0 {
// let copied = self.write_into_buffer(&src_buf);
// src_buf = src_buf.slice(copied..);
// }
// // Then, if the buffer is full, flush it out
// if self.buf.len() == Self::CAPACITY {
// if let Err(e) = self.flush_buffer(ctx).await {
// return (restore(src_buf), Err(e));
// }
// }
// // Finally, write the tail of src_buf:
// // If it wholly fits into the buffer without
// // completely filling it, then put it there.
// // If not, write it out directly.
// let src_buf = if !src_buf.is_empty() {
// assert_eq!(self.buf.len(), 0);
// if src_buf.len() < Self::CAPACITY {
// let copied = self.write_into_buffer(&src_buf);
// // We just verified above that src_buf fits into our internal buffer.
// assert_eq!(copied, src_buf.len());
// restore(src_buf)
// } else {
// let (src_buf, res) = self
// .write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
// .await;
// if let Err(e) = res {
// return (src_buf, Err(e));
// }
// src_buf
// }
// } else {
// restore(src_buf)
// };
// (src_buf, Ok(()))
// }
/// Write a blob of data. Returns the offset that it was written to,
/// which can be used to retrieve the data later.
pub async fn write_blob<Buf: IoBuf + Send>(

View File

@@ -7,8 +7,7 @@ use crate::context::RequestContext;
use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
#[cfg(test)]
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::VirtualFile;
use bytes::Bytes;
use crate::virtual_file::{IoBuffer, VirtualFile};
use std::ops::Deref;
/// This is implemented by anything that can read 8 kB (PAGE_SZ)
@@ -249,17 +248,17 @@ pub trait BlockWriter {
/// 'buf' must be of size PAGE_SZ. Returns the block number the page was
/// written to.
///
fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error>;
fn write_blk(&mut self, buf: IoBuffer) -> Result<u32, std::io::Error>;
}
///
/// A simple in-memory buffer of blocks.
///
pub struct BlockBuf {
pub blocks: Vec<Bytes>,
pub blocks: Vec<IoBuffer>,
}
impl BlockWriter for BlockBuf {
fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error> {
fn write_blk(&mut self, buf: IoBuffer) -> Result<u32, std::io::Error> {
assert!(buf.len() == PAGE_SZ);
let blknum = self.blocks.len();
self.blocks.push(buf);

View File

@@ -20,7 +20,7 @@
//!
use async_stream::try_stream;
use byteorder::{ReadBytesExt, BE};
use bytes::{BufMut, Bytes, BytesMut};
use bytes::BufMut;
use either::Either;
use futures::{Stream, StreamExt};
use hex;
@@ -38,6 +38,7 @@ use crate::{
context::{DownloadBehavior, RequestContext},
task_mgr::TaskKind,
tenant::block_io::{BlockReader, BlockWriter},
virtual_file::{owned_buffers_io::write::Buffer, IoBuffer, IoBufferMut},
};
// The maximum size of a value stored in the B-tree. 5 bytes is enough currently.
@@ -793,12 +794,12 @@ impl<const L: usize> BuildNode<L> {
///
/// Serialize the node to on-disk format.
///
fn pack(&self) -> Bytes {
fn pack(&self) -> IoBuffer {
assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
assert!(self.num_children > 0);
let mut buf = BytesMut::new();
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
buf.put_u16(self.num_children);
buf.put_u8(self.level);
@@ -811,7 +812,7 @@ impl<const L: usize> BuildNode<L> {
assert!(buf.len() == self.size);
assert!(buf.len() <= PAGE_SZ);
buf.resize(PAGE_SZ, 0);
buf.extend_with(0, PAGE_SZ - buf.len());
buf.freeze()
}
@@ -841,7 +842,7 @@ pub(crate) mod tests {
#[derive(Clone, Default)]
pub(crate) struct TestDisk {
blocks: Vec<Bytes>,
blocks: Vec<IoBuffer>,
}
impl TestDisk {
fn new() -> Self {
@@ -859,7 +860,7 @@ pub(crate) mod tests {
}
}
impl BlockWriter for &mut TestDisk {
fn write_blk(&mut self, buf: Bytes) -> io::Result<u32> {
fn write_blk(&mut self, buf: IoBuffer) -> io::Result<u32> {
let blknum = self.blocks.len();
self.blocks.push(buf);
Ok(blknum as u32)

View File

@@ -43,6 +43,7 @@ use crate::tenant::vectored_blob_io::{
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::owned_buffers_io::write::Buffer;
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
use crate::TEMP_FILE_SUFFIX;
@@ -62,7 +63,6 @@ use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::str::FromStr;
@@ -420,7 +420,7 @@ impl DeltaLayerWriterInner {
let path =
DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
let file = Arc::new(VirtualFile::create(&path, ctx).await?);
let file = Arc::new(VirtualFile::create_v2(&path, ctx).await?);
// Start at PAGE_SZ, make room for the header block
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, ctx)?;
@@ -533,15 +533,15 @@ impl DeltaLayerWriterInner {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
let mut file = self.blob_writer.into_inner(ctx).await?;
let file = self.blob_writer.into_inner(ctx).await?;
// Write out the index
let (index_root_blk, block_buf) = self.tree.finish()?;
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
.await?;
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
for buf in block_buf.blocks {
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
res?;
offset += PAGE_SZ as u64;
}
assert!(self.lsn_range.start < self.lsn_range.end);
// Fill in the summary on blk 0
@@ -556,11 +556,11 @@ impl DeltaLayerWriterInner {
index_root_blk,
};
let mut buf = Vec::with_capacity(PAGE_SZ);
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&summary, &mut buf)?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
buf.extend_with(0, buf.capacity() - buf.len());
let (_buf, res) = file.write_all_at(buf.freeze().slice_len(), 0, ctx).await;
res?;
let metadata = file
@@ -756,7 +756,7 @@ impl DeltaLayer {
where
F: Fn(Summary) -> Summary,
{
let mut file = VirtualFile::open_with_options(
let file = VirtualFile::open_with_options_v2(
path,
virtual_file::OpenOptions::new().read(true).write(true),
ctx,
@@ -773,11 +773,11 @@ impl DeltaLayer {
let new_summary = rewrite(actual_summary);
let mut buf = Vec::with_capacity(PAGE_SZ);
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
// TODO: could use smallvec here, but it's a pain with Slice<T>
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
buf.extend_with(0, buf.capacity() - buf.len());
let (_buf, res) = file.write_all_at(buf.freeze().slice_len(), 0, ctx).await;
res?;
Ok(())
}

View File

@@ -40,6 +40,7 @@ use crate::tenant::vectored_blob_io::{
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::owned_buffers_io::write::Buffer;
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
@@ -58,7 +59,6 @@ use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::str::FromStr;
@@ -349,7 +349,7 @@ impl ImageLayer {
where
F: Fn(Summary) -> Summary,
{
let mut file = VirtualFile::open_with_options(
let file = VirtualFile::open_with_options_v2(
path,
virtual_file::OpenOptions::new().read(true).write(true),
ctx,
@@ -366,11 +366,11 @@ impl ImageLayer {
let new_summary = rewrite(actual_summary);
let mut buf = Vec::with_capacity(PAGE_SZ);
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
buf.extend_with(0, buf.capacity() - buf.len());
let (_buf, res) = file.write_all_at(buf.freeze().slice_len(), 0, ctx).await;
res?;
Ok(())
}
@@ -759,7 +759,7 @@ impl ImageLayerWriterInner {
trace!("creating image layer {}", path);
let file = {
Arc::new(
VirtualFile::open_with_options(
VirtualFile::open_with_options_v2(
&path,
virtual_file::OpenOptions::new()
.write(true)
@@ -877,15 +877,16 @@ impl ImageLayerWriterInner {
crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
let mut file = self.blob_writer.into_inner(ctx).await?;
let file = self.blob_writer.into_inner(ctx).await?;
// Write out the index
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
.await?;
// TODO(yuchen): should we just replace BlockBuf::blocks with one big buffer?
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
let (index_root_blk, block_buf) = self.tree.finish()?;
for buf in block_buf.blocks {
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
res?;
offset += PAGE_SZ as u64;
}
let final_key_range = if let Some(end_key) = end_key {
@@ -906,11 +907,11 @@ impl ImageLayerWriterInner {
index_root_blk,
};
let mut buf = Vec::with_capacity(PAGE_SZ);
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&summary, &mut buf)?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
buf.extend_with(0, buf.capacity() - buf.len());
let (_buf, res) = file.write_all_at(buf.freeze().slice_len(), 0, ctx).await;
res?;
let metadata = file

View File

@@ -280,6 +280,17 @@ unsafe impl<A: Alignment> tokio_epoll_uring::IoBufMut for AlignedBufferMut<A> {
}
}
impl<A: Alignment> std::io::Write for AlignedBufferMut<A> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
#[cfg(test)]
mod tests {