# Convert VirtualFile::{seek,metadata} to async (#5195)

## Problem

We want to convert the `VirtualFile` APIs to async fn so that we can
adopt one of the async I/O solutions.
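
To make the shape of the change concrete, here is a minimal sketch of the conversion. This is not the real implementation: the stub `VirtualFile`, the tokio runtime, and the `main` driver are stand-ins, and only the `SeekFrom::Start` arm is shown.

```rust
use std::io::{Error, SeekFrom};

struct VirtualFile {
    pos: u64,
}

impl VirtualFile {
    /// Before this commit: `pub fn seek(...)`, invoked as `file.seek(pos)?`.
    /// After: the same logic as an `async fn`, so every caller (and the
    /// callers' callers, transitively) becomes `file.seek(pos).await?`.
    pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
        match pos {
            SeekFrom::Start(offset) => self.pos = offset,
            // The real implementation also handles End and Current.
            _ => unimplemented!(),
        }
        Ok(self.pos)
    }
}

// Assumes tokio as the async runtime; the commit itself does not pick one.
#[tokio::main]
async fn main() -> Result<(), Error> {
    let mut file = VirtualFile { pos: 0 };
    assert_eq!(file.seek(SeekFrom::Start(1)).await?, 1);
    Ok(())
}
```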

## Summary of changes

Convert the following APIs of `VirtualFile` to async fn, together with all
of the APIs that call them:

* `VirtualFile::seek`
* `VirtualFile::metadata`
* Also, prepare for deletion of the write impl by serializing the summary
  into an in-memory buffer before writing it to disk, as suggested in
  https://github.com/neondatabase/neon/issues/4743#issuecomment-1700663864.
  This change also adds a warning for the case where the summary exceeds
  one block; previously, we'd have silently corrupted data in this
  (unlikely) case. See the sketch after this list.
* `WriteBlobWriter::write_blob`, in preparation for making
`VirtualFile::write_all` async.
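
As a minimal sketch of the buffered-summary pattern, assuming smallvec's `write` feature; the byte slice stands in for the real `Summary::ser_into` serialization:

```rust
// Assumes: smallvec = { version = "1", features = ["write"] } in Cargo.toml.
use smallvec::SmallVec;
use std::io::Write;

const PAGE_SZ: usize = 8192; // one block; stands in for page_cache::PAGE_SZ

fn main() -> std::io::Result<()> {
    // Serialize into an in-memory buffer first, instead of writing the
    // summary straight to the file at offset 0.
    let mut buf: SmallVec<[u8; PAGE_SZ]> = SmallVec::new();
    buf.write_all(&[0u8; 100])?; // stand-in for Summary::ser_into(&summary, &mut buf)?

    // A SmallVec spills to the heap once it outgrows its inline capacity of
    // one page, so `spilled()` doubles as an overflow check. Previously, an
    // oversized summary written directly to the file would have silently
    // clobbered the block after it.
    if buf.spilled() {
        eprintln!("summary used more than one page: {} bytes", buf.len());
    }

    // Only now would the caller seek back to offset 0 and write `buf` to disk.
    Ok(())
}
```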
Commit 4904613aaa (parent 77658a155b)
Author: Arpad Müller
Date:   2023-09-05 12:55:45 +02:00
Committed by: GitHub

6 changed files with 114 additions and 91 deletions

pageserver/src/tenant/blob_io.rs

@@ -83,15 +83,6 @@ impl<'a> BlockCursor<'a> {
     }
 }
 
-///
-/// Abstract trait for a data sink that you can write blobs to.
-///
-pub trait BlobWriter {
-    /// Write a blob of data. Returns the offset that it was written to,
-    /// which can be used to retrieve the data later.
-    fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error>;
-}
-
 ///
 /// An implementation of BlobWriter to write blobs to anything that
 /// implements std::io::Write.
@@ -123,11 +114,13 @@ impl<W> WriteBlobWriter<W> {
     }
 }
 
-impl<W> BlobWriter for WriteBlobWriter<W>
+impl<W> WriteBlobWriter<W>
 where
     W: std::io::Write,
 {
-    fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
+    /// Write a blob of data. Returns the offset that it was written to,
+    /// which can be used to retrieve the data later.
+    pub async fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
         let offset = self.offset;
 
         if srcbuf.len() < 128 {

pageserver/src/tenant/storage_layer/delta_layer.rs

@@ -31,7 +31,7 @@ use crate::config::PageServerConf;
 use crate::context::RequestContext;
 use crate::page_cache::PAGE_SZ;
 use crate::repository::{Key, Value, KEY_SIZE};
-use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
+use crate::tenant::blob_io::WriteBlobWriter;
 use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
 use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
 use crate::tenant::storage_layer::{
@@ -590,7 +590,7 @@ impl DeltaLayerWriterInner {
     ///
     /// Start building a new delta layer.
     ///
-    fn new(
+    async fn new(
         conf: &'static PageServerConf,
         timeline_id: TimelineId,
         tenant_id: TenantId,
@@ -607,7 +607,7 @@ impl DeltaLayerWriterInner {
         let mut file = VirtualFile::create(&path)?;
         // make room for the header block
-        file.seek(SeekFrom::Start(PAGE_SZ as u64))?;
+        file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
         let buf_writer = BufWriter::new(file);
         let blob_writer = WriteBlobWriter::new(buf_writer, PAGE_SZ as u64);
@@ -632,11 +632,12 @@ impl DeltaLayerWriterInner {
     ///
     /// The values must be appended in key, lsn order.
     ///
-    fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
+    async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
         self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
+            .await
     }
 
-    fn put_value_bytes(
+    async fn put_value_bytes(
         &mut self,
         key: Key,
         lsn: Lsn,
@@ -645,7 +646,7 @@ impl DeltaLayerWriterInner {
     ) -> anyhow::Result<()> {
         assert!(self.lsn_range.start <= lsn);
-        let off = self.blob_writer.write_blob(val)?;
+        let off = self.blob_writer.write_blob(val).await?;
 
         let blob_ref = BlobRef::new(off, will_init);
@@ -662,7 +663,7 @@ impl DeltaLayerWriterInner {
     ///
     /// Finish writing the delta layer.
     ///
-    fn finish(self, key_end: Key) -> anyhow::Result<DeltaLayer> {
+    async fn finish(self, key_end: Key) -> anyhow::Result<DeltaLayer> {
         let index_start_blk =
             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -671,7 +672,8 @@ impl DeltaLayerWriterInner {
         // Write out the index
         let (index_root_blk, block_buf) = self.tree.finish()?;
-        file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))?;
+        file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
+            .await?;
         for buf in block_buf.blocks {
             file.write_all(buf.as_ref())?;
         }
@@ -687,11 +689,22 @@ impl DeltaLayerWriterInner {
             index_start_blk,
             index_root_blk,
         };
-        file.seek(SeekFrom::Start(0))?;
-        Summary::ser_into(&summary, &mut file)?;
+
+        let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
+        Summary::ser_into(&summary, &mut buf)?;
+        if buf.spilled() {
+            // This is bad as we only have one free block for the summary
+            warn!(
+                "Used more than one page size for summary buffer: {}",
+                buf.len()
+            );
+        }
+        file.seek(SeekFrom::Start(0)).await?;
+        file.write_all(&buf)?;
 
         let metadata = file
             .metadata()
+            .await
             .context("get file metadata to determine size")?;
 
         // 5GB limit for objects without multipart upload (which we don't want to use)
@@ -774,7 +787,7 @@ impl DeltaLayerWriter {
     ///
     /// Start building a new delta layer.
     ///
-    pub fn new(
+    pub async fn new(
         conf: &'static PageServerConf,
         timeline_id: TimelineId,
         tenant_id: TenantId,
@@ -782,13 +795,10 @@ impl DeltaLayerWriter {
         lsn_range: Range<Lsn>,
     ) -> anyhow::Result<Self> {
         Ok(Self {
-            inner: Some(DeltaLayerWriterInner::new(
-                conf,
-                timeline_id,
-                tenant_id,
-                key_start,
-                lsn_range,
-            )?),
+            inner: Some(
+                DeltaLayerWriterInner::new(conf, timeline_id, tenant_id, key_start, lsn_range)
+                    .await?,
+            ),
         })
     }
@@ -797,11 +807,11 @@ impl DeltaLayerWriter {
     ///
     /// The values must be appended in key, lsn order.
     ///
-    pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
-        self.inner.as_mut().unwrap().put_value(key, lsn, val)
+    pub async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
+        self.inner.as_mut().unwrap().put_value(key, lsn, val).await
     }
 
-    pub fn put_value_bytes(
+    pub async fn put_value_bytes(
         &mut self,
         key: Key,
         lsn: Lsn,
@@ -812,6 +822,7 @@ impl DeltaLayerWriter {
             .as_mut()
             .unwrap()
             .put_value_bytes(key, lsn, val, will_init)
+            .await
     }
 
     pub fn size(&self) -> u64 {
@@ -821,8 +832,8 @@ impl DeltaLayerWriter {
     ///
     /// Finish writing the delta layer.
     ///
-    pub fn finish(mut self, key_end: Key) -> anyhow::Result<DeltaLayer> {
-        self.inner.take().unwrap().finish(key_end)
+    pub async fn finish(mut self, key_end: Key) -> anyhow::Result<DeltaLayer> {
+        self.inner.take().unwrap().finish(key_end).await
     }
 }

pageserver/src/tenant/storage_layer/image_layer.rs

@@ -27,7 +27,7 @@ use crate::config::PageServerConf;
 use crate::context::RequestContext;
 use crate::page_cache::PAGE_SZ;
 use crate::repository::{Key, KEY_SIZE};
-use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
+use crate::tenant::blob_io::WriteBlobWriter;
 use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
 use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
 use crate::tenant::storage_layer::{
@@ -519,7 +519,7 @@ impl ImageLayerWriterInner {
     ///
     /// Start building a new image layer.
    ///
-    fn new(
+    async fn new(
         conf: &'static PageServerConf,
         timeline_id: TimelineId,
         tenant_id: TenantId,
@@ -543,7 +543,7 @@ impl ImageLayerWriterInner {
             std::fs::OpenOptions::new().write(true).create_new(true),
         )?;
         // make room for the header block
-        file.seek(SeekFrom::Start(PAGE_SZ as u64))?;
+        file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
         let blob_writer = WriteBlobWriter::new(file, PAGE_SZ as u64);
 
         // Initialize the b-tree index builder
@@ -569,9 +569,9 @@ impl ImageLayerWriterInner {
     ///
     /// The page versions must be appended in blknum order.
     ///
-    fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
+    async fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
         ensure!(self.key_range.contains(&key));
-        let off = self.blob_writer.write_blob(img)?;
+        let off = self.blob_writer.write_blob(img).await?;
 
         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
         key.write_to_byte_slice(&mut keybuf);
@@ -583,14 +583,15 @@ impl ImageLayerWriterInner {
     ///
     /// Finish writing the image layer.
     ///
-    fn finish(self) -> anyhow::Result<ImageLayer> {
+    async fn finish(self) -> anyhow::Result<ImageLayer> {
         let index_start_blk =
             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
 
         let mut file = self.blob_writer.into_inner();
 
         // Write out the index
-        file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))?;
+        file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
+            .await?;
         let (index_root_blk, block_buf) = self.tree.finish()?;
         for buf in block_buf.blocks {
             file.write_all(buf.as_ref())?;
@@ -607,11 +608,22 @@ impl ImageLayerWriterInner {
             index_start_blk,
             index_root_blk,
         };
-        file.seek(SeekFrom::Start(0))?;
-        Summary::ser_into(&summary, &mut file)?;
+
+        let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
+        Summary::ser_into(&summary, &mut buf)?;
+        if buf.spilled() {
+            // This is bad as we only have one free block for the summary
+            warn!(
+                "Used more than one page size for summary buffer: {}",
+                buf.len()
+            );
+        }
+        file.seek(SeekFrom::Start(0)).await?;
+        file.write_all(&buf)?;
 
         let metadata = file
             .metadata()
+            .await
             .context("get metadata to determine file size")?;
 
         let desc = PersistentLayerDesc::new_img(
@@ -687,7 +699,7 @@ impl ImageLayerWriter {
     ///
     /// Start building a new image layer.
     ///
-    pub fn new(
+    pub async fn new(
         conf: &'static PageServerConf,
         timeline_id: TimelineId,
         tenant_id: TenantId,
@@ -695,13 +707,9 @@ impl ImageLayerWriter {
         lsn: Lsn,
     ) -> anyhow::Result<ImageLayerWriter> {
         Ok(Self {
-            inner: Some(ImageLayerWriterInner::new(
-                conf,
-                timeline_id,
-                tenant_id,
-                key_range,
-                lsn,
-            )?),
+            inner: Some(
+                ImageLayerWriterInner::new(conf, timeline_id, tenant_id, key_range, lsn).await?,
+            ),
         })
     }
@@ -710,15 +718,15 @@ impl ImageLayerWriter {
     ///
     /// The page versions must be appended in blknum order.
     ///
-    pub fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
-        self.inner.as_mut().unwrap().put_image(key, img)
+    pub async fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
+        self.inner.as_mut().unwrap().put_image(key, img).await
     }
 
     ///
     /// Finish writing the image layer.
     ///
-    pub fn finish(mut self) -> anyhow::Result<ImageLayer> {
-        self.inner.take().unwrap().finish()
+    pub async fn finish(mut self) -> anyhow::Result<ImageLayer> {
+        self.inner.take().unwrap().finish().await
     }
 }

pageserver/src/tenant/storage_layer/inmemory_layer.rs

@@ -333,7 +333,8 @@ impl InMemoryLayer {
             self.tenant_id,
             Key::MIN,
             self.start_lsn..end_lsn,
-        )?;
+        )
+        .await?;
 
         let mut buf = Vec::new();
@@ -348,11 +349,13 @@ impl InMemoryLayer {
             for (lsn, pos) in vec_map.as_slice() {
                 cursor.read_blob_into_buf(*pos, &mut buf).await?;
                 let will_init = Value::des(&buf)?.will_init();
-                delta_layer_writer.put_value_bytes(key, *lsn, &buf, will_init)?;
+                delta_layer_writer
+                    .put_value_bytes(key, *lsn, &buf, will_init)
+                    .await?;
             }
         }
 
-        let delta_layer = delta_layer_writer.finish(Key::MAX)?;
+        let delta_layer = delta_layer_writer.finish(Key::MAX).await?;
         Ok(delta_layer)
     }
 }

pageserver/src/tenant/timeline.rs

@@ -3033,7 +3033,8 @@ impl Timeline {
                 self.tenant_id,
                 &img_range,
                 lsn,
-            )?;
+            )
+            .await?;
 
             fail_point!("image-layer-writer-fail-before-finish", |_| {
                 Err(PageReconstructError::Other(anyhow::anyhow!(
@@ -3069,11 +3070,11 @@ impl Timeline {
                             }
                         }
                     };
-                    image_layer_writer.put_image(key, &img)?;
+                    image_layer_writer.put_image(key, &img).await?;
                     key = key.next();
                 }
             }
 
-            let image_layer = image_layer_writer.finish()?;
+            let image_layer = image_layer_writer.finish().await?;
             image_layers.push(image_layer);
         }
@@ -3618,7 +3619,11 @@ impl Timeline {
             {
                 // ... if so, flush previous layer and prepare to write new one
                 new_layers.push(Arc::new(
-                    writer.take().unwrap().finish(prev_key.unwrap().next())?,
+                    writer
+                        .take()
+                        .unwrap()
+                        .finish(prev_key.unwrap().next())
+                        .await?,
                 ));
 
                 writer = None;
@@ -3633,20 +3638,23 @@ impl Timeline {
             }
             if writer.is_none() {
                 // Create writer if not initiaized yet
-                writer = Some(DeltaLayerWriter::new(
-                    self.conf,
-                    self.timeline_id,
-                    self.tenant_id,
-                    key,
-                    if dup_end_lsn.is_valid() {
-                        // this is a layer containing slice of values of the same key
-                        debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
-                        dup_start_lsn..dup_end_lsn
-                    } else {
-                        debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
-                        lsn_range.clone()
-                    },
-                )?);
+                writer = Some(
+                    DeltaLayerWriter::new(
+                        self.conf,
+                        self.timeline_id,
+                        self.tenant_id,
+                        key,
+                        if dup_end_lsn.is_valid() {
+                            // this is a layer containing slice of values of the same key
+                            debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
+                            dup_start_lsn..dup_end_lsn
+                        } else {
+                            debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
+                            lsn_range.clone()
+                        },
+                    )
+                    .await?,
+                );
            }
 
            fail_point!("delta-layer-writer-fail-before-finish", |_| {
@@ -3655,11 +3663,11 @@ impl Timeline {
                 )))
             });
 
-            writer.as_mut().unwrap().put_value(key, lsn, value)?;
+            writer.as_mut().unwrap().put_value(key, lsn, value).await?;
             prev_key = Some(key);
         }
 
         if let Some(writer) = writer {
-            new_layers.push(Arc::new(writer.finish(prev_key.unwrap().next())?));
+            new_layers.push(Arc::new(writer.finish(prev_key.unwrap().next()).await?));
         }
 
         // Sync layers
pageserver/src/virtual_file.rs

@@ -326,7 +326,7 @@ impl VirtualFile {
         self.with_file("fsync", |file| file.sync_all())?
     }
 
-    pub fn metadata(&self) -> Result<fs::Metadata, Error> {
+    pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
         self.with_file("metadata", |file| file.metadata())?
     }
@@ -407,7 +407,7 @@ impl VirtualFile {
         std::fs::remove_file(path).expect("failed to remove the virtual file");
     }
 
-    pub fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
+    pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
         match pos {
             SeekFrom::Start(offset) => {
                 self.pos = offset;
@@ -619,9 +619,9 @@ mod tests {
             MaybeVirtualFile::File(file) => file.write_all_at(buf, offset),
         }
     }
-    fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
+    async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
         match self {
-            MaybeVirtualFile::VirtualFile(file) => file.seek(pos),
+            MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
             MaybeVirtualFile::File(file) => file.seek(pos),
         }
    }
@@ -712,23 +712,23 @@ mod tests {
         assert_eq!("", file_a.read_string().await?);
 
         // Test seeks.
-        assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
+        assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
         assert_eq!("oobar", file_a.read_string().await?);
 
-        assert_eq!(file_a.seek(SeekFrom::End(-2))?, 4);
+        assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4);
         assert_eq!("ar", file_a.read_string().await?);
 
-        assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
-        assert_eq!(file_a.seek(SeekFrom::Current(2))?, 3);
+        assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
+        assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3);
         assert_eq!("bar", file_a.read_string().await?);
 
-        assert_eq!(file_a.seek(SeekFrom::Current(-5))?, 1);
+        assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1);
         assert_eq!("oobar", file_a.read_string().await?);
 
         // Test erroneous seeks to before byte 0
-        assert!(file_a.seek(SeekFrom::End(-7)).is_err());
-        assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
-        assert!(file_a.seek(SeekFrom::Current(-2)).is_err());
+        file_a.seek(SeekFrom::End(-7)).await.unwrap_err();
+        assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
+        file_a.seek(SeekFrom::Current(-2)).await.unwrap_err();
 
         // the erroneous seek should have left the position unchanged
         assert_eq!("oobar", file_a.read_string().await?);
@@ -752,7 +752,7 @@ mod tests {
         // open the same file many times. The effect is the same.)
         //
         // leave file_a positioned at offset 1 before we start
-        assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
+        assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
 
         let mut vfiles = Vec::new();
         for _ in 0..100 {