This commit is contained in:
Arpad Müller
2023-09-07 15:48:30 +02:00
parent 2050136437
commit f417db52e0

View File

@@ -12,6 +12,7 @@
//!
use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME};
use crate::tenant::TENANTS_SEGMENT_NAME;
use futures::Future;
use once_cell::sync::OnceCell;
use std::fs::{self, File, OpenOptions};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
@@ -330,19 +331,22 @@ impl VirtualFile {
/// Call File::sync_all() on the underlying File.
pub async fn sync_all(&self) -> Result<(), Error> {
self.with_file("fsync", |file| file.sync_all()).await?
self.with_file("fsync", |file| async move { file.sync_all() })
.await?
}
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
self.with_file("metadata", |file| file.metadata()).await?
self.with_file("metadata", |file| async move { file.metadata() })
.await?
}
/// Helper function that looks up the underlying File for this VirtualFile,
/// opening it and evicting some other File if necessary. It calls 'func'
/// with the physical File.
async fn with_file<F, R>(&self, op: &str, mut func: F) -> Result<R, Error>
async fn with_file<'a, F, R, FR>(&self, _op: &str, func: F) -> Result<R, Error>
where
F: FnMut(&File) -> R,
F: FnOnce(&'a File) -> FR,
FR: Future<Output = R> + 'a,
{
let open_files = get_open_files();
@@ -363,9 +367,7 @@ impl VirtualFile {
if let Some(file) = &slot_guard.file {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(STORAGE_IO_TIME
.with_label_values(&[op])
.observe_closure_duration(|| func(file)));
return Ok(func(file).await);
}
}
}
@@ -395,9 +397,7 @@ impl VirtualFile {
.observe_closure_duration(|| self.open_options.open(&self.path))?;
// Perform the requested operation on it
let result = STORAGE_IO_TIME
.with_label_values(&[op])
.observe_closure_duration(|| func(&file));
let result = func(&file).await;
// Store the File in the slot and update the handle in the VirtualFile
// to point to it.
@@ -421,7 +421,9 @@ impl VirtualFile {
}
SeekFrom::End(offset) => {
self.pos = self
.with_file("seek", |mut file| file.seek(SeekFrom::End(offset)))
.with_file("seek", |mut file| async move {
file.seek(SeekFrom::End(offset))
})
.await??
}
SeekFrom::Current(offset) => {
@@ -511,7 +513,7 @@ impl VirtualFile {
pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
let result = self
.with_file("read", |file| file.read_at(buf, offset))
.with_file("read", |file| async move { file.read_at(buf, offset) })
.await?;
if let Ok(size) = result {
STORAGE_IO_SIZE
@@ -523,7 +525,7 @@ impl VirtualFile {
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = self
.with_file("write", |file| file.write_at(buf, offset))
.with_file("write", |file| async move { file.write_at(buf, offset) })
.await?;
if let Ok(size) = result {
STORAGE_IO_SIZE