From 1e5533cdc980728176718cfd6d1b60c0cdc8275b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 30 Nov 2023 11:58:03 +0000 Subject: [PATCH] Revert "pull in WIP: integrate tokio-epoll-uring #5824" This reverts commit cfeeda4d51fb6c0bccd2691f81cfc7daa2950a02. --- Cargo.lock | 56 +-- pageserver/Cargo.toml | 3 - pageserver/src/tenant/block_io.rs | 14 +- pageserver/src/tenant/ephemeral_file.rs | 20 +- .../src/tenant/storage_layer/image_layer.rs | 10 +- pageserver/src/virtual_file.rs | 354 ++++++------------ 6 files changed, 136 insertions(+), 321 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c03fa44ee5..d605169986 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2286,16 +2286,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "io-uring" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "460648e47a07a43110fbfa2e0b14afb2be920093c31e5dccc50e49568e099762" -dependencies = [ - "bitflags", - "libc", -] - [[package]] name = "ipnet" version = "2.7.2" @@ -2395,9 +2385,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.147" +version = "0.2.144" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1" [[package]] name = "libloading" @@ -2992,7 +2982,6 @@ dependencies = [ "tenant_size_model", "thiserror", "tokio", - "tokio-epoll-uring", "tokio-io-timeout", "tokio-postgres", "tokio-tar", @@ -4800,18 +4789,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.47" +version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f" +checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.47" +version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b" +checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" dependencies = [ "proc-macro2", "quote", @@ -4899,38 +4888,22 @@ dependencies = [ [[package]] name = "tokio" -version = "1.32.0" +version = "1.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105" dependencies = [ - "backtrace", + "autocfg", "bytes", "libc", "mio", "num_cpus", - "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.3", + "socket2 0.4.9", "tokio-macros", "windows-sys 0.48.0", ] -[[package]] -name = "tokio-epoll-uring" -version = "0.1.0" -source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#53c832d322244f704c7612125496df5ac117ac39" -dependencies = [ - "futures", - "once_cell", - "scopeguard", - "thiserror", - "tokio", - "tokio-util", - "tracing", - "uring-common", -] - [[package]] name = "tokio-io-timeout" version = "1.2.0" @@ -5463,15 +5436,6 @@ dependencies = [ "webpki-roots 0.23.1", ] -[[package]] -name = "uring-common" -version = "0.1.0" -source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#53c832d322244f704c7612125496df5ac117ac39" -dependencies = [ - "io-uring", - "libc", -] - [[package]] name = "url" version = "2.3.1" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index c5b55c4b9d..3eb01003df 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -82,9 +82,6 @@ enum-map.workspace = true enumset.workspace = true strum.workspace = true strum_macros.workspace = true -# WIP PR: https://github.com/neondatabase/tokio-epoll-uring/pull/25 -#tokio-epoll-uring = { path = "../../tokio-epoll-uring/tokio-epoll-uring" } -tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" } [dev-dependencies] criterion.workspace = true diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 38305e9f4e..0617017528 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -5,10 +5,10 @@ use super::ephemeral_file::EphemeralFile; use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner}; use crate::context::RequestContext; -use crate::page_cache::{self, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ}; +use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ}; use crate::virtual_file::VirtualFile; use bytes::Bytes; -use std::ops::Deref; +use std::ops::{Deref, DerefMut}; /// This is implemented by anything that can read 8 kB (PAGE_SZ) /// blocks, using the page cache @@ -169,11 +169,7 @@ impl FileBlockReader { } /// Read a page from the underlying file into given buffer. - async fn fill_buffer( - &self, - buf: PageWriteGuard<'static>, - blkno: u32, - ) -> Result, std::io::Error> { + async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> { assert!(buf.len() == PAGE_SZ); self.file .read_exact_at(buf, blkno as u64 * PAGE_SZ as u64) @@ -200,9 +196,9 @@ impl FileBlockReader { ) })? { ReadBufResult::Found(guard) => Ok(guard.into()), - ReadBufResult::NotFound(write_guard) => { + ReadBufResult::NotFound(mut write_guard) => { // Read the page from disk into the buffer - let write_guard = self.fill_buffer(write_guard, blknum).await?; + self.fill_buffer(write_guard.deref_mut(), blknum).await?; Ok(write_guard.mark_valid().into()) } } diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 35b3a0c701..9a06d9df61 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -8,6 +8,7 @@ use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader}; use crate::virtual_file::VirtualFile; use camino::Utf8PathBuf; use std::cmp::min; +use std::fs::OpenOptions; use std::io::{self, ErrorKind}; use std::ops::DerefMut; use std::sync::atomic::AtomicU64; @@ -43,11 +44,11 @@ impl EphemeralFile { "ephemeral-{filename_disambiguator}" ))); - let file = { - let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); - options.read(true).write(true).create(true); - VirtualFile::open_with_options_async(&filename, options).await? - }; + let file = VirtualFile::open_with_options( + &filename, + OpenOptions::new().read(true).write(true).create(true), + ) + .await?; Ok(EphemeralFile { page_cache_file_id: page_cache::next_file_id(), @@ -87,10 +88,11 @@ impl EphemeralFile { page_cache::ReadBufResult::Found(guard) => { return Ok(BlockLease::PageReadGuard(guard)) } - page_cache::ReadBufResult::NotFound(write_guard) => { - let write_guard = self - .file - .read_exact_at(write_guard, blknum as u64 * PAGE_SZ as u64) + page_cache::ReadBufResult::NotFound(mut write_guard) => { + let buf: &mut [u8] = write_guard.deref_mut(); + debug_assert_eq!(buf.len(), PAGE_SZ); + self.file + .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64) .await?; let read_guard = write_guard.mark_valid(); return Ok(BlockLease::PageReadGuard(read_guard)); diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 2394c0a25d..2f0291f58e 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -487,11 +487,11 @@ impl ImageLayerWriterInner { }, ); info!("new image layer {path}"); - let mut file = { - let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); - options.write(true).create_new(true); - VirtualFile::open_with_options_async(&path, options).await? - }; + let mut file = VirtualFile::open_with_options( + &path, + std::fs::OpenOptions::new().write(true).create_new(true), + ) + .await?; // make room for the header block file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; let blob_writer = BlobWriter::new(file, PAGE_SZ as u64); diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index fad0430650..b58b883ab6 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -11,16 +11,14 @@ //! src/backend/storage/file/fd.c //! use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC}; -use crate::page_cache::PageWriteGuard; +use crate::tenant::TENANTS_SEGMENT_NAME; use camino::{Utf8Path, Utf8PathBuf}; use once_cell::sync::OnceCell; -use std::fs::{self, File}; +use std::fs::{self, File, OpenOptions}; use std::io::{Error, ErrorKind, Seek, SeekFrom}; -use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd}; use std::os::unix::fs::FileExt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; -use tokio::time::Instant; +use std::sync::{RwLock, RwLockWriteGuard}; use utils::fs_ext; /// @@ -55,7 +53,7 @@ pub struct VirtualFile { /// opened, in the VirtualFile::create() function, and strip the flag before /// storing it here. pub path: Utf8PathBuf, - open_options: tokio_epoll_uring::ops::open_at::OpenOptions, + open_options: OpenOptions, // These are strings becase we only use them for metrics, and those expect strings. // It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into @@ -113,7 +111,7 @@ impl OpenFiles { /// /// On return, we hold a lock on the slot, and its 'tag' has been updated /// recently_used has been set. It's all ready for reuse. - async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard) { + fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard) { // // Run the clock algorithm to find a slot to replace. // @@ -145,7 +143,7 @@ impl OpenFiles { } retries += 1; } else { - slot_guard = slot.inner.write().await; + slot_guard = slot.inner.write().unwrap(); index = next; break; } @@ -156,7 +154,7 @@ impl OpenFiles { // old file. // if let Some(old_file) = slot_guard.file.take() { - // the normal path of dropping VirtualFile uses `Close`, use `CloseByReplace` here to + // the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to // distinguish the two. STORAGE_IO_TIME_METRIC .get(StorageIoOperation::CloseByReplace) @@ -252,43 +250,20 @@ impl MaybeFatalIo for std::io::Result { } } -/// Observe duration for the given storage I/O operation -/// -/// Unlike `observe_closure_duration`, this supports async, -/// where "support" means that we measure wall clock time. -macro_rules! observe_duration { - ($op:expr, $($body:tt)*) => {{ - let instant = Instant::now(); - let result = $($body)*; - let elapsed = instant.elapsed().as_secs_f64(); - STORAGE_IO_TIME_METRIC - .get($op) - .observe(elapsed); - result - }} -} - -macro_rules! with_file { - ($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{ - let $ident = $this.lock_file().await?; - observe_duration!($op, $($body)*) - }}; -} - impl VirtualFile { /// Open a file in read-only mode. Like File::open. pub async fn open(path: &Utf8Path) -> Result { - let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); - options.read(true); - Self::open_with_options_async(path, options).await + Self::open_with_options(path, OpenOptions::new().read(true)).await } /// Create a new file for writing. If the file exists, it will be truncated. /// Like File::create. pub async fn create(path: &Utf8Path) -> Result { - let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); - options.write(true).create(true).truncate(true); - Self::open_with_options_async(path, options).await + Self::open_with_options( + path, + OpenOptions::new().write(true).create(true).truncate(true), + ) + .await } /// Open a file with given options. @@ -296,7 +271,6 @@ impl VirtualFile { /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt, /// they will be applied also when the file is subsequently re-opened, not only /// on the first time. Make sure that's sane! - #[cfg(test)] pub async fn open_with_options( path: &Utf8Path, open_options: &OpenOptions, @@ -312,9 +286,11 @@ impl VirtualFile { tenant_id = "*".to_string(); timeline_id = "*".to_string(); } - let (handle, mut slot_guard) = get_open_files().find_victim_slot().await; + let (handle, mut slot_guard) = get_open_files().find_victim_slot(); - let file = observe_duration!(StorageIoOperation::Open, open_options.open(path))?; + let file = STORAGE_IO_TIME_METRIC + .get(StorageIoOperation::Open) + .observe_closure_duration(|| open_options.open(path))?; // Strip all options other than read and write. // @@ -357,15 +333,15 @@ impl VirtualFile { )); }; std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?; - let mut file = { - let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); - options + let mut file = Self::open_with_options( + tmp_path, + OpenOptions::new() .write(true) // Use `create_new` so that, if we race with ourselves or something else, // we bail out instead of causing damage. - .create_new(true); - Self::open_with_options_async(tmp_path, options).await? - }; + .create_new(true), + ) + .await?; file.write_all(content).await?; file.sync_all().await?; drop(file); // before the rename, that's important! @@ -376,94 +352,30 @@ impl VirtualFile { // the current `find_victim_slot` impl might pick the same slot for both // VirtualFile., and it eventually does a blocking write lock instead of // try_lock. - let final_parent_dirfd = { - let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); - options.read(true); - Self::open_with_options_async(final_path_parent, options).await? - }; + let final_parent_dirfd = + Self::open_with_options(final_path_parent, OpenOptions::new().read(true)).await?; final_parent_dirfd.sync_all().await?; Ok(()) } - /// Open a file with given options. - /// - /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt, - /// they will be applied also when the file is subsequently re-opened, not only - /// on the first time. Make sure that's sane! - pub async fn open_with_options_async( - path: &Utf8Path, - open_options: tokio_epoll_uring::ops::open_at::OpenOptions, - ) -> Result { - let path_str = path.to_string(); - let parts = path_str.split('/').collect::>(); - let tenant_id; - let timeline_id; - if parts.len() > 5 && parts[parts.len() - 5] == "tenants" { - tenant_id = parts[parts.len() - 4].to_string(); - timeline_id = parts[parts.len() - 2].to_string(); - } else { - tenant_id = "*".to_string(); - timeline_id = "*".to_string(); - } - let (handle, mut slot_guard) = get_open_files().find_victim_slot().await; - - slot_guard.file = Some({ - let system = tokio_epoll_uring::thread_local_system().await; - let file: OwnedFd = system - .open(path, &open_options) - .await - .map_err(|e| match e { - tokio_epoll_uring::Error::Op(e) => e, - tokio_epoll_uring::Error::System(system) => { - std::io::Error::new(std::io::ErrorKind::Other, system) - } - })?; - let file = File::from(file); - file - }); - - // Strip all options other than read and write. - // - // It would perhaps be nicer to check just for the read and write flags - // explicitly, but OpenOptions doesn't contain any functions to read flags, - // only to set them. - let mut reopen_options = open_options; - reopen_options.create(false); - reopen_options.create_new(false); - reopen_options.truncate(false); - - let vfile = VirtualFile { - handle: RwLock::new(handle), - pos: 0, - path: path.to_path_buf(), - open_options: reopen_options, - tenant_id, - timeline_id, - }; - - Ok(vfile) - } - /// Call File::sync_all() on the underlying File. pub async fn sync_all(&self) -> Result<(), Error> { - with_file!(self, StorageIoOperation::Fsync, |file| file - .as_ref() - .sync_all()) + self.with_file(StorageIoOperation::Fsync, |file| file.sync_all()) + .await? } pub async fn metadata(&self) -> Result { - with_file!(self, StorageIoOperation::Metadata, |file| file - .as_ref() - .metadata()) + self.with_file(StorageIoOperation::Metadata, |file| file.metadata()) + .await? } - /// Helper function internal to `VirtualFile` that looks up the underlying File, - /// opens it and evicts some other File if necessary. The passed parameter is - /// assumed to be a function available for the physical `File`. - /// - /// We are doing it via a macro as Rust doesn't support async closures that - /// take on parameters with lifetimes. - async fn lock_file(&self) -> Result, Error> { + /// Helper function that looks up the underlying File for this VirtualFile, + /// opening it and evicting some other File if necessary. It calls 'func' + /// with the physical File. + async fn with_file(&self, op: StorageIoOperation, mut func: F) -> Result + where + F: FnMut(&File) -> R, + { let open_files = get_open_files(); let mut handle_guard = { @@ -473,23 +385,27 @@ impl VirtualFile { // We only need to hold the handle lock while we read the current handle. If // another thread closes the file and recycles the slot for a different file, // we will notice that the handle we read is no longer valid and retry. - let mut handle = *self.handle.read().await; + let mut handle = *self.handle.read().unwrap(); loop { // Check if the slot contains our File { let slot = &open_files.slots[handle.index]; - let slot_guard = slot.inner.read().await; - if slot_guard.tag == handle.tag && slot_guard.file.is_some() { - // Found a cached file descriptor. - slot.recently_used.store(true, Ordering::Relaxed); - return Ok(FileGuard { slot_guard }); + let slot_guard = slot.inner.read().unwrap(); + if slot_guard.tag == handle.tag { + if let Some(file) = &slot_guard.file { + // Found a cached file descriptor. + slot.recently_used.store(true, Ordering::Relaxed); + return Ok(STORAGE_IO_TIME_METRIC + .get(op) + .observe_closure_duration(|| func(file))); + } } } // The slot didn't contain our File. We will have to open it ourselves, // but before that, grab a write lock on handle in the VirtualFile, so // that no other thread will try to concurrently open the same file. - let handle_guard = self.handle.write().await; + let handle_guard = self.handle.write().unwrap(); // If another thread changed the handle while we were not holding the lock, // then the handle might now be valid again. Loop back to retry. @@ -503,23 +419,17 @@ impl VirtualFile { // We need to open the file ourselves. The handle in the VirtualFile is // now locked in write-mode. Find a free slot to put it in. - let (handle, mut slot_guard) = open_files.find_victim_slot().await; + let (handle, mut slot_guard) = open_files.find_victim_slot(); // Open the physical file - let file = { - let system = tokio_epoll_uring::thread_local_system().await; - let file: OwnedFd = system - .open(&self.path, &self.open_options) - .await - .map_err(|e| match e { - tokio_epoll_uring::Error::Op(e) => e, - tokio_epoll_uring::Error::System(system) => { - std::io::Error::new(std::io::ErrorKind::Other, system) - } - })?; - let file = File::from(file); - file - }; + let file = STORAGE_IO_TIME_METRIC + .get(StorageIoOperation::Open) + .observe_closure_duration(|| self.open_options.open(&self.path))?; + + // Perform the requested operation on it + let result = STORAGE_IO_TIME_METRIC + .get(op) + .observe_closure_duration(|| func(&file)); // Store the File in the slot and update the handle in the VirtualFile // to point to it. @@ -527,9 +437,7 @@ impl VirtualFile { *handle_guard = handle; - return Ok(FileGuard { - slot_guard: slot_guard.downgrade(), - }); + Ok(result) } pub fn remove(self) { @@ -544,9 +452,11 @@ impl VirtualFile { self.pos = offset; } SeekFrom::End(offset) => { - self.pos = with_file!(self, StorageIoOperation::Seek, |file| file - .as_ref() - .seek(SeekFrom::End(offset)))? + self.pos = self + .with_file(StorageIoOperation::Seek, |mut file| { + file.seek(SeekFrom::End(offset)) + }) + .await?? } SeekFrom::Current(offset) => { let pos = self.pos as i128 + offset as i128; @@ -566,59 +476,24 @@ impl VirtualFile { } // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 - pub async fn read_exact_at( - &self, - write_guard: PageWriteGuard<'static>, - offset: u64, - ) -> Result, Error> { - let file_guard: FileGuard<'static> = self.lock_file().await?; - - let system = tokio_epoll_uring::thread_local_system().await; - struct PageWriteGuardBuf { - buf: PageWriteGuard<'static>, - init_up_to: usize, - } - unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf { - fn stable_ptr(&self) -> *const u8 { - self.buf.as_ptr() - } - fn bytes_init(&self) -> usize { - self.init_up_to - } - fn bytes_total(&self) -> usize { - self.buf.len() + pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> { + while !buf.is_empty() { + match self.read_at(buf, offset).await { + Ok(0) => { + return Err(Error::new( + std::io::ErrorKind::UnexpectedEof, + "failed to fill whole buffer", + )) + } + Ok(n) => { + buf = &mut buf[n..]; + offset += n as u64; + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), } } - unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf { - fn stable_mut_ptr(&mut self) -> *mut u8 { - self.buf.as_mut_ptr() - } - - unsafe fn set_init(&mut self, pos: usize) { - assert!(pos <= self.buf.len()); - self.init_up_to = pos; - } - } - let buf = PageWriteGuardBuf { - buf: write_guard, - init_up_to: 0, - }; - let owned_fd = unsafe { OwnedFd::from_raw_fd(file_guard.as_ref().as_raw_fd()) }; - let guard = scopeguard::guard(file_guard, |_| { - panic!("must not drop future while operation is ongoing (todo: pass file_guard to tokio_epoll_uring to aovid this)") - }); - let ((owned_fd, buf), res) = system.read(owned_fd, offset, buf).await; - let _ = OwnedFd::into_raw_fd(owned_fd); - let _ = scopeguard::ScopeGuard::into_inner(guard); - let PageWriteGuardBuf { - buf: write_guard, - init_up_to, - } = buf; - if let Ok(num_read) = res { - assert!(init_up_to == num_read); // TODO need to deal with short reads here - } - res.map(|_| write_guard) - .map_err(|e| Error::new(ErrorKind::Other, e)) + Ok(()) } // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235 @@ -668,10 +543,22 @@ impl VirtualFile { Ok(n) } + pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { + let result = self + .with_file(StorageIoOperation::Read, |file| file.read_at(buf, offset)) + .await?; + if let Ok(size) = result { + STORAGE_IO_SIZE + .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) + .add(size as i64); + } + result + } + async fn write_at(&self, buf: &[u8], offset: u64) -> Result { - let result = with_file!(self, StorageIoOperation::Write, |file| file - .as_ref() - .write_at(buf, offset)); + let result = self + .with_file(StorageIoOperation::Write, |file| file.write_at(buf, offset)) + .await?; if let Ok(size) = result { STORAGE_IO_SIZE .with_label_values(&["write", &self.tenant_id, &self.timeline_id]) @@ -681,18 +568,6 @@ impl VirtualFile { } } -struct FileGuard<'a> { - slot_guard: RwLockReadGuard<'a, SlotInner>, -} - -impl<'a> AsRef for FileGuard<'a> { - fn as_ref(&self) -> &File { - // This unwrap is safe because we only create `FileGuard`s - // if we know that the file is Some. - self.slot_guard.file.as_ref().unwrap() - } -} - #[cfg(test)] impl VirtualFile { pub(crate) async fn read_blk( @@ -725,39 +600,20 @@ impl VirtualFile { impl Drop for VirtualFile { /// If a VirtualFile is dropped, close the underlying file if it was open. fn drop(&mut self) { - let handle = self.handle.get_mut(); + let handle = self.handle.get_mut().unwrap(); - fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) { - if slot_guard.tag == tag { - slot.recently_used.store(false, Ordering::Relaxed); - // there is also the `CloseByReplace` operation for closes done on eviction for - // comparison. - STORAGE_IO_TIME_METRIC - .get(StorageIoOperation::Close) - .observe_closure_duration(|| drop(slot_guard.file.take())); - } - } - - // We don't have async drop so we cannot directly await the lock here. - // Instead, first do a best-effort attempt at closing the underlying - // file descriptor by using `try_write`, and if that fails, spawn - // a tokio task to do it asynchronously: we just want it to be - // cleaned up eventually. - // Most of the time, the `try_lock` should succeed though, - // as we have `&mut self` access. In other words, if the slot - // is still occupied by our file, there should be no access from - // other I/O operations; the only other possible place to lock - // the slot is the lock algorithm looking for free slots. + // We could check with a read-lock first, to avoid waiting on an + // unrelated I/O. let slot = &get_open_files().slots[handle.index]; - if let Ok(slot_guard) = slot.inner.try_write() { - clean_slot(slot, slot_guard, handle.tag); - } else { - let tag = handle.tag; - tokio::spawn(async move { - let slot_guard = slot.inner.write().await; - clean_slot(slot, slot_guard, tag); - }); - }; + let mut slot_guard = slot.inner.write().unwrap(); + if slot_guard.tag == handle.tag { + slot.recently_used.store(false, Ordering::Relaxed); + // there is also operation "close-by-replace" for closes done on eviction for + // comparison. + STORAGE_IO_TIME_METRIC + .get(StorageIoOperation::Close) + .observe_closure_duration(|| drop(slot_guard.file.take())); + } } }