Compare commits

...

11 Commits

Author        SHA1        Message                                                  Date
Arpad Müller  c056db20c7  wip                                                      2023-09-08 18:25:05 +02:00
Arpad Müller  f417db52e0  wip                                                      2023-09-07 19:32:42 +02:00
Arpad Müller  2050136437  Switch the locks to tokio ones                           2023-09-07 19:32:42 +02:00
Arpad Müller  eb2dd7118e  Fix test_vfile_concurrency test                          2023-09-07 19:25:56 +02:00
Arpad Müller  0890120517  fix                                                      2023-09-07 17:47:18 +02:00
Arpad Müller  9e23a91c0b  Fix them for real this time                              2023-09-07 17:38:04 +02:00
Arpad Müller  f64a2d723a  Fix tests                                                2023-09-06 20:04:27 +02:00
Arpad Müller  bd04abbcab  Make VirtualFile::{open_with_options, create} async fn   2023-09-06 20:04:27 +02:00
Arpad Müller  ae1af9d10e  Make VirtualFile::open async fn                          2023-09-06 20:04:27 +02:00
Arpad Müller  41e87f92c3  Make VirtualFile::with_file async                        2023-09-06 20:04:26 +02:00
Arpad Müller  3a8b630f90  Make VirtualFile::sync_all async                         2023-09-06 18:39:22 +02:00
11 changed files with 114 additions and 72 deletions
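
Every change below follows the same mechanical recipe, matching the commit messages: a blocking method becomes an async fn, each caller gains an .await, and the std::sync locks become tokio::sync locks so that waiting yields to the executor instead of blocking the thread. A minimal sketch of the recipe on a hypothetical Cache type (not from this diff):

use tokio::sync::RwLock; // was: std::sync::RwLock

struct Cache {
    slots: RwLock<Vec<Option<String>>>,
}

impl Cache {
    // before: fn get(&self, idx: usize) -> Option<String>
    async fn get(&self, idx: usize) -> Option<String> {
        // tokio's read() is async, and its guard has no poisoning,
        // so the std-style .unwrap() disappears along the way
        let slots = self.slots.read().await;
        slots.get(idx).cloned().flatten()
    }
}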

View File

@@ -97,7 +97,7 @@ pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
 // Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH"
 async fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
-    let file = FileBlockReader::new(VirtualFile::open(path)?);
+    let file = FileBlockReader::new(VirtualFile::open(path).await?);
     let summary_blk = file.read_blk(0).await?;
     let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
     let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(

View File

@@ -48,7 +48,7 @@ async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
     let path = path.as_ref();
     virtual_file::init(10);
     page_cache::init(100);
-    let file = FileBlockReader::new(VirtualFile::open(path)?);
+    let file = FileBlockReader::new(VirtualFile::open(path).await?);
     let summary_blk = file.read_blk(0).await?;
     let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
     let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(

View File

@@ -238,7 +238,7 @@ mod tests {
         // Write part (in block to drop the file)
         let mut offsets = Vec::new();
         {
-            let file = VirtualFile::create(&path)?;
+            let file = VirtualFile::create(&path).await?;
             let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
             for blob in blobs.iter() {
                 let offs = wtr.write_blob(blob).await?;
@@ -251,7 +251,7 @@ mod tests {
             wtr.flush_buffer().await?;
         }
-        let file = VirtualFile::open(&path)?;
+        let file = VirtualFile::open(&path).await?;
         let rdr = BlockReaderRef::VirtualFile(&file);
         let rdr = BlockCursor::new(rdr);
         for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {

View File

@@ -28,7 +28,7 @@ pub struct EphemeralFile {
 }
 impl EphemeralFile {
-    pub fn create(
+    pub async fn create(
         conf: &PageServerConf,
         tenant_id: TenantId,
         timeline_id: TimelineId,
@@ -44,7 +44,8 @@ impl EphemeralFile {
         let file = VirtualFile::open_with_options(
             &filename,
             OpenOptions::new().read(true).write(true).create(true),
-        )?;
+        )
+        .await?;
         Ok(EphemeralFile {
             page_cache_file_id: page_cache::next_file_id(),
@@ -286,7 +287,7 @@ mod tests {
     async fn test_ephemeral_blobs() -> Result<(), io::Error> {
         let (conf, tenant_id, timeline_id) = harness("ephemeral_blobs")?;
-        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id)?;
+        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;
         let pos_foo = file.write_blob(b"foo").await?;
         assert_eq!(

View File

@@ -4,10 +4,9 @@ use std::{
     sync::atomic::{AtomicUsize, Ordering},
 };
-use crate::virtual_file::VirtualFile;
 fn fsync_path(path: &Path) -> io::Result<()> {
-    let file = VirtualFile::open(path)?;
+    // TODO use VirtualFile::fsync_all once we fully go async.
+    let file = std::fs::File::open(path)?;
     file.sync_all()
 }

View File

@@ -604,7 +604,7 @@ impl DeltaLayerWriterInner {
         // FIXME: throw an error instead?
         let path = DeltaLayer::temp_path_for(conf, &tenant_id, &timeline_id, key_start, &lsn_range);
-        let mut file = VirtualFile::create(&path)?;
+        let mut file = VirtualFile::create(&path).await?;
         // make room for the header block
         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
         let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
@@ -732,7 +732,7 @@ impl DeltaLayerWriterInner {
         };
         // fsync the file
-        file.sync_all()?;
+        file.sync_all().await?;
         // Rename the file to its final name
         //
         // Note: This overwrites any existing file. There shouldn't be any.
@@ -851,6 +851,7 @@ impl DeltaLayerInner {
         summary: Option<Summary>,
     ) -> anyhow::Result<Self> {
         let file = VirtualFile::open(path)
+            .await
             .with_context(|| format!("Failed to open file '{}'", path.display()))?;
         let file = FileBlockReader::new(file);

View File

@@ -438,6 +438,7 @@ impl ImageLayerInner {
         summary: Option<Summary>,
     ) -> anyhow::Result<Self> {
         let file = VirtualFile::open(path)
+            .await
             .with_context(|| format!("Failed to open file '{}'", path.display()))?;
         let file = FileBlockReader::new(file);
         let summary_blk = file.read_blk(0).await?;
@@ -540,7 +541,8 @@ impl ImageLayerWriterInner {
         let mut file = VirtualFile::open_with_options(
             &path,
             std::fs::OpenOptions::new().write(true).create_new(true),
-        )?;
+        )
+        .await?;
         // make room for the header block
         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
         let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
@@ -645,7 +647,7 @@ impl ImageLayerWriterInner {
         };
         // fsync the file
-        file.sync_all()?;
+        file.sync_all().await?;
         // Rename the file to its final name
         //

View File

@@ -236,7 +236,7 @@ impl InMemoryLayer {
     ///
     /// Create a new, empty, in-memory layer
     ///
-    pub fn create(
+    pub async fn create(
         conf: &'static PageServerConf,
         timeline_id: TimelineId,
         tenant_id: TenantId,
@@ -244,7 +244,7 @@ impl InMemoryLayer {
     ) -> Result<InMemoryLayer> {
         trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
-        let file = EphemeralFile::create(conf, tenant_id, timeline_id)?;
+        let file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;
         Ok(InMemoryLayer {
             conf,

View File

@@ -2544,13 +2544,15 @@ impl Timeline {
     ///
     async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
         let mut guard = self.layers.write().await;
-        let layer = guard.get_layer_for_write(
-            lsn,
-            self.get_last_record_lsn(),
-            self.conf,
-            self.timeline_id,
-            self.tenant_id,
-        )?;
+        let layer = guard
+            .get_layer_for_write(
+                lsn,
+                self.get_last_record_lsn(),
+                self.conf,
+                self.timeline_id,
+                self.tenant_id,
+            )
+            .await?;
         Ok(layer)
     }

View File

@@ -87,7 +87,7 @@ impl LayerManager {
     /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
    /// called within `get_layer_for_write`.
-    pub(crate) fn get_layer_for_write(
+    pub(crate) async fn get_layer_for_write(
         &mut self,
         lsn: Lsn,
         last_record_lsn: Lsn,
@@ -129,7 +129,7 @@ impl LayerManager {
             lsn
         );
-        let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn)?;
+        let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn).await?;
         let layer = Arc::new(new_layer);
         self.layer_map.open_layer = Some(layer.clone());
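
The body between these two hunks is not shown, but the doc comment describes a get-or-create flow, which now has to be async because InMemoryLayer::create is. A minimal sketch of that flow with placeholder types (hypothetical Layer and make_layer, not from this diff):

use std::sync::Arc;

struct Layer; // stand-in for the real InMemoryLayer

async fn make_layer() -> Layer {
    Layer
}

async fn get_layer_for_write(open_layer: &mut Option<Arc<Layer>>) -> Arc<Layer> {
    // Return the currently open layer if there is one...
    if let Some(layer) = open_layer {
        return Arc::clone(layer);
    }
    // ...otherwise create a new one and remember it as the open layer.
    let layer = Arc::new(make_layer().await);
    *open_layer = Some(Arc::clone(&layer));
    layer
}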

View File

@@ -12,13 +12,14 @@
 //!
 use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME};
 use crate::tenant::TENANTS_SEGMENT_NAME;
+use futures::Future;
 use once_cell::sync::OnceCell;
 use std::fs::{self, File, OpenOptions};
 use std::io::{Error, ErrorKind, Seek, SeekFrom};
 use std::os::unix::fs::FileExt;
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use std::sync::{RwLock, RwLockWriteGuard};
+use tokio::sync::{RwLock, RwLockWriteGuard};
 ///
 /// A virtual file descriptor. You can use this just like std::fs::File, but internally
@@ -110,7 +111,7 @@ impl OpenFiles {
     ///
     /// On return, we hold a lock on the slot, and its 'tag' has been updated
     /// recently_used has been set. It's all ready for reuse.
-    fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
+    async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
         //
         // Run the clock algorithm to find a slot to replace.
         //
@@ -142,7 +143,7 @@
                 }
                 retries += 1;
             } else {
-                slot_guard = slot.inner.write().unwrap();
+                slot_guard = slot.inner.write().await;
                 index = next;
                 break;
             }
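
For reference, the clock ("second chance") sweep that find_victim_slot runs, reduced to a self-contained sketch with assumed slot types: clear each slot's reference bit as the hand passes, prefer try_write so a busy slot is skipped rather than waited on, and only fall back to a blocking write().await after two full sweeps, as the fragment above shows:

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::sync::{RwLock, RwLockWriteGuard};

struct Slot {
    recently_used: AtomicBool,
    inner: RwLock<Option<std::fs::File>>,
}

async fn find_victim<'a>(
    slots: &'a [Slot],
    clock_hand: &AtomicUsize,
) -> (usize, RwLockWriteGuard<'a, Option<std::fs::File>>) {
    let mut retries = 0;
    loop {
        let index = clock_hand.fetch_add(1, Ordering::AcqRel) % slots.len();
        let slot = &slots[index];
        if retries < slots.len() * 2 {
            // Second chance: clear the reference bit and only consider
            // slots that were not touched since the last sweep.
            if !slot.recently_used.swap(false, Ordering::Release) {
                // Skip slots whose lock is busy instead of blocking on them.
                if let Ok(guard) = slot.inner.try_write() {
                    return (index, guard);
                }
            }
            retries += 1;
        } else {
            // Everything looked busy for two full sweeps: wait for a lock.
            return (index, slot.inner.write().await);
        }
    }
}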
@@ -210,17 +211,18 @@ impl CrashsafeOverwriteError {
 impl VirtualFile {
     /// Open a file in read-only mode. Like File::open.
-    pub fn open(path: &Path) -> Result<VirtualFile, std::io::Error> {
-        Self::open_with_options(path, OpenOptions::new().read(true))
+    pub async fn open(path: &Path) -> Result<VirtualFile, std::io::Error> {
+        Self::open_with_options(path, OpenOptions::new().read(true)).await
     }
     /// Create a new file for writing. If the file exists, it will be truncated.
     /// Like File::create.
-    pub fn create(path: &Path) -> Result<VirtualFile, std::io::Error> {
+    pub async fn create(path: &Path) -> Result<VirtualFile, std::io::Error> {
         Self::open_with_options(
             path,
             OpenOptions::new().write(true).create(true).truncate(true),
         )
+        .await
     }
     /// Open a file with given options.
@@ -228,7 +230,7 @@
     /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
     /// they will be applied also when the file is subsequently re-opened, not only
     /// on the first time. Make sure that's sane!
-    pub fn open_with_options(
+    pub async fn open_with_options(
         path: &Path,
         open_options: &OpenOptions,
     ) -> Result<VirtualFile, std::io::Error> {
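
That note matters because the OpenOptions are stored in the VirtualFile and replayed on every re-open after eviction. A hypothetical Linux-only illustration via OpenOptionsExt (assumes the libc crate; not part of this diff):

use std::fs::OpenOptions;
use std::os::unix::fs::OpenOptionsExt;

fn read_only_noatime() -> OpenOptions {
    let mut options = OpenOptions::new();
    options.read(true);
    // Set once here; the flag is applied on the first open *and* on every
    // re-open performed after the descriptor is evicted from the cache.
    options.custom_flags(libc::O_NOATIME);
    options
}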
@@ -243,7 +245,7 @@
             tenant_id = "*".to_string();
             timeline_id = "*".to_string();
         }
-        let (handle, mut slot_guard) = get_open_files().find_victim_slot();
+        let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
         let file = STORAGE_IO_TIME
             .with_label_values(&["open"])
             .observe_closure_duration(|| open_options.open(path))?;
@@ -299,11 +301,13 @@
                 // we bail out instead of causing damage.
                 .create_new(true),
         )
+        .await
         .map_err(CrashsafeOverwriteError::CreateTempfile)?;
         file.write_all(content)
             .await
             .map_err(CrashsafeOverwriteError::WriteContents)?;
         file.sync_all()
+            .await
             .map_err(CrashsafeOverwriteError::SyncTempfile)?;
         drop(file); // before the rename, that's important!
         // renames are atomic
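
crashsafe_overwrite (partially shown in these hunks) is the standard durable-replace protocol: write to a temp file, fsync it, rename over the target, then fsync the parent directory. A standalone sketch of the same ordering using plain tokio::fs instead of VirtualFile (hypothetical helper, Unix semantics assumed):

use std::io;
use std::path::Path;
use tokio::fs;
use tokio::io::AsyncWriteExt;

async fn overwrite_crashsafe(final_path: &Path, content: &[u8]) -> io::Result<()> {
    let tmp_path = final_path.with_extension("tmp");
    let mut tmp = fs::File::create(&tmp_path).await?;
    tmp.write_all(content).await?;
    tmp.sync_all().await?; // data is durable before it becomes visible
    drop(tmp); // stop writing before the rename
    fs::rename(&tmp_path, final_path).await?; // atomic replace on POSIX
    // fsync the parent directory so the rename itself survives a crash
    let dir = fs::File::open(final_path.parent().expect("has parent")).await?;
    dir.sync_all().await
}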
@@ -316,28 +320,33 @@
         // try_lock.
         let final_parent_dirfd =
             Self::open_with_options(final_path_parent, OpenOptions::new().read(true))
+                .await
                 .map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?;
         final_parent_dirfd
             .sync_all()
+            .await
             .map_err(CrashsafeOverwriteError::SyncFinalPathParentDir)?;
         Ok(())
     }
     /// Call File::sync_all() on the underlying File.
-    pub fn sync_all(&self) -> Result<(), Error> {
-        self.with_file("fsync", |file| file.sync_all())?
+    pub async fn sync_all(&self) -> Result<(), Error> {
+        self.with_file("fsync", |file| async move { file.sync_all() })
+            .await?
     }
     pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
-        self.with_file("metadata", |file| file.metadata())?
+        self.with_file("metadata", |file| async move { file.metadata() })
+            .await?
     }
     /// Helper function that looks up the underlying File for this VirtualFile,
     /// opening it and evicting some other File if necessary. It calls 'func'
     /// with the physical File.
-    fn with_file<F, R>(&self, op: &str, mut func: F) -> Result<R, Error>
+    async fn with_file<F, R, FR>(&self, _op: &str, func: F) -> Result<R, Error>
     where
-        F: FnMut(&File) -> R,
+        F: FnOnce(&File) -> FR,
+        FR: Future<Output = R>,
     {
         let open_files = get_open_files();
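
The new with_file bound is the stable-Rust substitute for async closures: the caller passes a closure that returns a future, and the helper awaits it. A reduced, free-standing sketch (retry loop and metrics omitted; hypothetical names), with the lifetime written out so the returned future may borrow the &File:

use std::fs::File;
use std::future::Future;

async fn with_file<'a, F, FR, R>(file: &'a File, func: F) -> R
where
    F: FnOnce(&'a File) -> FR,
    FR: Future<Output = R> + 'a,
{
    // The closure builds a future that may hold the &File borrow;
    // awaiting it here keeps the borrow alive for exactly that long.
    func(file).await
}

// usage, mirroring the sync_all/metadata call sites above:
// let meta = with_file(&file, |f| async move { f.metadata() }).await?;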
@@ -348,19 +357,17 @@
         // We only need to hold the handle lock while we read the current handle. If
         // another thread closes the file and recycles the slot for a different file,
         // we will notice that the handle we read is no longer valid and retry.
-        let mut handle = *self.handle.read().unwrap();
+        let mut handle = *self.handle.read().await;
         loop {
             // Check if the slot contains our File
             {
                 let slot = &open_files.slots[handle.index];
-                let slot_guard = slot.inner.read().unwrap();
+                let slot_guard = slot.inner.read().await;
                 if slot_guard.tag == handle.tag {
                     if let Some(file) = &slot_guard.file {
                         // Found a cached file descriptor.
                         slot.recently_used.store(true, Ordering::Relaxed);
-                        return Ok(STORAGE_IO_TIME
-                            .with_label_values(&[op])
-                            .observe_closure_duration(|| func(file)));
+                        return Ok(func(file).await);
                     }
                 }
             }
@@ -368,7 +375,7 @@
             // The slot didn't contain our File. We will have to open it ourselves,
             // but before that, grab a write lock on handle in the VirtualFile, so
             // that no other thread will try to concurrently open the same file.
-            let handle_guard = self.handle.write().unwrap();
+            let handle_guard = self.handle.write().await;
             // If another thread changed the handle while we were not holding the lock,
             // then the handle might now be valid again. Loop back to retry.
@@ -382,7 +389,7 @@
             // We need to open the file ourselves. The handle in the VirtualFile is
             // now locked in write-mode. Find a free slot to put it in.
-            let (handle, mut slot_guard) = open_files.find_victim_slot();
+            let (handle, mut slot_guard) = open_files.find_victim_slot().await;
             // Open the physical file
             let file = STORAGE_IO_TIME
@@ -390,9 +397,7 @@
                 .observe_closure_duration(|| self.open_options.open(&self.path))?;
             // Perform the requested operation on it
-            let result = STORAGE_IO_TIME
-                .with_label_values(&[op])
-                .observe_closure_duration(|| func(&file));
+            let result = func(&file).await;
             // Store the File in the slot and update the handle in the VirtualFile
             // to point to it.
@@ -415,7 +420,11 @@
                 self.pos = offset;
             }
             SeekFrom::End(offset) => {
-                self.pos = self.with_file("seek", |mut file| file.seek(SeekFrom::End(offset)))??
+                self.pos = self
+                    .with_file("seek", |mut file| async move {
+                        file.seek(SeekFrom::End(offset))
+                    })
+                    .await??
             }
             SeekFrom::Current(offset) => {
                 let pos = self.pos as i128 + offset as i128;
@@ -503,7 +512,9 @@
     }
     pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
-        let result = self.with_file("read", |file| file.read_at(buf, offset))?;
+        let result = self
+            .with_file("read", |file| async move { file.read_at(buf, offset) })
+            .await?;
         if let Ok(size) = result {
             STORAGE_IO_SIZE
                 .with_label_values(&["read", &self.tenant_id, &self.timeline_id])
@@ -513,7 +524,9 @@
     }
     async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
-        let result = self.with_file("write", |file| file.write_at(buf, offset))?;
+        let result = self
+            .with_file("write", |file| async move { file.write_at(buf, offset) })
+            .await?;
         if let Ok(size) = result {
             STORAGE_IO_SIZE
                 .with_label_values(&["write", &self.tenant_id, &self.timeline_id])
@@ -555,12 +568,19 @@
 impl Drop for VirtualFile {
     /// If a VirtualFile is dropped, close the underlying file if it was open.
     fn drop(&mut self) {
-        let handle = self.handle.get_mut().unwrap();
+        let handle = self.handle.get_mut();
         // We could check with a read-lock first, to avoid waiting on an
         // unrelated I/O.
+        // We don't have async drop so we cannot wait for the lock here.
+        // Instead, do a best-effort attempt at closing the underlying
+        // file descriptor by using `try_write`.
+        // This best-effort attempt should be quite good though
+        // as we have `&mut self` access. In other words, if the slot
+        // is still occupied by our file, we should be the only ones
+        // accessing it (and if it has been reassigned since, we don't
+        // need to bother with dropping anyways).
         let slot = &get_open_files().slots[handle.index];
-        let mut slot_guard = slot.inner.write().unwrap();
+        let Ok(mut slot_guard) = slot.inner.try_write() else { return };
         if slot_guard.tag == handle.tag {
             slot.recently_used.store(false, Ordering::Relaxed);
             // there is also operation "close-by-replace" for closes done on eviction for
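
Drop::drop is synchronous, so the tokio lock can only be polled opportunistically there: try_write either succeeds immediately or the cleanup is skipped, as the new comment explains. The same pattern on a placeholder slot type (a minimal sketch, not from this diff):

use tokio::sync::RwLock;

struct SlotRef {
    slot: &'static RwLock<Option<std::fs::File>>,
}

impl Drop for SlotRef {
    fn drop(&mut self) {
        // try_write never awaits: if the lock is contended, leave the
        // descriptor behind and let the eviction path close it later.
        if let Ok(mut guard) = self.slot.try_write() {
            *guard = None; // closes the File, if one was cached
        }
    }
}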
@@ -625,6 +645,7 @@ mod tests {
     use rand::seq::SliceRandom;
     use rand::thread_rng;
     use rand::Rng;
+    use std::future::Future;
     use std::io::Write;
     use std::sync::Arc;
@@ -694,8 +715,8 @@
         // results with VirtualFiles as with native Files. (Except that with
         // native files, you will run out of file descriptors if the ulimit
         // is low enough.)
-        test_files("virtual_files", |path, open_options| {
-            let vf = VirtualFile::open_with_options(path, open_options)?;
+        test_files("virtual_files", |path, open_options| async move {
+            let vf = VirtualFile::open_with_options(&path, &open_options).await?;
             Ok(MaybeVirtualFile::VirtualFile(vf))
         })
         .await
@@ -703,31 +724,37 @@
     #[tokio::test]
     async fn test_physical_files() -> Result<(), Error> {
-        test_files("physical_files", |path, open_options| {
+        test_files("physical_files", |path, open_options| async move {
             Ok(MaybeVirtualFile::File(open_options.open(path)?))
         })
         .await
     }
-    async fn test_files<OF>(testname: &str, openfunc: OF) -> Result<(), Error>
+    async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> Result<(), Error>
     where
-        OF: Fn(&Path, &OpenOptions) -> Result<MaybeVirtualFile, std::io::Error>,
+        OF: Fn(PathBuf, OpenOptions) -> FT,
+        FT: Future<Output = Result<MaybeVirtualFile, std::io::Error>>,
     {
         let testdir = crate::config::PageServerConf::test_repo_dir(testname);
         std::fs::create_dir_all(&testdir)?;
         let path_a = testdir.join("file_a");
         let mut file_a = openfunc(
-            &path_a,
-            OpenOptions::new().write(true).create(true).truncate(true),
-        )?;
+            path_a.clone(),
+            OpenOptions::new()
+                .write(true)
+                .create(true)
+                .truncate(true)
+                .to_owned(),
+        )
+        .await?;
         file_a.write_all(b"foobar").await?;
         // cannot read from a file opened in write-only mode
         let _ = file_a.read_string().await.unwrap_err();
         // Close the file and re-open for reading
-        let mut file_a = openfunc(&path_a, OpenOptions::new().read(true))?;
+        let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?;
         // cannot write to a file opened in read-only mode
         let _ = file_a.write_all(b"bar").await.unwrap_err();
@@ -763,13 +790,15 @@
         // Create another test file, and try FileExt functions on it.
         let path_b = testdir.join("file_b");
         let mut file_b = openfunc(
-            &path_b,
+            path_b.clone(),
             OpenOptions::new()
                 .read(true)
                 .write(true)
                 .create(true)
-                .truncate(true),
-        )?;
+                .truncate(true)
+                .to_owned(),
+        )
+        .await?;
         file_b.write_all_at(b"BAR", 3).await?;
         file_b.write_all_at(b"FOO", 0).await?;
@@ -783,7 +812,8 @@
         let mut vfiles = Vec::new();
         for _ in 0..100 {
-            let mut vfile = openfunc(&path_b, OpenOptions::new().read(true))?;
+            let mut vfile =
+                openfunc(path_b.clone(), OpenOptions::new().read(true).to_owned()).await?;
             assert_eq!("FOOBAR", vfile.read_string().await?);
             vfiles.push(vfile);
         }
@@ -808,8 +838,8 @@
     /// Test using VirtualFiles from many threads concurrently. This tests both using
     /// a lot of VirtualFiles concurrently, causing evictions, and also using the same
     /// VirtualFile from multiple threads concurrently.
-    #[test]
-    fn test_vfile_concurrency() -> Result<(), Error> {
+    #[tokio::test]
+    async fn test_vfile_concurrency() -> Result<(), Error> {
         const SIZE: usize = 8 * 1024;
         const VIRTUAL_FILES: usize = 100;
         const THREADS: usize = 100;
@@ -828,7 +858,8 @@
         // Open the file many times.
         let mut files = Vec::new();
         for _ in 0..VIRTUAL_FILES {
-            let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))?;
+            let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))
+                .await?;
             files.push(f);
         }
         let files = Arc::new(files);
@@ -839,9 +870,10 @@
             .thread_name("test_vfile_concurrency thread")
             .build()
             .unwrap();
+        let mut hdls = Vec::new();
         for _threadno in 0..THREADS {
             let files = files.clone();
-            rt.spawn(async move {
+            let hdl = rt.spawn(async move {
                 let mut buf = [0u8; SIZE];
                 let mut rng = rand::rngs::OsRng;
                 for _ in 1..1000 {
@@ -850,7 +882,12 @@
                     assert!(buf == SAMPLE);
                 }
             });
+            hdls.push(hdl);
         }
+        for hdl in hdls {
+            hdl.await?;
+        }
         std::mem::forget(rt);
         Ok(())
     }
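
The reworked test keeps the JoinHandles and awaits them, so a panicked task now fails the test instead of being dropped silently. The shape, reduced to a sketch (note that tokio converts JoinError into std::io::Error, which is why hdl.await? type-checks above):

use std::io;

async fn run_on_second_runtime() -> io::Result<()> {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(4)
        .enable_all()
        .build()?;
    let mut hdls = Vec::new();
    for n in 0..4u64 {
        hdls.push(rt.spawn(async move { n * n }));
    }
    for hdl in hdls {
        let _ = hdl.await?; // JoinError -> io::Error via From
    }
    // A runtime must not be dropped from async context; the test above
    // leaks it with std::mem::forget for the same reason.
    std::mem::forget(rt);
    Ok(())
}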