From a5b6e32b01629ecaf1239bf3421d10cdf13bffec Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 29 Aug 2023 09:59:42 +0000 Subject: [PATCH] PoC using spawn_blocking --- pageserver/src/tenant/ephemeral_file.rs | 9 +++++---- pageserver/src/virtual_file.rs | 27 +++++++++++++++---------- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 02ef7166e5..de0c8c4a36 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -2,7 +2,7 @@ //! used to keep in-memory layers spilled on disk. use crate::config::PageServerConf; -use crate::page_cache::{self, PAGE_SZ}; +use crate::page_cache::{self, PAGE_SZ, PageWriteGuard}; use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader}; use crate::virtual_file::VirtualFile; use std::cmp::min; @@ -83,11 +83,12 @@ impl EphemeralFile { page_cache::ReadBufResult::Found(guard) => { return Ok(BlockLease::PageReadGuard(guard)) } - page_cache::ReadBufResult::NotFound(mut write_guard) => { + page_cache::ReadBufResult::NotFound(write_guard) => { + let mut write_guard: PageWriteGuard<'static> = write_guard; let buf: &mut [u8] = write_guard.deref_mut(); debug_assert_eq!(buf.len(), PAGE_SZ); - self.file - .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?; + let mut write_guard = self.file + .read_exact_at_async(write_guard, blknum as u64 * PAGE_SZ as u64).await?; write_guard.mark_valid(); // Swap for read lock diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 1b28ef036c..197e13d19a 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -11,13 +11,14 @@ //! src/backend/storage/file/fd.c //! use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME}; +use crate::page_cache::PageWriteGuard; use std::fs::{self, File, OpenOptions}; use std::io::{Error, ErrorKind, Seek, SeekFrom, Write}; -use std::os::fd::AsRawFd; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; +use std::sync::atomic::AtomicBool; use std::sync::{Arc, Mutex}; /// @@ -215,23 +216,27 @@ impl VirtualFile { // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 pub async fn read_exact_at_async( &self, - mut buf: &mut [u8], - mut offset: u64, - ) -> Result<(), Error> { + mut write_guard: PageWriteGuard<'static>, + offset: u64, + ) -> Result, Error> { let file = self.handle.lock().unwrap().take().unwrap(); - let mut put_back = false; + let put_back = AtomicBool::new(false); + let put_back_ref = &put_back; scopeguard::defer! { - if !put_back { + if !put_back_ref.load(std::sync::atomic::Ordering::Relaxed) { panic!("mut put self.handle back") } }; - let res = tokio::task::spawn_blocking(|| file.read_exact_at(buf, offset)) - .await - .expect("spawn_blocking"); + let ((file, write_guard), res) = tokio::task::spawn_blocking(move || { + let res = file.read_exact_at(write_guard.as_mut(), offset); + ((file, write_guard), res) + }) + .await + .expect("spawn_blocking"); let replaced = self.handle.lock().unwrap().replace(file); assert!(replaced.is_none()); - put_back = true; - res + put_back.store(true, std::sync::atomic::Ordering::Relaxed); + res.map(|()| write_guard) } // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235