mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 18:02:56 +00:00
refactor(VirtualFile): switch to OwnedFd
This commit is contained in:
@@ -18,6 +18,7 @@ use once_cell::sync::OnceCell;
|
||||
use std::fs::{self, File};
|
||||
use std::io::{Error, ErrorKind, Seek, SeekFrom};
|
||||
|
||||
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd};
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
@@ -109,7 +110,7 @@ struct SlotInner {
|
||||
tag: u64,
|
||||
|
||||
/// the underlying file
|
||||
file: Option<File>,
|
||||
file: Option<OwnedFd>,
|
||||
}
|
||||
|
||||
impl OpenFiles {
|
||||
@@ -277,6 +278,10 @@ macro_rules! with_file {
|
||||
let $ident = $this.lock_file().await?;
|
||||
observe_duration!($op, $($body)*)
|
||||
}};
|
||||
($this:expr, $op:expr, | mut $ident:ident | $($body:tt)*) => {{
|
||||
let mut $ident = $this.lock_file().await?;
|
||||
observe_duration!($op, $($body)*)
|
||||
}};
|
||||
}
|
||||
|
||||
impl VirtualFile {
|
||||
@@ -320,10 +325,9 @@ impl VirtualFile {
|
||||
// NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
|
||||
// where our caller doesn't get to use the returned VirtualFile before its
|
||||
// slot gets re-used by someone else.
|
||||
let file = observe_duration!(
|
||||
StorageIoOperation::Open,
|
||||
open_options.open(path.as_std_path()).await
|
||||
)?;
|
||||
let file = observe_duration!(StorageIoOperation::Open, {
|
||||
open_options.open(path.as_std_path()).await?
|
||||
});
|
||||
|
||||
// Strip all options other than read and write.
|
||||
//
|
||||
@@ -396,15 +400,13 @@ impl VirtualFile {
|
||||
|
||||
/// Call File::sync_all() on the underlying File.
|
||||
pub async fn sync_all(&self) -> Result<(), Error> {
|
||||
with_file!(self, StorageIoOperation::Fsync, |file| file
|
||||
.as_ref()
|
||||
.sync_all())
|
||||
with_file!(self, StorageIoOperation::Fsync, |file_guard| file_guard
|
||||
.with_std_file(|std_file| std_file.sync_all()))
|
||||
}
|
||||
|
||||
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
|
||||
with_file!(self, StorageIoOperation::Metadata, |file| file
|
||||
.as_ref()
|
||||
.metadata())
|
||||
with_file!(self, StorageIoOperation::Metadata, |file_guard| file_guard
|
||||
.with_std_file(|std_file| std_file.metadata()))
|
||||
}
|
||||
|
||||
/// Helper function internal to `VirtualFile` that looks up the underlying File,
|
||||
@@ -413,7 +415,7 @@ impl VirtualFile {
|
||||
///
|
||||
/// We are doing it via a macro as Rust doesn't support async closures that
|
||||
/// take on parameters with lifetimes.
|
||||
async fn lock_file(&self) -> Result<FileGuard<'_>, Error> {
|
||||
async fn lock_file(&self) -> Result<FileGuard, Error> {
|
||||
let open_files = get_open_files();
|
||||
|
||||
let mut handle_guard = {
|
||||
@@ -459,10 +461,9 @@ impl VirtualFile {
|
||||
// NB: we use StorageIoOperation::OpenAferReplace for this to distinguish this
|
||||
// case from StorageIoOperation::Open. This helps with identifying thrashing
|
||||
// of the virtual file descriptor cache.
|
||||
let file = observe_duration!(
|
||||
StorageIoOperation::OpenAfterReplace,
|
||||
self.open_options.open(self.path.as_std_path()).await
|
||||
)?;
|
||||
let file = observe_duration!(StorageIoOperation::OpenAfterReplace, {
|
||||
self.open_options.open(self.path.as_std_path()).await?
|
||||
});
|
||||
|
||||
// Store the File in the slot and update the handle in the VirtualFile
|
||||
// to point to it.
|
||||
@@ -487,9 +488,8 @@ impl VirtualFile {
|
||||
self.pos = offset;
|
||||
}
|
||||
SeekFrom::End(offset) => {
|
||||
self.pos = with_file!(self, StorageIoOperation::Seek, |file| file
|
||||
.as_ref()
|
||||
.seek(SeekFrom::End(offset)))?
|
||||
self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard
|
||||
.with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))?
|
||||
}
|
||||
SeekFrom::Current(offset) => {
|
||||
let pos = self.pos as i128 + offset as i128;
|
||||
@@ -577,9 +577,8 @@ impl VirtualFile {
|
||||
}
|
||||
|
||||
pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
|
||||
let result = with_file!(self, StorageIoOperation::Read, |file| file
|
||||
.as_ref()
|
||||
.read_at(buf, offset));
|
||||
let result = with_file!(self, StorageIoOperation::Read, |file_guard| file_guard
|
||||
.with_std_file(|std_file| std_file.read_at(buf, offset)));
|
||||
if let Ok(size) = result {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
|
||||
@@ -589,9 +588,9 @@ impl VirtualFile {
|
||||
}
|
||||
|
||||
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
|
||||
let result = with_file!(self, StorageIoOperation::Write, |file| file
|
||||
.as_ref()
|
||||
.write_at(buf, offset));
|
||||
let result = with_file!(self, StorageIoOperation::Write, |file_guard| {
|
||||
file_guard.with_std_file(|std_file| std_file.write_at(buf, offset))
|
||||
});
|
||||
if let Ok(size) = result {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&["write", &self.tenant_id, &self.timeline_id])
|
||||
@@ -601,18 +600,47 @@ impl VirtualFile {
|
||||
}
|
||||
}
|
||||
|
||||
struct FileGuard<'a> {
|
||||
slot_guard: RwLockReadGuard<'a, SlotInner>,
|
||||
struct FileGuard {
|
||||
slot_guard: RwLockReadGuard<'static, SlotInner>,
|
||||
}
|
||||
|
||||
impl<'a> AsRef<File> for FileGuard<'a> {
|
||||
fn as_ref(&self) -> &File {
|
||||
impl AsRef<OwnedFd> for FileGuard {
|
||||
fn as_ref(&self) -> &OwnedFd {
|
||||
// This unwrap is safe because we only create `FileGuard`s
|
||||
// if we know that the file is Some.
|
||||
self.slot_guard.file.as_ref().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl FileGuard {
|
||||
/// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
|
||||
fn with_std_file<F, R>(&self, with: F) -> R
|
||||
where
|
||||
F: FnOnce(&File) -> R,
|
||||
{
|
||||
// SAFETY:
|
||||
// - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
|
||||
// - `&` usage below: `self` is `&`, hence Rust typesystem guarantees there are is no `&mut`
|
||||
let file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
|
||||
let res = with(&file);
|
||||
let _ = file.into_raw_fd();
|
||||
res
|
||||
}
|
||||
/// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
|
||||
fn with_std_file_mut<F, R>(&mut self, with: F) -> R
|
||||
where
|
||||
F: FnOnce(&mut File) -> R,
|
||||
{
|
||||
// SAFETY:
|
||||
// - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
|
||||
// - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
|
||||
let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
|
||||
let res = with(&mut file);
|
||||
let _ = file.into_raw_fd();
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl VirtualFile {
|
||||
pub(crate) async fn read_blk(
|
||||
@@ -805,7 +833,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_virtual_files() -> Result<(), Error> {
|
||||
async fn test_virtual_files() -> anyhow::Result<()> {
|
||||
// The real work is done in the test_files() helper function. This
|
||||
// allows us to run the same set of tests against a native File, and
|
||||
// VirtualFile. We trust the native Files and wouldn't need to test them,
|
||||
@@ -821,16 +849,17 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_physical_files() -> Result<(), Error> {
|
||||
async fn test_physical_files() -> anyhow::Result<()> {
|
||||
test_files("physical_files", |path, open_options| async move {
|
||||
Ok(MaybeVirtualFile::File(
|
||||
open_options.open(path.as_std_path()).await?,
|
||||
))
|
||||
Ok(MaybeVirtualFile::File({
|
||||
let owned_fd = open_options.open(path.as_std_path()).await?;
|
||||
File::from(owned_fd)
|
||||
}))
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> Result<(), Error>
|
||||
async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> anyhow::Result<()>
|
||||
where
|
||||
OF: Fn(Utf8PathBuf, OpenOptions) -> FT,
|
||||
FT: Future<Output = Result<MaybeVirtualFile, std::io::Error>>,
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::path::Path;
|
||||
use std::{os::fd::OwnedFd, path::Path};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct OpenOptions(std::fs::OpenOptions);
|
||||
@@ -39,11 +39,8 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub(in crate::virtual_file) async fn open(
|
||||
&self,
|
||||
path: &Path,
|
||||
) -> std::io::Result<std::fs::File> {
|
||||
self.0.open(path)
|
||||
pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
|
||||
self.0.open(path).map(|file| file.into())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user