mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
Switch the locks to tokio ones
This commit is contained in:
@@ -18,7 +18,7 @@ use std::io::{Error, ErrorKind, Seek, SeekFrom};
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{RwLock, RwLockWriteGuard};
|
||||
use tokio::sync::{RwLock, RwLockWriteGuard};
|
||||
|
||||
///
|
||||
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
|
||||
@@ -110,7 +110,7 @@ impl OpenFiles {
|
||||
///
|
||||
/// On return, we hold a lock on the slot, and its 'tag' has been updated
|
||||
/// recently_used has been set. It's all ready for reuse.
|
||||
fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
|
||||
async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
|
||||
//
|
||||
// Run the clock algorithm to find a slot to replace.
|
||||
//
|
||||
@@ -142,7 +142,7 @@ impl OpenFiles {
|
||||
}
|
||||
retries += 1;
|
||||
} else {
|
||||
slot_guard = slot.inner.write().unwrap();
|
||||
slot_guard = slot.inner.write().await;
|
||||
index = next;
|
||||
break;
|
||||
}
|
||||
@@ -244,7 +244,7 @@ impl VirtualFile {
|
||||
tenant_id = "*".to_string();
|
||||
timeline_id = "*".to_string();
|
||||
}
|
||||
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
|
||||
let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
|
||||
let file = STORAGE_IO_TIME
|
||||
.with_label_values(&["open"])
|
||||
.observe_closure_duration(|| open_options.open(path))?;
|
||||
@@ -353,12 +353,12 @@ impl VirtualFile {
|
||||
// We only need to hold the handle lock while we read the current handle. If
|
||||
// another thread closes the file and recycles the slot for a different file,
|
||||
// we will notice that the handle we read is no longer valid and retry.
|
||||
let mut handle = *self.handle.read().unwrap();
|
||||
let mut handle = *self.handle.read().await;
|
||||
loop {
|
||||
// Check if the slot contains our File
|
||||
{
|
||||
let slot = &open_files.slots[handle.index];
|
||||
let slot_guard = slot.inner.read().unwrap();
|
||||
let slot_guard = slot.inner.read().await;
|
||||
if slot_guard.tag == handle.tag {
|
||||
if let Some(file) = &slot_guard.file {
|
||||
// Found a cached file descriptor.
|
||||
@@ -373,7 +373,7 @@ impl VirtualFile {
|
||||
// The slot didn't contain our File. We will have to open it ourselves,
|
||||
// but before that, grab a write lock on handle in the VirtualFile, so
|
||||
// that no other thread will try to concurrently open the same file.
|
||||
let handle_guard = self.handle.write().unwrap();
|
||||
let handle_guard = self.handle.write().await;
|
||||
|
||||
// If another thread changed the handle while we were not holding the lock,
|
||||
// then the handle might now be valid again. Loop back to retry.
|
||||
@@ -387,7 +387,7 @@ impl VirtualFile {
|
||||
|
||||
// We need to open the file ourselves. The handle in the VirtualFile is
|
||||
// now locked in write-mode. Find a free slot to put it in.
|
||||
let (handle, mut slot_guard) = open_files.find_victim_slot();
|
||||
let (handle, mut slot_guard) = open_files.find_victim_slot().await;
|
||||
|
||||
// Open the physical file
|
||||
let file = STORAGE_IO_TIME
|
||||
@@ -566,12 +566,19 @@ impl VirtualFile {
|
||||
impl Drop for VirtualFile {
|
||||
/// If a VirtualFile is dropped, close the underlying file if it was open.
|
||||
fn drop(&mut self) {
|
||||
let handle = self.handle.get_mut().unwrap();
|
||||
let handle = self.handle.get_mut();
|
||||
|
||||
// We could check with a read-lock first, to avoid waiting on an
|
||||
// unrelated I/O.
|
||||
// We don't have async drop so we cannot wait for the lock here.
|
||||
// Instead, do a best-effort attempt at closing the underlying
|
||||
// file descriptor by using `try_write`.
|
||||
// This best-effort attempt should be quite good though
|
||||
// as we have `&mut self` access. In other words, if the slot
|
||||
// is still occupied by our file, we should be the only ones
|
||||
// accessing it (and if it has been reassigned since, we don't
|
||||
// need to bother with dropping anyways).
|
||||
let slot = &get_open_files().slots[handle.index];
|
||||
let mut slot_guard = slot.inner.write().unwrap();
|
||||
let Ok(mut slot_guard) = slot.inner.try_write() else { return };
|
||||
|
||||
if slot_guard.tag == handle.tag {
|
||||
slot.recently_used.store(false, Ordering::Relaxed);
|
||||
// there is also operation "close-by-replace" for closes done on eviction for
|
||||
|
||||
Reference in New Issue
Block a user