From bb88e5bf57e2bb7f5bccb4517f24ce04f5063240 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 29 Aug 2023 09:53:01 +0000 Subject: [PATCH] try to convert to async, now lifetime errors because buffer lifetime is insufficient --- pageserver/src/virtual_file.rs | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 7b12cc89ac..1b28ef036c 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -15,8 +15,10 @@ use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME}; use std::fs::{self, File, OpenOptions}; use std::io::{Error, ErrorKind, Seek, SeekFrom, Write}; +use std::os::fd::AsRawFd; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; /// /// A virtual file descriptor. You can use this just like std::fs::File, but internally @@ -38,7 +40,7 @@ pub struct VirtualFile { /// Lazy handle to the global file descriptor cache. The slot that this points to /// might contain our File, or it may be empty, or it may contain a File that /// belongs to a different VirtualFile. - handle: File, + handle: Arc>>, // only transiently None /// Current file position pos: u64, @@ -108,7 +110,7 @@ impl VirtualFile { reopen_options.truncate(false); let vfile = VirtualFile { - handle: file, + handle: Arc::new(Mutex::new(Some(file))), pos: 0, path: path.to_path_buf(), tenant_id, @@ -138,7 +140,7 @@ impl VirtualFile { { return Ok(STORAGE_IO_TIME .with_label_values(&[op]) - .observe_closure_duration(|| func(&self.handle))); + .observe_closure_duration(|| func(&*self.handle.lock().unwrap().as_ref().unwrap()))); } pub fn remove(self) { @@ -210,6 +212,28 @@ impl VirtualFile { Ok(()) } + // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 + pub async fn read_exact_at_async( + &self, + mut buf: &mut [u8], + mut offset: u64, + ) -> Result<(), Error> { + let file = self.handle.lock().unwrap().take().unwrap(); + let mut put_back = false; + scopeguard::defer! { + if !put_back { + panic!("mut put self.handle back") + } + }; + let res = tokio::task::spawn_blocking(|| file.read_exact_at(buf, offset)) + .await + .expect("spawn_blocking"); + let replaced = self.handle.lock().unwrap().replace(file); + assert!(replaced.is_none()); + put_back = true; + res + } + // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235 pub fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> { while !buf.is_empty() {