From 5e00c441692ff4746a6604583c04771736f0ee6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Wed, 6 Sep 2023 18:17:12 +0200 Subject: [PATCH] Add WriteBlobWriter buffering and make VirtualFile::{write,write_all} async (#5203) ## Problem We want to convert the `VirtualFile` APIs to async fn so that we can adopt one of the async I/O solutions. ## Summary of changes This PR is a follow-up of #5189, #5190, and #5195, and does the following: * Move the used `Write` trait functions of `VirtualFile` into inherent functions * Add optional buffering to `WriteBlobWriter`. The buffer is discarded on drop, similarly to how tokio's `BufWriter` does it: drop is neither async nor does it support errors. * Remove the generics by `Write` impl of `WriteBlobWriter`, alwaays using `VirtualFile` * Rename `WriteBlobWriter` to `BlobWriter` * Make various functions in the write path async, like `VirtualFile::{write,write_all}`. Part of #4743. --- pageserver/src/tenant/blob_io.rs | 250 ++++++++++++++++-- pageserver/src/tenant/block_io.rs | 4 + .../src/tenant/storage_layer/delta_layer.rs | 26 +- .../src/tenant/storage_layer/image_layer.rs | 11 +- pageserver/src/virtual_file.rs | 96 ++++--- 5 files changed, 303 insertions(+), 84 deletions(-) diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index ba3542d869..71db8d2978 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -13,6 +13,7 @@ //! use crate::page_cache::PAGE_SZ; use crate::tenant::block_io::BlockCursor; +use crate::virtual_file::VirtualFile; use std::cmp::min; use std::io::{Error, ErrorKind}; @@ -83,20 +84,24 @@ impl<'a> BlockCursor<'a> { } } +/// A wrapper of `VirtualFile` that allows users to write blobs. /// -/// An implementation of BlobWriter to write blobs to anything that -/// implements std::io::Write. -/// -pub struct WriteBlobWriter { - inner: W, +/// If a `BlobWriter` is dropped, the internal buffer will be +/// discarded. You need to call [`flush_buffer`](Self::flush_buffer) +/// manually before dropping. +pub struct BlobWriter { + inner: VirtualFile, offset: u64, + /// A buffer to save on write calls, only used if BUFFERED=true + buf: Vec, } -impl WriteBlobWriter { - pub fn new(inner: W, start_offset: u64) -> Self { - WriteBlobWriter { +impl BlobWriter { + pub fn new(inner: VirtualFile, start_offset: u64) -> Self { + Self { inner, offset: start_offset, + buf: Vec::with_capacity(Self::CAPACITY), } } @@ -104,20 +109,70 @@ impl WriteBlobWriter { self.offset } - /// Access the underlying Write object. - /// - /// NOTE: WriteBlobWriter keeps track of the current write offset. If - /// you write something directly to the inner Write object, it makes the - /// internally tracked 'offset' to go out of sync. So don't do that. - pub fn into_inner(self) -> W { - self.inner - } -} + const CAPACITY: usize = if BUFFERED { PAGE_SZ } else { 0 }; + + #[inline(always)] + /// Writes the given buffer directly to the underlying `VirtualFile`. + /// You need to make sure that the internal buffer is empty, otherwise + /// data will be written in wrong order. + async fn write_all_unbuffered(&mut self, src_buf: &[u8]) -> Result<(), Error> { + self.inner.write_all(src_buf).await?; + self.offset += src_buf.len() as u64; + Ok(()) + } + + #[inline(always)] + /// Flushes the internal buffer to the underlying `VirtualFile`. 
+ pub async fn flush_buffer(&mut self) -> Result<(), Error> { + self.inner.write_all(&self.buf).await?; + self.buf.clear(); + Ok(()) + } + + #[inline(always)] + /// Writes as much of `src_buf` into the internal buffer as it fits + fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize { + let remaining = Self::CAPACITY - self.buf.len(); + let to_copy = src_buf.len().min(remaining); + self.buf.extend_from_slice(&src_buf[..to_copy]); + self.offset += to_copy as u64; + to_copy + } + + /// Internal, possibly buffered, write function + async fn write_all(&mut self, mut src_buf: &[u8]) -> Result<(), Error> { + if !BUFFERED { + assert!(self.buf.is_empty()); + self.write_all_unbuffered(src_buf).await?; + return Ok(()); + } + let remaining = Self::CAPACITY - self.buf.len(); + // First try to copy as much as we can into the buffer + if remaining > 0 { + let copied = self.write_into_buffer(src_buf); + src_buf = &src_buf[copied..]; + } + // Then, if the buffer is full, flush it out + if self.buf.len() == Self::CAPACITY { + self.flush_buffer().await?; + } + // Finally, write the tail of src_buf: + // If it wholly fits into the buffer without + // completely filling it, then put it there. + // If not, write it out directly. + if !src_buf.is_empty() { + assert_eq!(self.buf.len(), 0); + if src_buf.len() < Self::CAPACITY { + let copied = self.write_into_buffer(src_buf); + // We just verified above that src_buf fits into our internal buffer. + assert_eq!(copied, src_buf.len()); + } else { + self.write_all_unbuffered(src_buf).await?; + } + } + Ok(()) + } -impl WriteBlobWriter -where - W: std::io::Write, -{ /// Write a blob of data. Returns the offset that it was written to, /// which can be used to retrieve the data later. pub async fn write_blob(&mut self, srcbuf: &[u8]) -> Result { @@ -126,8 +181,7 @@ where if srcbuf.len() < 128 { // Short blob. Write a 1-byte length header let len_buf = srcbuf.len() as u8; - self.inner.write_all(&[len_buf])?; - self.offset += 1; + self.write_all(&[len_buf]).await?; } else { // Write a 4-byte length header if srcbuf.len() > 0x7fff_ffff { @@ -138,11 +192,153 @@ where } let mut len_buf = ((srcbuf.len()) as u32).to_be_bytes(); len_buf[0] |= 0x80; - self.inner.write_all(&len_buf)?; - self.offset += 4; + self.write_all(&len_buf).await?; } - self.inner.write_all(srcbuf)?; - self.offset += srcbuf.len() as u64; + self.write_all(srcbuf).await?; Ok(offset) } } + +impl BlobWriter { + /// Access the underlying `VirtualFile`. + /// + /// This function flushes the internal buffer before giving access + /// to the underlying `VirtualFile`. + pub async fn into_inner(mut self) -> Result { + self.flush_buffer().await?; + Ok(self.inner) + } + + /// Access the underlying `VirtualFile`. + /// + /// Unlike [`into_inner`](Self::into_inner), this doesn't flush + /// the internal buffer before giving access. + pub fn into_inner_no_flush(self) -> VirtualFile { + self.inner + } +} + +impl BlobWriter { + /// Access the underlying `VirtualFile`. 
+ pub fn into_inner(self) -> VirtualFile { + self.inner + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::tenant::block_io::BlockReaderRef; + use rand::{Rng, SeedableRng}; + + async fn round_trip_test(blobs: &[Vec]) -> Result<(), Error> { + let temp_dir = tempfile::tempdir()?; + let path = temp_dir.path().join("file"); + + // Write part (in block to drop the file) + let mut offsets = Vec::new(); + { + let file = VirtualFile::create(&path)?; + let mut wtr = BlobWriter::::new(file, 0); + for blob in blobs.iter() { + let offs = wtr.write_blob(blob).await?; + offsets.push(offs); + } + // Write out one page worth of zeros so that we can + // read again with read_blk + let offs = wtr.write_blob(&vec![0; PAGE_SZ]).await?; + println!("Writing final blob at offs={offs}"); + wtr.flush_buffer().await?; + } + + let file = VirtualFile::open(&path)?; + let rdr = BlockReaderRef::VirtualFile(&file); + let rdr = BlockCursor::new(rdr); + for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() { + let blob_read = rdr.read_blob(*offset).await?; + assert_eq!( + blob, &blob_read, + "mismatch for idx={idx} at offset={offset}" + ); + } + Ok(()) + } + + fn random_array(len: usize) -> Vec { + let mut rng = rand::thread_rng(); + (0..len).map(|_| rng.gen()).collect::<_>() + } + + #[tokio::test] + async fn test_one() -> Result<(), Error> { + let blobs = &[vec![12, 21, 22]]; + round_trip_test::(blobs).await?; + round_trip_test::(blobs).await?; + Ok(()) + } + + #[tokio::test] + async fn test_hello_simple() -> Result<(), Error> { + let blobs = &[ + vec![0, 1, 2, 3], + b"Hello, World!".to_vec(), + Vec::new(), + b"foobar".to_vec(), + ]; + round_trip_test::(blobs).await?; + round_trip_test::(blobs).await?; + Ok(()) + } + + #[tokio::test] + async fn test_really_big_array() -> Result<(), Error> { + let blobs = &[ + b"test".to_vec(), + random_array(10 * PAGE_SZ), + b"foobar".to_vec(), + ]; + round_trip_test::(blobs).await?; + round_trip_test::(blobs).await?; + Ok(()) + } + + #[tokio::test] + async fn test_arrays_inc() -> Result<(), Error> { + let blobs = (0..PAGE_SZ / 8) + .map(|v| random_array(v * 16)) + .collect::>(); + round_trip_test::(&blobs).await?; + round_trip_test::(&blobs).await?; + Ok(()) + } + + #[tokio::test] + async fn test_arrays_random_size() -> Result<(), Error> { + let mut rng = rand::rngs::StdRng::seed_from_u64(42); + let blobs = (0..1024) + .map(|_| { + let mut sz: u16 = rng.gen(); + // Make 50% of the arrays small + if rng.gen() { + sz |= 63; + } + random_array(sz.into()) + }) + .collect::>(); + round_trip_test::(&blobs).await?; + round_trip_test::(&blobs).await?; + Ok(()) + } + + #[tokio::test] + async fn test_arrays_page_boundary() -> Result<(), Error> { + let blobs = &[ + random_array(PAGE_SZ - 4), + random_array(PAGE_SZ - 4), + random_array(PAGE_SZ - 4), + ]; + round_trip_test::(blobs).await?; + round_trip_test::(blobs).await?; + Ok(()) + } +} diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 645ec81036..abb636579d 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -76,6 +76,8 @@ pub(crate) enum BlockReaderRef<'a> { Adapter(Adapter<&'a DeltaLayerInner>), #[cfg(test)] TestDisk(&'a super::disk_btree::tests::TestDisk), + #[cfg(test)] + VirtualFile(&'a VirtualFile), } impl<'a> BlockReaderRef<'a> { @@ -88,6 +90,8 @@ impl<'a> BlockReaderRef<'a> { Adapter(r) => r.read_blk(blknum).await, #[cfg(test)] TestDisk(r) => r.read_blk(blknum), + #[cfg(test)] + VirtualFile(r) => r.read_blk(blknum).await, } } } 
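For orientation, here is a minimal usage sketch of the buffered `BlobWriter` introduced in the `blob_io.rs` changes above, modeled on the round-trip test added in this patch. The helper name and path handling are illustrative only; the points to note are that `BUFFERED = true` enables the internal `PAGE_SZ`-sized buffer, and that the buffer is discarded on drop, so `flush_buffer` (or the flushing `into_inner`) has to be called before the writer goes away.

```rust
// Hypothetical usage sketch (not part of the patch), written against the
// pageserver crate internals that the hunks above introduce.
use crate::tenant::blob_io::BlobWriter;
use crate::virtual_file::VirtualFile;

async fn write_two_blobs(path: &std::path::Path) -> Result<(u64, u64), std::io::Error> {
    let file = VirtualFile::create(path)?;
    // BUFFERED = true: small writes are coalesced into a PAGE_SZ buffer
    // before hitting the VirtualFile.
    let mut writer = BlobWriter::<true>::new(file, 0);

    // write_blob returns the offset each blob was written at; callers keep
    // these so the blobs can be read back later via BlockCursor::read_blob.
    let first = writer.write_blob(b"first blob").await?;
    let second = writer.write_blob(b"second blob").await?;

    // The buffer is discarded, not flushed, on drop, so flush explicitly.
    // Alternatively, into_inner().await? flushes and returns the VirtualFile.
    writer.flush_buffer().await?;
    Ok((first, second))
}
```

With `BUFFERED = false` the same calls compile, the internal buffer stays empty, and `into_inner` is the synchronous, non-flushing variant.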
diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index e95c422d29..b6fbf98962 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -31,7 +31,7 @@ use crate::config::PageServerConf; use crate::context::RequestContext; use crate::page_cache::PAGE_SZ; use crate::repository::{Key, Value, KEY_SIZE}; -use crate::tenant::blob_io::WriteBlobWriter; +use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::storage_layer::{ @@ -46,7 +46,6 @@ use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; use std::fs::{self, File}; use std::io::SeekFrom; -use std::io::{BufWriter, Write}; use std::ops::Range; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; @@ -583,7 +582,7 @@ struct DeltaLayerWriterInner { tree: DiskBtreeBuilder, - blob_writer: WriteBlobWriter>, + blob_writer: BlobWriter, } impl DeltaLayerWriterInner { @@ -608,8 +607,7 @@ impl DeltaLayerWriterInner { let mut file = VirtualFile::create(&path)?; // make room for the header block file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; - let buf_writer = BufWriter::new(file); - let blob_writer = WriteBlobWriter::new(buf_writer, PAGE_SZ as u64); + let blob_writer = BlobWriter::new(file, PAGE_SZ as u64); // Initialize the b-tree index builder let block_buf = BlockBuf::new(); @@ -667,15 +665,14 @@ impl DeltaLayerWriterInner { let index_start_blk = ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; - let buf_writer = self.blob_writer.into_inner(); - let mut file = buf_writer.into_inner()?; + let mut file = self.blob_writer.into_inner().await?; // Write out the index let (index_root_blk, block_buf) = self.tree.finish()?; file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64)) .await?; for buf in block_buf.blocks { - file.write_all(buf.as_ref())?; + file.write_all(buf.as_ref()).await?; } assert!(self.lsn_range.start < self.lsn_range.end); // Fill in the summary on blk 0 @@ -700,7 +697,7 @@ impl DeltaLayerWriterInner { ); } file.seek(SeekFrom::Start(0)).await?; - file.write_all(&buf)?; + file.write_all(&buf).await?; let metadata = file .metadata() @@ -840,13 +837,10 @@ impl DeltaLayerWriter { impl Drop for DeltaLayerWriter { fn drop(&mut self) { if let Some(inner) = self.inner.take() { - match inner.blob_writer.into_inner().into_inner() { - Ok(vfile) => vfile.remove(), - Err(err) => warn!( - "error while flushing buffer of image layer temporary file: {}", - err - ), - } + // We want to remove the virtual file here, so it's fine to not + // having completely flushed unwritten data. 
+ let vfile = inner.blob_writer.into_inner_no_flush(); + vfile.remove(); } } } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 62bfeb8eec..8f7fb8175c 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -27,7 +27,7 @@ use crate::config::PageServerConf; use crate::context::RequestContext; use crate::page_cache::PAGE_SZ; use crate::repository::{Key, KEY_SIZE}; -use crate::tenant::blob_io::WriteBlobWriter; +use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::storage_layer::{ @@ -43,7 +43,6 @@ use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; use std::fs::{self, File}; use std::io::SeekFrom; -use std::io::Write; use std::ops::Range; use std::os::unix::prelude::FileExt; use std::path::{Path, PathBuf}; @@ -511,7 +510,7 @@ struct ImageLayerWriterInner { key_range: Range, lsn: Lsn, - blob_writer: WriteBlobWriter, + blob_writer: BlobWriter, tree: DiskBtreeBuilder, } @@ -544,7 +543,7 @@ impl ImageLayerWriterInner { )?; // make room for the header block file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; - let blob_writer = WriteBlobWriter::new(file, PAGE_SZ as u64); + let blob_writer = BlobWriter::new(file, PAGE_SZ as u64); // Initialize the b-tree index builder let block_buf = BlockBuf::new(); @@ -594,7 +593,7 @@ impl ImageLayerWriterInner { .await?; let (index_root_blk, block_buf) = self.tree.finish()?; for buf in block_buf.blocks { - file.write_all(buf.as_ref())?; + file.write_all(buf.as_ref()).await?; } // Fill in the summary on blk 0 @@ -619,7 +618,7 @@ impl ImageLayerWriterInner { ); } file.seek(SeekFrom::Start(0)).await?; - file.write_all(&buf)?; + file.write_all(&buf).await?; let metadata = file .metadata() diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 91430ef6eb..c4241c4270 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -14,7 +14,7 @@ use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME}; use crate::tenant::TENANTS_SEGMENT_NAME; use once_cell::sync::OnceCell; use std::fs::{self, File, OpenOptions}; -use std::io::{Error, ErrorKind, Seek, SeekFrom, Write}; +use std::io::{Error, ErrorKind, Seek, SeekFrom}; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -301,6 +301,7 @@ impl VirtualFile { ) .map_err(CrashsafeOverwriteError::CreateTempfile)?; file.write_all(content) + .await .map_err(CrashsafeOverwriteError::WriteContents)?; file.sync_all() .map_err(CrashsafeOverwriteError::SyncTempfile)?; @@ -433,22 +434,6 @@ impl VirtualFile { Ok(self.pos) } - #[cfg(test)] - async fn read_to_end(&mut self, buf: &mut Vec) -> Result<(), Error> { - loop { - let mut tmp = [0; 128]; - match self.read_at(&mut tmp, self.pos).await { - Ok(0) => return Ok(()), - Ok(n) => { - self.pos += n as u64; - buf.extend_from_slice(&tmp[..n]); - } - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return Err(e), - } - } - } - // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> { while !buf.is_empty() { @@ -473,7 +458,7 @@ impl VirtualFile { // Copied from 
https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235 pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> { while !buf.is_empty() { - match self.write_at(buf, offset) { + match self.write_at(buf, offset).await { Ok(0) => { return Err(Error::new( std::io::ErrorKind::WriteZero, @@ -491,6 +476,32 @@ impl VirtualFile { Ok(()) } + pub async fn write_all(&mut self, mut buf: &[u8]) -> Result<(), Error> { + while !buf.is_empty() { + match self.write(buf).await { + Ok(0) => { + return Err(Error::new( + std::io::ErrorKind::WriteZero, + "failed to write whole buffer", + )); + } + Ok(n) => { + buf = &buf[n..]; + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + Ok(()) + } + + async fn write(&mut self, buf: &[u8]) -> Result { + let pos = self.pos; + let n = self.write_at(buf, pos).await?; + self.pos += n as u64; + Ok(n) + } + pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { let result = self.with_file("read", |file| file.read_at(buf, offset))?; if let Ok(size) = result { @@ -501,7 +512,7 @@ impl VirtualFile { result } - pub fn write_at(&self, buf: &[u8], offset: u64) -> Result { + async fn write_at(&self, buf: &[u8], offset: u64) -> Result { let result = self.with_file("write", |file| file.write_at(buf, offset))?; if let Ok(size) = result { STORAGE_IO_SIZE @@ -512,6 +523,35 @@ impl VirtualFile { } } +#[cfg(test)] +impl VirtualFile { + pub(crate) async fn read_blk( + &self, + blknum: u32, + ) -> Result, std::io::Error> { + use crate::page_cache::PAGE_SZ; + let mut buf = [0; PAGE_SZ]; + self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64)) + .await?; + Ok(std::sync::Arc::new(buf).into()) + } + + async fn read_to_end(&mut self, buf: &mut Vec) -> Result<(), Error> { + loop { + let mut tmp = [0; 128]; + match self.read_at(&mut tmp, self.pos).await { + Ok(0) => return Ok(()), + Ok(n) => { + self.pos += n as u64; + buf.extend_from_slice(&tmp[..n]); + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + } +} + impl Drop for VirtualFile { /// If a VirtualFile is dropped, close the underlying file if it was open. fn drop(&mut self) { @@ -532,21 +572,6 @@ impl Drop for VirtualFile { } } -impl Write for VirtualFile { - fn write(&mut self, buf: &[u8]) -> Result { - let pos = self.pos; - let n = self.write_at(buf, pos)?; - self.pos += n as u64; - Ok(n) - } - - fn flush(&mut self) -> Result<(), std::io::Error> { - // flush is no-op for File (at least on unix), so we don't need to do - // anything here either. - Ok(()) - } -} - impl OpenFiles { fn new(num_slots: usize) -> OpenFiles { let mut slots = Box::new(Vec::with_capacity(num_slots)); @@ -600,6 +625,7 @@ mod tests { use rand::seq::SliceRandom; use rand::thread_rng; use rand::Rng; + use std::io::Write; use std::sync::Arc; enum MaybeVirtualFile { @@ -628,7 +654,7 @@ mod tests { } async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> { match self { - MaybeVirtualFile::VirtualFile(file) => file.write_all(buf), + MaybeVirtualFile::VirtualFile(file) => file.write_all(buf).await, MaybeVirtualFile::File(file) => file.write_all(buf), } }
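As a closing illustration, here is a small sketch (a hypothetical helper, not part of the patch) of what call sites look like now that `VirtualFile::{write_all, write_all_at}` are async: every write has to be awaited, matching the `seek` calls that were already async.

```rust
// Hypothetical helper, assuming the async VirtualFile API from this patch.
use crate::virtual_file::VirtualFile;
use std::io::SeekFrom;

async fn rewrite_block_zero(file: &mut VirtualFile, buf: &[u8]) -> Result<(), std::io::Error> {
    // seek() was already async before this change; write_all() now is too,
    // and it keeps the usual retry loop for short writes and
    // ErrorKind::Interrupted.
    file.seek(SeekFrom::Start(0)).await?;
    file.write_all(buf).await?;
    Ok(())
}
```

Since the `std::io::Write` impl for `VirtualFile` is removed, only plain `std::fs::File` users (such as the test-only `MaybeVirtualFile::File` arm above) keep the synchronous `write_all` calls.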