mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 18:02:56 +00:00
refactor: get_file_guard & base with_file on it
This commit is contained in:
@@ -14,10 +14,11 @@ use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME};
|
||||
use once_cell::sync::OnceCell;
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{Error, ErrorKind, Seek, SeekFrom, Write};
|
||||
use std::ops::Deref;
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{RwLock, RwLockWriteGuard};
|
||||
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
|
||||
///
|
||||
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
|
||||
@@ -244,7 +245,26 @@ impl VirtualFile {
|
||||
pub fn metadata(&self) -> Result<fs::Metadata, Error> {
|
||||
self.with_file("metadata", |file| file.metadata())?
|
||||
}
|
||||
}
|
||||
|
||||
/// The function that constructs the guard guarantees that the respective [`SlotInner::file`] is `Some`.
|
||||
enum FileSlotGuard {
|
||||
Write(RwLockWriteGuard<'static, SlotInner>),
|
||||
Read(RwLockReadGuard<'static, SlotInner>),
|
||||
}
|
||||
|
||||
impl Deref for FileSlotGuard {
|
||||
type Target = File;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
match self {
|
||||
FileSlotGuard::Write(guard) => guard.file.as_ref().unwrap(),
|
||||
FileSlotGuard::Read(guard) => guard.file.as_ref().unwrap(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl VirtualFile {
|
||||
/// Helper function that looks up the underlying File for this VirtualFile,
|
||||
/// opening it and evicting some other File if necessary. It calls 'func'
|
||||
/// with the physical File.
|
||||
@@ -252,6 +272,14 @@ impl VirtualFile {
|
||||
where
|
||||
F: FnMut(&File) -> R,
|
||||
{
|
||||
let guard = self.get_file_guard()?;
|
||||
|
||||
return Ok(STORAGE_IO_TIME
|
||||
.with_label_values(&[op])
|
||||
.observe_closure_duration(|| func(&guard)));
|
||||
}
|
||||
|
||||
fn get_file_guard(&self) -> Result<FileSlotGuard, Error> {
|
||||
let open_files = get_open_files();
|
||||
|
||||
let mut handle_guard = {
|
||||
@@ -268,12 +296,10 @@ impl VirtualFile {
|
||||
let slot = &open_files.slots[handle.index];
|
||||
let slot_guard = slot.inner.read().unwrap();
|
||||
if slot_guard.tag == handle.tag {
|
||||
if let Some(file) = &slot_guard.file {
|
||||
if let Some(_) = &slot_guard.file {
|
||||
// Found a cached file descriptor.
|
||||
slot.recently_used.store(true, Ordering::Relaxed);
|
||||
return Ok(STORAGE_IO_TIME
|
||||
.with_label_values(&[op])
|
||||
.observe_closure_duration(|| func(file)));
|
||||
return Ok(FileSlotGuard::Read(slot_guard));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -302,18 +328,13 @@ impl VirtualFile {
|
||||
.with_label_values(&["open"])
|
||||
.observe_closure_duration(|| self.open_options.open(&self.path))?;
|
||||
|
||||
// Perform the requested operation on it
|
||||
let result = STORAGE_IO_TIME
|
||||
.with_label_values(&[op])
|
||||
.observe_closure_duration(|| func(&file));
|
||||
|
||||
// Store the File in the slot and update the handle in the VirtualFile
|
||||
// to point to it.
|
||||
slot_guard.file.replace(file);
|
||||
|
||||
*handle_guard = handle;
|
||||
|
||||
Ok(result)
|
||||
Ok(FileSlotGuard::Write(slot_guard))
|
||||
}
|
||||
|
||||
pub fn remove(self) {
|
||||
@@ -359,7 +380,6 @@ impl Write for VirtualFile {
|
||||
}
|
||||
|
||||
impl VirtualFile {
|
||||
|
||||
pub fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
|
||||
match pos {
|
||||
SeekFrom::Start(offset) => {
|
||||
@@ -407,7 +427,11 @@ impl VirtualFile {
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
|
||||
pub async fn read_exact_at_async(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> {
|
||||
pub async fn read_exact_at_async(
|
||||
&self,
|
||||
mut buf: &mut [u8],
|
||||
mut offset: u64,
|
||||
) -> Result<(), Error> {
|
||||
while !buf.is_empty() {
|
||||
match self.read_at_async(buf, offset).await {
|
||||
Ok(0) => {
|
||||
|
||||
Reference in New Issue
Block a user