Add WriteBlobWriter buffering and make VirtualFile::{write,write_all} async (#5203)

## Problem

We want to convert the `VirtualFile` APIs to async fn so that we can
adopt one of the async I/O solutions.

## Summary of changes

This PR is a follow-up of #5189, #5190, and #5195, and does the
following:

* Move the used `Write` trait functions of `VirtualFile` into inherent
functions
* Add optional buffering to `WriteBlobWriter`. The buffer is discarded
on drop, similarly to how tokio's `BufWriter` does it: drop is neither
async nor does it support errors.
* Remove the generics by `Write` impl of `WriteBlobWriter`, alwaays
using `VirtualFile`
* Rename `WriteBlobWriter` to `BlobWriter`
* Make various functions in the write path async, like
`VirtualFile::{write,write_all}`.

Part of #4743.
This commit is contained in:
Arpad Müller
2023-09-06 18:17:12 +02:00
committed by GitHub
parent d5f1858f78
commit 5e00c44169
5 changed files with 303 additions and 84 deletions

View File

@@ -13,6 +13,7 @@
//!
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::VirtualFile;
use std::cmp::min;
use std::io::{Error, ErrorKind};
@@ -83,20 +84,24 @@ impl<'a> BlockCursor<'a> {
}
}
/// A wrapper of `VirtualFile` that allows users to write blobs.
///
/// An implementation of BlobWriter to write blobs to anything that
/// implements std::io::Write.
///
pub struct WriteBlobWriter<W> {
inner: W,
/// If a `BlobWriter` is dropped, the internal buffer will be
/// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
/// manually before dropping.
pub struct BlobWriter<const BUFFERED: bool> {
inner: VirtualFile,
offset: u64,
/// A buffer to save on write calls, only used if BUFFERED=true
buf: Vec<u8>,
}
impl<W> WriteBlobWriter<W> {
pub fn new(inner: W, start_offset: u64) -> Self {
WriteBlobWriter {
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
Self {
inner,
offset: start_offset,
buf: Vec::with_capacity(Self::CAPACITY),
}
}
@@ -104,20 +109,70 @@ impl<W> WriteBlobWriter<W> {
self.offset
}
/// Access the underlying Write object.
///
/// NOTE: WriteBlobWriter keeps track of the current write offset. If
/// you write something directly to the inner Write object, it makes the
/// internally tracked 'offset' to go out of sync. So don't do that.
pub fn into_inner(self) -> W {
self.inner
}
}
const CAPACITY: usize = if BUFFERED { PAGE_SZ } else { 0 };
#[inline(always)]
/// Writes the given buffer directly to the underlying `VirtualFile`.
/// You need to make sure that the internal buffer is empty, otherwise
/// data will be written in wrong order.
async fn write_all_unbuffered(&mut self, src_buf: &[u8]) -> Result<(), Error> {
self.inner.write_all(src_buf).await?;
self.offset += src_buf.len() as u64;
Ok(())
}
#[inline(always)]
/// Flushes the internal buffer to the underlying `VirtualFile`.
pub async fn flush_buffer(&mut self) -> Result<(), Error> {
self.inner.write_all(&self.buf).await?;
self.buf.clear();
Ok(())
}
#[inline(always)]
/// Writes as much of `src_buf` into the internal buffer as it fits
fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
let remaining = Self::CAPACITY - self.buf.len();
let to_copy = src_buf.len().min(remaining);
self.buf.extend_from_slice(&src_buf[..to_copy]);
self.offset += to_copy as u64;
to_copy
}
/// Internal, possibly buffered, write function
async fn write_all(&mut self, mut src_buf: &[u8]) -> Result<(), Error> {
if !BUFFERED {
assert!(self.buf.is_empty());
self.write_all_unbuffered(src_buf).await?;
return Ok(());
}
let remaining = Self::CAPACITY - self.buf.len();
// First try to copy as much as we can into the buffer
if remaining > 0 {
let copied = self.write_into_buffer(src_buf);
src_buf = &src_buf[copied..];
}
// Then, if the buffer is full, flush it out
if self.buf.len() == Self::CAPACITY {
self.flush_buffer().await?;
}
// Finally, write the tail of src_buf:
// If it wholly fits into the buffer without
// completely filling it, then put it there.
// If not, write it out directly.
if !src_buf.is_empty() {
assert_eq!(self.buf.len(), 0);
if src_buf.len() < Self::CAPACITY {
let copied = self.write_into_buffer(src_buf);
// We just verified above that src_buf fits into our internal buffer.
assert_eq!(copied, src_buf.len());
} else {
self.write_all_unbuffered(src_buf).await?;
}
}
Ok(())
}
impl<W> WriteBlobWriter<W>
where
W: std::io::Write,
{
/// Write a blob of data. Returns the offset that it was written to,
/// which can be used to retrieve the data later.
pub async fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
@@ -126,8 +181,7 @@ where
if srcbuf.len() < 128 {
// Short blob. Write a 1-byte length header
let len_buf = srcbuf.len() as u8;
self.inner.write_all(&[len_buf])?;
self.offset += 1;
self.write_all(&[len_buf]).await?;
} else {
// Write a 4-byte length header
if srcbuf.len() > 0x7fff_ffff {
@@ -138,11 +192,153 @@ where
}
let mut len_buf = ((srcbuf.len()) as u32).to_be_bytes();
len_buf[0] |= 0x80;
self.inner.write_all(&len_buf)?;
self.offset += 4;
self.write_all(&len_buf).await?;
}
self.inner.write_all(srcbuf)?;
self.offset += srcbuf.len() as u64;
self.write_all(srcbuf).await?;
Ok(offset)
}
}
impl BlobWriter<true> {
/// Access the underlying `VirtualFile`.
///
/// This function flushes the internal buffer before giving access
/// to the underlying `VirtualFile`.
pub async fn into_inner(mut self) -> Result<VirtualFile, Error> {
self.flush_buffer().await?;
Ok(self.inner)
}
/// Access the underlying `VirtualFile`.
///
/// Unlike [`into_inner`](Self::into_inner), this doesn't flush
/// the internal buffer before giving access.
pub fn into_inner_no_flush(self) -> VirtualFile {
self.inner
}
}
impl BlobWriter<false> {
/// Access the underlying `VirtualFile`.
pub fn into_inner(self) -> VirtualFile {
self.inner
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tenant::block_io::BlockReaderRef;
use rand::{Rng, SeedableRng};
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
let temp_dir = tempfile::tempdir()?;
let path = temp_dir.path().join("file");
// Write part (in block to drop the file)
let mut offsets = Vec::new();
{
let file = VirtualFile::create(&path)?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
for blob in blobs.iter() {
let offs = wtr.write_blob(blob).await?;
offsets.push(offs);
}
// Write out one page worth of zeros so that we can
// read again with read_blk
let offs = wtr.write_blob(&vec![0; PAGE_SZ]).await?;
println!("Writing final blob at offs={offs}");
wtr.flush_buffer().await?;
}
let file = VirtualFile::open(&path)?;
let rdr = BlockReaderRef::VirtualFile(&file);
let rdr = BlockCursor::new(rdr);
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
let blob_read = rdr.read_blob(*offset).await?;
assert_eq!(
blob, &blob_read,
"mismatch for idx={idx} at offset={offset}"
);
}
Ok(())
}
fn random_array(len: usize) -> Vec<u8> {
let mut rng = rand::thread_rng();
(0..len).map(|_| rng.gen()).collect::<_>()
}
#[tokio::test]
async fn test_one() -> Result<(), Error> {
let blobs = &[vec![12, 21, 22]];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
Ok(())
}
#[tokio::test]
async fn test_hello_simple() -> Result<(), Error> {
let blobs = &[
vec![0, 1, 2, 3],
b"Hello, World!".to_vec(),
Vec::new(),
b"foobar".to_vec(),
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
Ok(())
}
#[tokio::test]
async fn test_really_big_array() -> Result<(), Error> {
let blobs = &[
b"test".to_vec(),
random_array(10 * PAGE_SZ),
b"foobar".to_vec(),
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
Ok(())
}
#[tokio::test]
async fn test_arrays_inc() -> Result<(), Error> {
let blobs = (0..PAGE_SZ / 8)
.map(|v| random_array(v * 16))
.collect::<Vec<_>>();
round_trip_test::<false>(&blobs).await?;
round_trip_test::<true>(&blobs).await?;
Ok(())
}
#[tokio::test]
async fn test_arrays_random_size() -> Result<(), Error> {
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
let blobs = (0..1024)
.map(|_| {
let mut sz: u16 = rng.gen();
// Make 50% of the arrays small
if rng.gen() {
sz |= 63;
}
random_array(sz.into())
})
.collect::<Vec<_>>();
round_trip_test::<false>(&blobs).await?;
round_trip_test::<true>(&blobs).await?;
Ok(())
}
#[tokio::test]
async fn test_arrays_page_boundary() -> Result<(), Error> {
let blobs = &[
random_array(PAGE_SZ - 4),
random_array(PAGE_SZ - 4),
random_array(PAGE_SZ - 4),
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
Ok(())
}
}

View File

@@ -76,6 +76,8 @@ pub(crate) enum BlockReaderRef<'a> {
Adapter(Adapter<&'a DeltaLayerInner>),
#[cfg(test)]
TestDisk(&'a super::disk_btree::tests::TestDisk),
#[cfg(test)]
VirtualFile(&'a VirtualFile),
}
impl<'a> BlockReaderRef<'a> {
@@ -88,6 +90,8 @@ impl<'a> BlockReaderRef<'a> {
Adapter(r) => r.read_blk(blknum).await,
#[cfg(test)]
TestDisk(r) => r.read_blk(blknum),
#[cfg(test)]
VirtualFile(r) => r.read_blk(blknum).await,
}
}
}

View File

@@ -31,7 +31,7 @@ use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::WriteBlobWriter;
use crate::tenant::blob_io::BlobWriter;
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{
@@ -46,7 +46,6 @@ use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::{self, File};
use std::io::SeekFrom;
use std::io::{BufWriter, Write};
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
@@ -583,7 +582,7 @@ struct DeltaLayerWriterInner {
tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
blob_writer: WriteBlobWriter<BufWriter<VirtualFile>>,
blob_writer: BlobWriter<true>,
}
impl DeltaLayerWriterInner {
@@ -608,8 +607,7 @@ impl DeltaLayerWriterInner {
let mut file = VirtualFile::create(&path)?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let buf_writer = BufWriter::new(file);
let blob_writer = WriteBlobWriter::new(buf_writer, PAGE_SZ as u64);
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -667,15 +665,14 @@ impl DeltaLayerWriterInner {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
let buf_writer = self.blob_writer.into_inner();
let mut file = buf_writer.into_inner()?;
let mut file = self.blob_writer.into_inner().await?;
// Write out the index
let (index_root_blk, block_buf) = self.tree.finish()?;
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
.await?;
for buf in block_buf.blocks {
file.write_all(buf.as_ref())?;
file.write_all(buf.as_ref()).await?;
}
assert!(self.lsn_range.start < self.lsn_range.end);
// Fill in the summary on blk 0
@@ -700,7 +697,7 @@ impl DeltaLayerWriterInner {
);
}
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&buf)?;
file.write_all(&buf).await?;
let metadata = file
.metadata()
@@ -840,13 +837,10 @@ impl DeltaLayerWriter {
impl Drop for DeltaLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
match inner.blob_writer.into_inner().into_inner() {
Ok(vfile) => vfile.remove(),
Err(err) => warn!(
"error while flushing buffer of image layer temporary file: {}",
err
),
}
// We want to remove the virtual file here, so it's fine to not
// having completely flushed unwritten data.
let vfile = inner.blob_writer.into_inner_no_flush();
vfile.remove();
}
}
}

View File

@@ -27,7 +27,7 @@ use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, KEY_SIZE};
use crate::tenant::blob_io::WriteBlobWriter;
use crate::tenant::blob_io::BlobWriter;
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{
@@ -43,7 +43,6 @@ use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::{self, File};
use std::io::SeekFrom;
use std::io::Write;
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::path::{Path, PathBuf};
@@ -511,7 +510,7 @@ struct ImageLayerWriterInner {
key_range: Range<Key>,
lsn: Lsn,
blob_writer: WriteBlobWriter<VirtualFile>,
blob_writer: BlobWriter<false>,
tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
}
@@ -544,7 +543,7 @@ impl ImageLayerWriterInner {
)?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = WriteBlobWriter::new(file, PAGE_SZ as u64);
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -594,7 +593,7 @@ impl ImageLayerWriterInner {
.await?;
let (index_root_blk, block_buf) = self.tree.finish()?;
for buf in block_buf.blocks {
file.write_all(buf.as_ref())?;
file.write_all(buf.as_ref()).await?;
}
// Fill in the summary on blk 0
@@ -619,7 +618,7 @@ impl ImageLayerWriterInner {
);
}
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&buf)?;
file.write_all(&buf).await?;
let metadata = file
.metadata()

View File

@@ -14,7 +14,7 @@ use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME};
use crate::tenant::TENANTS_SEGMENT_NAME;
use once_cell::sync::OnceCell;
use std::fs::{self, File, OpenOptions};
use std::io::{Error, ErrorKind, Seek, SeekFrom, Write};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@@ -301,6 +301,7 @@ impl VirtualFile {
)
.map_err(CrashsafeOverwriteError::CreateTempfile)?;
file.write_all(content)
.await
.map_err(CrashsafeOverwriteError::WriteContents)?;
file.sync_all()
.map_err(CrashsafeOverwriteError::SyncTempfile)?;
@@ -433,22 +434,6 @@ impl VirtualFile {
Ok(self.pos)
}
#[cfg(test)]
async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<(), Error> {
loop {
let mut tmp = [0; 128];
match self.read_at(&mut tmp, self.pos).await {
Ok(0) => return Ok(()),
Ok(n) => {
self.pos += n as u64;
buf.extend_from_slice(&tmp[..n]);
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
}
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> {
while !buf.is_empty() {
@@ -473,7 +458,7 @@ impl VirtualFile {
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> {
while !buf.is_empty() {
match self.write_at(buf, offset) {
match self.write_at(buf, offset).await {
Ok(0) => {
return Err(Error::new(
std::io::ErrorKind::WriteZero,
@@ -491,6 +476,32 @@ impl VirtualFile {
Ok(())
}
pub async fn write_all(&mut self, mut buf: &[u8]) -> Result<(), Error> {
while !buf.is_empty() {
match self.write(buf).await {
Ok(0) => {
return Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
));
}
Ok(n) => {
buf = &buf[n..];
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
Ok(())
}
async fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
let pos = self.pos;
let n = self.write_at(buf, pos).await?;
self.pos += n as u64;
Ok(n)
}
pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
let result = self.with_file("read", |file| file.read_at(buf, offset))?;
if let Ok(size) = result {
@@ -501,7 +512,7 @@ impl VirtualFile {
result
}
pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = self.with_file("write", |file| file.write_at(buf, offset))?;
if let Ok(size) = result {
STORAGE_IO_SIZE
@@ -512,6 +523,35 @@ impl VirtualFile {
}
}
#[cfg(test)]
impl VirtualFile {
pub(crate) async fn read_blk(
&self,
blknum: u32,
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
use crate::page_cache::PAGE_SZ;
let mut buf = [0; PAGE_SZ];
self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64))
.await?;
Ok(std::sync::Arc::new(buf).into())
}
async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<(), Error> {
loop {
let mut tmp = [0; 128];
match self.read_at(&mut tmp, self.pos).await {
Ok(0) => return Ok(()),
Ok(n) => {
self.pos += n as u64;
buf.extend_from_slice(&tmp[..n]);
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
}
}
impl Drop for VirtualFile {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
@@ -532,21 +572,6 @@ impl Drop for VirtualFile {
}
}
impl Write for VirtualFile {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
let pos = self.pos;
let n = self.write_at(buf, pos)?;
self.pos += n as u64;
Ok(n)
}
fn flush(&mut self) -> Result<(), std::io::Error> {
// flush is no-op for File (at least on unix), so we don't need to do
// anything here either.
Ok(())
}
}
impl OpenFiles {
fn new(num_slots: usize) -> OpenFiles {
let mut slots = Box::new(Vec::with_capacity(num_slots));
@@ -600,6 +625,7 @@ mod tests {
use rand::seq::SliceRandom;
use rand::thread_rng;
use rand::Rng;
use std::io::Write;
use std::sync::Arc;
enum MaybeVirtualFile {
@@ -628,7 +654,7 @@ mod tests {
}
async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.write_all(buf),
MaybeVirtualFile::VirtualFile(file) => file.write_all(buf).await,
MaybeVirtualFile::File(file) => file.write_all(buf),
}
}