Compare commits


13 Commits

Author SHA1 Message Date
Arpad Müller
a9e5da9613 wip 2023-09-01 00:19:47 +02:00
Arpad Müller
a5acfdaa5a Use write_and_fsync in save_metadata 2023-09-01 00:19:47 +02:00
Arpad Müller
70052ae1ca Make save_metadata async fn 2023-09-01 00:19:47 +02:00
Arpad Müller
fe69dd9a40 Add write_and_fsync function 2023-09-01 00:19:47 +02:00
Arpad Müller
930eccfcaa Make write_blob and the things it calls async fn 2023-09-01 00:19:47 +02:00
Arpad Müller
29c2381fa5 Move generics on trait into macro
Monomorphization is basically like macro expansion; it just happens at a
later compiler phase. (A sketch of this pattern follows the changed-files
summary below.)
2023-09-01 00:19:47 +02:00
Arpad Müller
7839cda66a Remove bounds 2023-09-01 00:19:47 +02:00
Arpad Müller
d565df25d6 Make write_all_at async 2023-09-01 00:19:47 +02:00
Christian Schwarz
92b7d7f466 FileBlockReader::fill_buffer: make it obvious that we need to switch to an async API 2023-09-01 00:19:47 +02:00
Arpad Müller
cfabd8b598 Remove Read impl that was only used in one place 2023-09-01 00:19:47 +02:00
Arpad Müller
4432094443 Move used FileExt functions to inherent impls 2023-09-01 00:19:47 +02:00
Christian Schwarz
83babcce30 FileBlockReader<File> is never used 2023-09-01 00:19:47 +02:00
Arpad Müller
735e20112a Move VirtualFile::seek to inherent function 2023-09-01 00:19:47 +02:00
12 changed files with 302 additions and 215 deletions
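
The macro trick from commit 29c2381fa5 is the least obvious change in the set: async fn was not yet allowed in traits, so the generic impl<W: Write> is replaced by a macro that stamps out an inherent impl per concrete type, and the commit message points out that this mirrors what monomorphization would have done anyway. A minimal sketch of the pattern, with hypothetical names (Wrapper, put) rather than code from this branch:

    // Generics-to-macro move in miniature: each invocation expands the
    // impl for one listed type, much like monomorphization of a generic
    // impl, but at an earlier compiler phase.
    struct Wrapper<W> {
        inner: W,
    }

    macro_rules! wrapper_impl {
        (Wrapper<$ty:ty>) => {
            impl Wrapper<$ty> {
                // Inherent `async fn`: not expressible through a generic
                // trait impl before async-fn-in-trait was stabilized.
                pub async fn put(&mut self, buf: &[u8]) -> std::io::Result<()> {
                    use std::io::Write;
                    self.inner.write_all(buf)
                }
            }
        };
    }

    wrapper_impl!(Wrapper<Vec<u8>>);
    wrapper_impl!(Wrapper<std::io::Cursor<Vec<u8>>>);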

View File

@@ -469,7 +469,9 @@ impl PageServerHandler {
// Create empty timeline
info!("creating new timeline");
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?;
let timeline = tenant
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
.await?;
// TODO mark timeline as not ready until it reaches end_lsn.
// We might have some wal to import as well, and we should prevent compute

View File

@@ -448,6 +448,7 @@ impl Tenant {
up_to_date_metadata,
first_save,
)
.await
.context("save_metadata")?;
}
@@ -1450,7 +1451,7 @@ impl Tenant {
/// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to set up the
/// minimum amount of keys required to get a writable timeline.
/// (Without it, `put` might fail due to `repartition` failing.)
pub fn create_empty_timeline(
pub async fn create_empty_timeline(
&self,
new_timeline_id: TimelineId,
initdb_lsn: Lsn,
@@ -1462,10 +1463,10 @@ impl Tenant {
"Cannot create empty timelines on inactive tenant"
);
let timelines = self.timelines.lock().unwrap();
let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id, &timelines)?;
drop(timelines);
let timeline_uninit_mark = {
let timelines = self.timelines.lock().unwrap();
self.create_timeline_uninit_mark(new_timeline_id, &timelines)?
};
let new_metadata = TimelineMetadata::new(
// Initialize disk_consistent LSN to 0. The caller must import some data to
// make it valid, before calling finish_creation()
@@ -1484,6 +1485,7 @@ impl Tenant {
initdb_lsn,
None,
)
.await
}
/// Helper for unit tests to create an empty timeline.
@@ -1499,7 +1501,9 @@ impl Tenant {
pg_version: u32,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?;
let uninit_tl = self
.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
.await?;
let tline = uninit_tl.raw_timeline().expect("we just created it");
assert_eq!(tline.get_last_record_lsn(), Lsn(0));
@@ -2797,13 +2801,15 @@ impl Tenant {
src_timeline.pg_version,
);
let uninitialized_timeline = self.prepare_new_timeline(
dst_id,
&metadata,
timeline_uninit_mark,
start_lsn + 1,
Some(Arc::clone(src_timeline)),
)?;
let uninitialized_timeline = self
.prepare_new_timeline(
dst_id,
&metadata,
timeline_uninit_mark,
start_lsn + 1,
Some(Arc::clone(src_timeline)),
)
.await?;
let new_timeline = uninitialized_timeline.finish_creation()?;
@@ -2881,13 +2887,15 @@ impl Tenant {
pgdata_lsn,
pg_version,
);
let raw_timeline = self.prepare_new_timeline(
timeline_id,
&new_metadata,
timeline_uninit_mark,
pgdata_lsn,
None,
)?;
let raw_timeline = self
.prepare_new_timeline(
timeline_id,
&new_metadata,
timeline_uninit_mark,
pgdata_lsn,
None,
)
.await?;
let tenant_id = raw_timeline.owning_tenant.tenant_id;
let unfinished_timeline = raw_timeline.raw_timeline()?;
@@ -2958,7 +2966,7 @@ impl Tenant {
/// at 'disk_consistent_lsn'. After any initial data has been imported, call
/// `finish_creation` to insert the Timeline into the timelines map and to remove the
/// uninit mark file.
fn prepare_new_timeline(
async fn prepare_new_timeline(
&self,
new_timeline_id: TimelineId,
new_metadata: &TimelineMetadata,
@@ -2986,8 +2994,9 @@ impl Tenant {
timeline_struct.init_empty_layer_map(start_lsn);
if let Err(e) =
self.create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
if let Err(e) = self
.create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
.await
{
error!("Failed to create initial files for timeline {tenant_id}/{new_timeline_id}, cleaning up: {e:?}");
cleanup_timeline_directory(uninit_mark);
@@ -3003,7 +3012,7 @@ impl Tenant {
))
}
fn create_timeline_files(
async fn create_timeline_files(
&self,
timeline_path: &Path,
new_timeline_id: &TimelineId,
@@ -3022,6 +3031,7 @@ impl Tenant {
new_metadata,
true,
)
.await
.context("Failed to create timeline metadata")?;
Ok(())
}
@@ -3649,7 +3659,10 @@ mod tests {
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) {
match tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await
{
Ok(_) => panic!("duplicate timeline creation should fail"),
Err(e) => assert_eq!(
e.to_string(),
@@ -4489,8 +4502,9 @@ mod tests {
.await;
let initdb_lsn = Lsn(0x20);
let utline =
tenant.create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)?;
let utline = tenant
.create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
.await?;
let tline = utline.raw_timeline().unwrap();
// Spawn flush loop now so that we can set the `expect_initdb_optimization`
@@ -4555,8 +4569,9 @@ mod tests {
let harness = TenantHarness::create(name)?;
{
let (tenant, ctx) = harness.load().await;
let tline =
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
.await?;
// Keeps uninit mark in place
std::mem::forget(tline);
}
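
One detail in create_empty_timeline above is worth calling out: the timelines lock guard is now confined to its own block. A std::sync::MutexGuard held across the newly introduced .await points would make the returned future non-Send, so it could no longer be spawned onto the runtime. The same shape in miniature (a std-only sketch, not code from this branch):

    use std::sync::Mutex;

    // Holding the guard across an .await would make this future !Send;
    // scoping it, as the diff does, keeps the future spawnable.
    async fn scoped_lock(m: &Mutex<Vec<u32>>) {
        let first = {
            let guard = m.lock().unwrap();
            guard.first().copied()
        }; // guard dropped here, before any await
        some_async_step().await;
        let _ = first;
    }

    async fn some_async_step() {}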

View File

@@ -96,18 +96,12 @@ pub trait BlobWriter {
/// An implementation of BlobWriter to write blobs to anything that
/// implements std::io::Write.
///
pub struct WriteBlobWriter<W>
where
W: std::io::Write,
{
pub struct WriteBlobWriter<W> {
inner: W,
offset: u64,
}
impl<W> WriteBlobWriter<W>
where
W: std::io::Write,
{
impl<W> WriteBlobWriter<W> {
pub fn new(inner: W, start_offset: u64) -> Self {
WriteBlobWriter {
inner,
@@ -129,33 +123,38 @@ where
}
}
impl<W> BlobWriter for WriteBlobWriter<W>
where
W: std::io::Write,
{
fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
let offset = self.offset;
macro_rules! write_blob_impl {
(WriteBlobWriter<$ty:ty>) => {
impl WriteBlobWriter<$ty> {
pub async fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
use std::io::Write;
let offset = self.offset;
if srcbuf.len() < 128 {
// Short blob. Write a 1-byte length header
let len_buf = srcbuf.len() as u8;
self.inner.write_all(&[len_buf])?;
self.offset += 1;
} else {
// Write a 4-byte length header
if srcbuf.len() > 0x7fff_ffff {
return Err(Error::new(
ErrorKind::Other,
format!("blob too large ({} bytes)", srcbuf.len()),
));
if srcbuf.len() < 128 {
// Short blob. Write a 1-byte length header
let len_buf = srcbuf.len() as u8;
self.inner.write_all(&[len_buf])?;
self.offset += 1;
} else {
// Write a 4-byte length header
if srcbuf.len() > 0x7fff_ffff {
return Err(Error::new(
ErrorKind::Other,
format!("blob too large ({} bytes)", srcbuf.len()),
));
}
let mut len_buf = ((srcbuf.len()) as u32).to_be_bytes();
len_buf[0] |= 0x80;
self.inner.write_all(&len_buf)?;
self.offset += 4;
}
self.inner.write_all(srcbuf)?;
self.offset += srcbuf.len() as u64;
Ok(offset)
}
let mut len_buf = ((srcbuf.len()) as u32).to_be_bytes();
len_buf[0] |= 0x80;
self.inner.write_all(&len_buf)?;
self.offset += 4;
}
self.inner.write_all(srcbuf)?;
self.offset += srcbuf.len() as u64;
Ok(offset)
}
};
}
write_blob_impl!(WriteBlobWriter<crate::tenant::io::BufWriter<crate::virtual_file::VirtualFile>>);
write_blob_impl!(WriteBlobWriter<crate::virtual_file::VirtualFile>);
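
After expansion, write_blob is an inherent async fn on the two concrete writer types, so call sites gain an .await. A hedged usage sketch (the surrounding setup and the append_blob helper are assumptions, not shown in this diff):

    // Assumed context: an open VirtualFile for a layer file being written.
    // write_blob returns the blob's start offset; payloads under 128 bytes
    // get a 1-byte length header, longer ones a 4-byte header with the
    // high bit set.
    async fn append_blob(
        writer: &mut WriteBlobWriter<crate::virtual_file::VirtualFile>,
        payload: &[u8],
    ) -> std::io::Result<u64> {
        writer.write_blob(payload).await
    }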

View File

@@ -7,9 +7,7 @@ use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::virtual_file::VirtualFile;
use bytes::Bytes;
use std::fs::File;
use std::ops::{Deref, DerefMut};
use std::os::unix::fs::FileExt;
/// This is implemented by anything that can read 8 kB (PAGE_SZ)
/// blocks, using the page cache
@@ -74,7 +72,6 @@ impl<'a> Deref for BlockLease<'a> {
/// Unlike traits, we also support the read function to be async though.
pub(crate) enum BlockReaderRef<'a> {
FileBlockReaderVirtual(&'a FileBlockReader<VirtualFile>),
FileBlockReaderFile(&'a FileBlockReader<std::fs::File>),
EphemeralFile(&'a EphemeralFile),
Adapter(Adapter<&'a DeltaLayerInner>),
#[cfg(test)]
@@ -87,7 +84,6 @@ impl<'a> BlockReaderRef<'a> {
use BlockReaderRef::*;
match self {
FileBlockReaderVirtual(r) => r.read_blk(blknum).await,
FileBlockReaderFile(r) => r.read_blk(blknum).await,
EphemeralFile(r) => r.read_blk(blknum).await,
Adapter(r) => r.read_blk(blknum).await,
#[cfg(test)]
@@ -150,18 +146,15 @@ pub struct FileBlockReader<F> {
file_id: page_cache::FileId,
}
impl<F> FileBlockReader<F>
where
F: FileExt,
{
pub fn new(file: F) -> Self {
impl FileBlockReader<VirtualFile> {
pub fn new(file: VirtualFile) -> Self {
let file_id = page_cache::next_file_id();
FileBlockReader { file_id, file }
}
/// Read a page from the underlying file into given buffer.
fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
assert!(buf.len() == PAGE_SZ);
self.file.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
}
@@ -185,7 +178,7 @@ where
ReadBufResult::Found(guard) => break Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum)?;
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
write_guard.mark_valid();
// Swap for read lock
@@ -196,12 +189,6 @@ where
}
}
impl BlockReader for FileBlockReader<File> {
fn block_cursor(&self) -> BlockCursor<'_> {
BlockCursor::new(BlockReaderRef::FileBlockReaderFile(self))
}
}
impl BlockReader for FileBlockReader<VirtualFile> {
fn block_cursor(&self) -> BlockCursor<'_> {
BlockCursor::new(BlockReaderRef::FileBlockReaderVirtual(self))
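
The reason fill_buffer must become async is visible in the cache loop above: a page-cache hit returns without I/O, while a miss reads the page from disk, and that read is the call being made async. Callers keep the same shape; a sketch under assumed signatures (read_blk returning an io::Result of a BlockLease that derefs to the page bytes):

    // Read one PAGE_SZ-sized page through the cached block reader.
    async fn read_page(
        reader: &FileBlockReader<VirtualFile>,
        blknum: u32,
    ) -> Result<Vec<u8>, std::io::Error> {
        let lease = reader.read_blk(blknum).await?; // hit: no I/O; miss: fill_buffer().await
        Ok(lease.to_vec())
    }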

View File

@@ -9,7 +9,6 @@ use std::cmp::min;
use std::fs::OpenOptions;
use std::io::{self, ErrorKind};
use std::ops::DerefMut;
use std::os::unix::prelude::FileExt;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use tracing::*;
@@ -128,10 +127,15 @@ impl EphemeralFile {
self.off += n;
src_remaining = &src_remaining[n..];
if self.off == PAGE_SZ {
match self.ephemeral_file.file.write_all_at(
&self.ephemeral_file.mutable_tail,
self.blknum as u64 * PAGE_SZ as u64,
) {
match self
.ephemeral_file
.file
.write_all_at(
&self.ephemeral_file.mutable_tail,
self.blknum as u64 * PAGE_SZ as u64,
)
.await
{
Ok(_) => {
// Pre-warm the page cache with what we just wrote.
// This isn't necessary for coherency/correctness, but it's how we've always done it.

View File

@@ -26,7 +26,7 @@
//! recovered from this file. This is tracked in
//! <https://github.com/neondatabase/neon/issues/4418>
use std::io::{self, Read, Write};
use std::io::{self, Write};
use crate::virtual_file::VirtualFile;
use anyhow::Result;
@@ -151,11 +151,12 @@ impl Manifest {
/// Load a manifest. Returns the manifest and a list of operations. If the manifest is corrupted,
the bool flag will be set to true and the user is responsible for reconstructing a new manifest and
backing up the current one.
pub fn load(
mut file: VirtualFile,
pub async fn load(
file: VirtualFile,
) -> Result<(Self, Vec<Operation>, ManifestPartiallyCorrupted), ManifestLoadError> {
let mut buf = vec![];
file.read_to_end(&mut buf).map_err(ManifestLoadError::Io)?;
// read_exact_at reads exactly buf.len() bytes, so the buffer must be sized
// to the file length first (read_to_end grew it implicitly before); the
// metadata()-based length query here is an assumption.
buf.resize(
    file.metadata().map_err(ManifestLoadError::Io)?.len() as usize,
    0,
);
file.read_exact_at(&mut buf, 0)
    .map_err(ManifestLoadError::Io)?;
// Read manifest header
let mut buf = Bytes::from(buf);
@@ -241,8 +242,8 @@ mod tests {
use super::*;
#[test]
fn test_read_manifest() {
#[tokio::test]
async fn test_read_manifest() {
let testdir = crate::config::PageServerConf::test_repo_dir("test_read_manifest");
std::fs::create_dir_all(&testdir).unwrap();
let file = VirtualFile::create(&testdir.join("MANIFEST")).unwrap();
@@ -274,7 +275,7 @@ mod tests {
.truncate(false),
)
.unwrap();
let (mut manifest, operations, corrupted) = Manifest::load(file).unwrap();
let (mut manifest, operations, corrupted) = Manifest::load(file).await.unwrap();
assert!(!corrupted.0);
assert_eq!(operations.len(), 2);
assert_eq!(
@@ -306,7 +307,7 @@ mod tests {
.truncate(false),
)
.unwrap();
let (_manifest, operations, corrupted) = Manifest::load(file).unwrap();
let (_manifest, operations, corrupted) = Manifest::load(file).await.unwrap();
assert!(!corrupted.0);
assert_eq!(operations.len(), 3);
assert_eq!(&operations[0], &Operation::Snapshot(snapshot, Lsn::from(0)));

View File

@@ -9,9 +9,9 @@
//! [`remote_timeline_client`]: super::remote_timeline_client
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::io;
use anyhow::{bail, ensure, Context};
use anyhow::{ensure, Context};
use serde::{de::Error, Deserialize, Serialize, Serializer};
use thiserror::Error;
use tracing::info_span;
@@ -255,7 +255,7 @@ impl Serialize for TimelineMetadata {
}
/// Save timeline metadata to file
pub fn save_metadata(
pub async fn save_metadata(
conf: &'static PageServerConf,
tenant_id: &TenantId,
timeline_id: &TimelineId,
@@ -273,10 +273,7 @@ pub fn save_metadata(
let metadata_bytes = data.to_bytes().context("Failed to get metadata bytes")?;
if file.write(&metadata_bytes)? != metadata_bytes.len() {
bail!("Could not write all the metadata bytes in a single call");
}
file.sync_all()?;
file.write_and_fsync(&metadata_bytes)?;
// fsync the parent directory to ensure the directory entry is durable
if first_save {
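
save_metadata's inline write-then-check-then-fsync sequence moves into VirtualFile::write_and_fsync (defined in the virtual_file.rs diff further below). The idiom it captures, in a std-only sketch:

    use std::fs::File;
    use std::io::{Error, ErrorKind, Write};

    // One write() call must cover the whole buffer (a short write is an
    // error, not a retry), then fsync makes the contents durable.
    fn write_and_fsync_std(file: &mut File, buf: &[u8]) -> Result<(), Error> {
        if file.write(buf)? != buf.len() {
            return Err(Error::new(
                ErrorKind::Other,
                "could not write all bytes in a single call",
            ));
        }
        file.sync_all()
    }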

View File

@@ -31,7 +31,7 @@ use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
use crate::tenant::blob_io::WriteBlobWriter;
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{
@@ -45,8 +45,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::{self, File};
use std::io::SeekFrom;
use std::io::{BufWriter, Write};
use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
@@ -632,11 +632,12 @@ impl DeltaLayerWriterInner {
///
/// The values must be appended in key, lsn order.
///
fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
.await
}
fn put_value_bytes(
async fn put_value_bytes(
&mut self,
key: Key,
lsn: Lsn,
@@ -645,7 +646,7 @@ impl DeltaLayerWriterInner {
) -> anyhow::Result<()> {
assert!(self.lsn_range.start <= lsn);
let off = self.blob_writer.write_blob(val)?;
let off = self.blob_writer.write_blob(val).await?;
let blob_ref = BlobRef::new(off, will_init);
@@ -797,11 +798,11 @@ impl DeltaLayerWriter {
///
/// The values must be appended in key, lsn order.
///
pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
self.inner.as_mut().unwrap().put_value(key, lsn, val)
pub async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
self.inner.as_mut().unwrap().put_value(key, lsn, val).await
}
pub fn put_value_bytes(
pub async fn put_value_bytes(
&mut self,
key: Key,
lsn: Lsn,
@@ -812,6 +813,7 @@ impl DeltaLayerWriter {
.as_mut()
.unwrap()
.put_value_bytes(key, lsn, val, will_init)
.await
}
pub fn size(&self) -> u64 {

View File

@@ -27,7 +27,7 @@ use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, KEY_SIZE};
use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
use crate::tenant::blob_io::WriteBlobWriter;
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{
@@ -42,8 +42,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::{self, File};
use std::io::SeekFrom;
use std::io::Write;
use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::path::{Path, PathBuf};
@@ -569,9 +569,9 @@ impl ImageLayerWriterInner {
///
/// The page versions must be appended in blknum order.
///
fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
async fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
let off = self.blob_writer.write_blob(img)?;
let off = self.blob_writer.write_blob(img).await?;
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
@@ -710,8 +710,8 @@ impl ImageLayerWriter {
///
/// The page versions must be appended in blknum order.
///
pub fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
self.inner.as_mut().unwrap().put_image(key, img)
pub async fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
self.inner.as_mut().unwrap().put_image(key, img).await
}
///

View File

@@ -348,7 +348,9 @@ impl InMemoryLayer {
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf).await?;
let will_init = Value::des(&buf)?.will_init();
delta_layer_writer.put_value_bytes(key, *lsn, &buf, will_init)?;
delta_layer_writer
.put_value_bytes(key, *lsn, &buf, will_init)
.await?;
}
}

View File

@@ -2735,6 +2735,7 @@ impl Timeline {
if disk_consistent_lsn != old_disk_consistent_lsn {
assert!(disk_consistent_lsn > old_disk_consistent_lsn);
self.update_metadata_file(disk_consistent_lsn, layer_paths_to_upload)
.await
.context("update_metadata_file")?;
// Also update the in-memory copy
self.disk_consistent_lsn.store(disk_consistent_lsn);
@@ -2743,7 +2744,7 @@ impl Timeline {
}
/// Update metadata file
fn update_metadata_file(
async fn update_metadata_file(
&self,
disk_consistent_lsn: Lsn,
layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,
@@ -2791,6 +2792,7 @@ impl Timeline {
&metadata,
false,
)
.await
.context("save_metadata")?;
if let Some(remote_client) = &self.remote_client {
@@ -3030,7 +3032,7 @@ impl Timeline {
}
}
};
image_layer_writer.put_image(key, &img)?;
image_layer_writer.put_image(key, &img).await?;
key = key.next();
}
}
@@ -3616,7 +3618,7 @@ impl Timeline {
)))
});
writer.as_mut().unwrap().put_value(key, lsn, value)?;
writer.as_mut().unwrap().put_value(key, lsn, value).await?;
prev_key = Some(key);
}
if let Some(writer) = writer {
@@ -4122,7 +4124,8 @@ impl Timeline {
if !layers_to_remove.is_empty() {
// Persist the new GC cutoff value in the metadata file, before
// we actually remove anything.
self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())?;
self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())
.await?;
// Actually delete the layers from disk and remove them from the map.
// (couldn't do this in the loop above, because you cannot modify a collection

View File

@@ -13,7 +13,7 @@
use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME};
use once_cell::sync::OnceCell;
use std::fs::{self, File, OpenOptions};
use std::io::{Error, ErrorKind, Read, Seek, SeekFrom, Write};
use std::io::{Error, ErrorKind, Seek, SeekFrom, Write};
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@@ -321,54 +321,8 @@ impl VirtualFile {
drop(self);
std::fs::remove_file(path).expect("failed to remove the virtual file");
}
}
impl Drop for VirtualFile {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut().unwrap();
// We could check with a read-lock first, to avoid waiting on an
// unrelated I/O.
let slot = &get_open_files().slots[handle.index];
let mut slot_guard = slot.inner.write().unwrap();
if slot_guard.tag == handle.tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also operation "close-by-replace" for closes done on eviction for
// comparison.
STORAGE_IO_TIME
.with_label_values(&["close"])
.observe_closure_duration(|| drop(slot_guard.file.take()));
}
}
}
impl Read for VirtualFile {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
let pos = self.pos;
let n = self.read_at(buf, pos)?;
self.pos += n as u64;
Ok(n)
}
}
impl Write for VirtualFile {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
let pos = self.pos;
let n = self.write_at(buf, pos)?;
self.pos += n as u64;
Ok(n)
}
fn flush(&mut self) -> Result<(), std::io::Error> {
// flush is a no-op for File (at least on unix), so we don't need to do
// anything here either.
Ok(())
}
}
impl Seek for VirtualFile {
fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
pub fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match pos {
SeekFrom::Start(offset) => {
self.pos = offset;
@@ -392,10 +346,64 @@ impl Seek for VirtualFile {
}
Ok(self.pos)
}
}
impl FileExt for VirtualFile {
fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
pub fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> {
while !buf.is_empty() {
match self.read_at(buf, offset) {
Ok(0) => {
return Err(Error::new(
std::io::ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
))
}
Ok(n) => {
buf = &mut buf[n..];
offset += n as u64;
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
Ok(())
}
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> {
while !buf.is_empty() {
match self.write_at(buf, offset) {
Ok(0) => {
return Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
));
}
Ok(n) => {
buf = &buf[n..];
offset += n as u64;
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
Ok(())
}
/// Write the given buffer (which has to be below the kernel's internal page size) and fsync
///
/// This ensures some level of atomicity (not a good one, but it's the best we have).
pub fn write_and_fsync(&mut self, buf: &[u8]) -> Result<(), Error> {
if self.write(buf)? != buf.len() {
return Err(Error::new(
std::io::ErrorKind::Other,
"Could not write all the bytes in a single call",
));
}
self.sync_all()?;
Ok(())
}
pub fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
let result = self.with_file("read", |file| file.read_at(buf, offset))?;
if let Ok(size) = result {
STORAGE_IO_SIZE
@@ -405,7 +413,7 @@ impl FileExt for VirtualFile {
result
}
fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = self.with_file("write", |file| file.write_at(buf, offset))?;
if let Ok(size) = result {
STORAGE_IO_SIZE
@@ -416,6 +424,41 @@ impl FileExt for VirtualFile {
}
}
impl Drop for VirtualFile {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut().unwrap();
// We could check with a read-lock first, to avoid waiting on an
// unrelated I/O.
let slot = &get_open_files().slots[handle.index];
let mut slot_guard = slot.inner.write().unwrap();
if slot_guard.tag == handle.tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also operation "close-by-replace" for closes done on eviction for
// comparison.
STORAGE_IO_TIME
.with_label_values(&["close"])
.observe_closure_duration(|| drop(slot_guard.file.take()));
}
}
}
impl Write for VirtualFile {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
let pos = self.pos;
let n = self.write_at(buf, pos)?;
self.pos += n as u64;
Ok(n)
}
fn flush(&mut self) -> Result<(), std::io::Error> {
// flush is a no-op for File (at least on unix), so we don't need to do
// anything here either.
Ok(())
}
}
impl OpenFiles {
fn new(num_slots: usize) -> OpenFiles {
let mut slots = Box::new(Vec::with_capacity(num_slots));
@@ -472,30 +515,60 @@ mod tests {
use std::sync::Arc;
use std::thread;
// Helper function to slurp contents of a file, starting at the current position,
// into a string
fn read_string<FD>(vfile: &mut FD) -> Result<String, Error>
where
FD: Read,
{
let mut buf = String::new();
vfile.read_to_string(&mut buf)?;
Ok(buf)
enum MaybeVirtualFile {
VirtualFile(VirtualFile),
File(File),
}
// Helper function to slurp a portion of a file into a string
fn read_string_at<FD>(vfile: &mut FD, pos: u64, len: usize) -> Result<String, Error>
where
FD: FileExt,
{
let mut buf = Vec::new();
buf.resize(len, 0);
vfile.read_exact_at(&mut buf, pos)?;
Ok(String::from_utf8(buf).unwrap())
impl MaybeVirtualFile {
fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset),
MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset),
}
}
async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await,
MaybeVirtualFile::File(file) => file.write_all_at(buf, offset),
}
}
fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.seek(pos),
MaybeVirtualFile::File(file) => file.seek(pos),
}
}
async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.write_all(buf),
MaybeVirtualFile::File(file) => file.write_all(buf),
}
}
// Helper function to slurp contents of a file, starting at the current position,
// into a string
async fn read_string(&mut self) -> Result<String, Error> {
use std::io::Read;
let mut buf = String::new();
match self {
MaybeVirtualFile::VirtualFile(file) => file.read_to_string(&mut buf)?,
MaybeVirtualFile::File(file) => file.read_to_string(&mut buf)?,
}
Ok(buf)
}
// Helper function to slurp a portion of a file into a string
async fn read_string_at(&mut self, pos: u64, len: usize) -> Result<String, Error> {
let mut buf = Vec::new();
buf.resize(len, 0);
self.read_exact_at(&mut buf, pos)?;
Ok(String::from_utf8(buf).unwrap())
}
}
#[test]
fn test_virtual_files() -> Result<(), Error> {
#[tokio::test]
async fn test_virtual_files() -> Result<(), Error> {
// The real work is done in the test_files() helper function. This
// allows us to run the same set of tests against a native File, and
// VirtualFile. We trust the native Files and wouldn't need to test them,
@@ -504,21 +577,23 @@ mod tests {
// native files, you will run out of file descriptors if the ulimit
// is low enough.)
test_files("virtual_files", |path, open_options| {
VirtualFile::open_with_options(path, open_options)
let vf = VirtualFile::open_with_options(path, open_options)?;
Ok(MaybeVirtualFile::VirtualFile(vf))
})
.await
}
#[test]
fn test_physical_files() -> Result<(), Error> {
#[tokio::test]
async fn test_physical_files() -> Result<(), Error> {
test_files("physical_files", |path, open_options| {
open_options.open(path)
Ok(MaybeVirtualFile::File(open_options.open(path)?))
})
.await
}
fn test_files<OF, FD>(testname: &str, openfunc: OF) -> Result<(), Error>
async fn test_files<OF>(testname: &str, openfunc: OF) -> Result<(), Error>
where
FD: Read + Write + Seek + FileExt,
OF: Fn(&Path, &OpenOptions) -> Result<FD, std::io::Error>,
OF: Fn(&Path, &OpenOptions) -> Result<MaybeVirtualFile, std::io::Error>,
{
let testdir = crate::config::PageServerConf::test_repo_dir(testname);
std::fs::create_dir_all(&testdir)?;
@@ -528,36 +603,36 @@ mod tests {
&path_a,
OpenOptions::new().write(true).create(true).truncate(true),
)?;
file_a.write_all(b"foobar")?;
file_a.write_all(b"foobar").await?;
// cannot read from a file opened in write-only mode
assert!(read_string(&mut file_a).is_err());
assert!(file_a.read_string().await.is_err());
// Close the file and re-open for reading
let mut file_a = openfunc(&path_a, OpenOptions::new().read(true))?;
// cannot write to a file opened in read-only mode
assert!(file_a.write(b"bar").is_err());
assert!(file_a.write_all(b"bar").await.is_err());
// Try simple read
assert_eq!("foobar", read_string(&mut file_a)?);
assert_eq!("foobar", file_a.read_string().await?);
// It's positioned at the EOF now.
assert_eq!("", read_string(&mut file_a)?);
assert_eq!("", file_a.read_string().await?);
// Test seeks.
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
assert_eq!("oobar", read_string(&mut file_a)?);
assert_eq!("oobar", file_a.read_string().await?);
assert_eq!(file_a.seek(SeekFrom::End(-2))?, 4);
assert_eq!("ar", read_string(&mut file_a)?);
assert_eq!("ar", file_a.read_string().await?);
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
assert_eq!(file_a.seek(SeekFrom::Current(2))?, 3);
assert_eq!("bar", read_string(&mut file_a)?);
assert_eq!("bar", file_a.read_string().await?);
assert_eq!(file_a.seek(SeekFrom::Current(-5))?, 1);
assert_eq!("oobar", read_string(&mut file_a)?);
assert_eq!("oobar", file_a.read_string().await?);
// Test erroneous seeks to before byte 0
assert!(file_a.seek(SeekFrom::End(-7)).is_err());
@@ -565,7 +640,7 @@ mod tests {
assert!(file_a.seek(SeekFrom::Current(-2)).is_err());
// the erroneous seek should have left the position unchanged
assert_eq!("oobar", read_string(&mut file_a)?);
assert_eq!("oobar", file_a.read_string().await?);
// Create another test file, and try FileExt functions on it.
let path_b = testdir.join("file_b");
@@ -577,10 +652,10 @@ mod tests {
.create(true)
.truncate(true),
)?;
file_b.write_all_at(b"BAR", 3)?;
file_b.write_all_at(b"FOO", 0)?;
file_b.write_all_at(b"BAR", 3).await?;
file_b.write_all_at(b"FOO", 0).await?;
assert_eq!(read_string_at(&mut file_b, 2, 3)?, "OBA");
assert_eq!(file_b.read_string_at(2, 3).await?, "OBA");
// Open a lot of files, enough to cause some evictions. (Or to be precise,
// open the same file many times. The effect is the same.)
@@ -591,7 +666,7 @@ mod tests {
let mut vfiles = Vec::new();
for _ in 0..100 {
let mut vfile = openfunc(&path_b, OpenOptions::new().read(true))?;
assert_eq!("FOOBAR", read_string(&mut vfile)?);
assert_eq!("FOOBAR", vfile.read_string().await?);
vfiles.push(vfile);
}
@@ -600,13 +675,13 @@ mod tests {
// The underlying file descriptor for 'file_a' should be closed now. Try to read
// from it again. We left the file positioned at offset 1 above.
assert_eq!("oobar", read_string(&mut file_a)?);
assert_eq!("oobar", file_a.read_string().await?);
// Check that all the other FDs still work too. Use them in random order for
// good measure.
vfiles.as_mut_slice().shuffle(&mut thread_rng());
for vfile in vfiles.iter_mut() {
assert_eq!("OOBAR", read_string_at(vfile, 1, 5)?);
assert_eq!("OOBAR", vfile.read_string_at(1, 5).await?);
}
Ok(())
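
The MaybeVirtualFile enum that replaces the generic test helpers is the same workaround as the blob_io macro, seen from the other side: instead of a bound like FD: Read + Write + Seek + FileExt, whose trait methods cannot be async, the tests dispatch over a two-variant enum whose inherent methods can be async fn. The shape in isolation, with hypothetical names:

    // Two concrete backends behind one concrete type; the methods may be
    // async because no trait is involved.
    enum Either<A, B> {
        Left(A),
        Right(B),
    }

    impl<A: std::io::Write, B: std::io::Write> Either<A, B> {
        async fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
            match self {
                Either::Left(a) => a.write_all(buf),
                Either::Right(b) => b.write_all(buf),
            }
        }
    }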