From 0b8ff8dbe09356b208a3e6857840abd7f7139ea6 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 29 Aug 2023 16:42:14 +0000 Subject: [PATCH] Revert "switch back to spawn_blocking to make the comparison" This reverts commit 60971e282e18fdc094c0b034732e9b9a168aaa54. --- pageserver/src/virtual_file.rs | 48 ++++++++++++++++++++++++++++------ 1 file changed, 40 insertions(+), 8 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 6c7e174462..b16ba2b5be 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -283,7 +283,7 @@ impl VirtualFile { // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 pub async fn read_exact_at_async( &self, - mut buf: crate::buffer_pool::Buffer, + mut write_guard: crate::buffer_pool::Buffer, offset: u64, ) -> Result { let file = self.handle.lock().unwrap().take().unwrap(); @@ -294,16 +294,48 @@ impl VirtualFile { panic!("mut put self.handle back") } }; - let ((file, buf), res) = tokio::task::spawn_blocking(move || { - let res = file.read_exact_at(buf.as_mut(), offset); - ((file, buf), res) - }) - .await - .expect("spawn_blocking"); + let system = tokio_epoll_uring::thread_local_system().await; + struct PageWriteGuardBuf { + buf: crate::buffer_pool::Buffer, + init_up_to: usize, + } + unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf { + fn stable_ptr(&self) -> *const u8 { + self.buf.as_ptr() + } + fn bytes_init(&self) -> usize { + self.init_up_to + } + fn bytes_total(&self) -> usize { + self.buf.len() + } + } + unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf { + fn stable_mut_ptr(&mut self) -> *mut u8 { + self.buf.as_mut_ptr() + } + + unsafe fn set_init(&mut self, pos: usize) { + assert!(pos <= self.buf.len()); + self.init_up_to = pos; + } + } + let buf = PageWriteGuardBuf { + buf: write_guard, + init_up_to: 0, + }; + let ((file, buf), res) = system.read(file.into(), offset, buf).await; + let PageWriteGuardBuf { + buf: write_guard, + init_up_to, + } = buf; + if let Ok(num_read) = res { + assert!(init_up_to <= num_read); + } let replaced = self.handle.lock().unwrap().replace(File::from(file)); assert!(replaced.is_none()); put_back.store(true, std::sync::atomic::Ordering::Relaxed); - res.map(|_| buf) + res.map(|_| write_guard) .map_err(|e| Error::new(ErrorKind::Other, e)) }