diff --git a/Cargo.lock b/Cargo.lock index c6c1f4df2b..8e862bf05d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5158,7 +5158,7 @@ dependencies = [ [[package]] name = "tokio-epoll-uring" version = "0.1.0" -source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#53c832d322244f704c7612125496df5ac117ac39" +source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#2ae71cebd8f032ebfa987b713a77bc3d23aac57c" dependencies = [ "futures", "once_cell", @@ -5714,7 +5714,7 @@ dependencies = [ [[package]] name = "uring-common" version = "0.1.0" -source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#53c832d322244f704c7612125496df5ac117ac39" +source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#2ae71cebd8f032ebfa987b713a77bc3d23aac57c" dependencies = [ "io-uring", "libc", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 76dc6a5764..a83bf833a1 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -83,7 +83,6 @@ enum-map.workspace = true enumset.workspace = true strum.workspace = true strum_macros.workspace = true -# WIP PR: https://github.com/neondatabase/tokio-epoll-uring/pull/25 #tokio-epoll-uring = { path = "../../tokio-epoll-uring/tokio-epoll-uring" } tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" } diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 38305e9f4e..8c9294bb99 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -39,6 +39,8 @@ pub enum BlockLease<'a> { EphemeralFileMutableTail(&'a [u8; PAGE_SZ]), #[cfg(test)] Arc(std::sync::Arc<[u8; PAGE_SZ]>), + #[cfg(test)] + Vec(Vec), } impl From> for BlockLease<'static> { @@ -63,6 +65,13 @@ impl<'a> Deref for BlockLease<'a> { BlockLease::EphemeralFileMutableTail(v) => v, #[cfg(test)] BlockLease::Arc(v) => v.deref(), + #[cfg(test)] + BlockLease::Vec(v) => { + let v: &Vec = v; + assert_eq!(v.len(), PAGE_SZ, "caller must ensure that v has PAGE_SZ"); + // Safety: see above assertion. + unsafe { &*(v.as_ptr() as *const [u8; PAGE_SZ]) } + } } } } @@ -176,7 +185,7 @@ impl FileBlockReader { ) -> Result, std::io::Error> { assert!(buf.len() == PAGE_SZ); self.file - .read_exact_at(buf, blkno as u64 * PAGE_SZ as u64) + .read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64) .await } /// Read a block. diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 79741fe549..7bea496398 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -91,7 +91,7 @@ impl EphemeralFile { page_cache::ReadBufResult::NotFound(write_guard) => { let write_guard = self .file - .read_exact_at(write_guard, blknum as u64 * PAGE_SZ as u64) + .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64) .await?; let read_guard = write_guard.mark_valid(); return Ok(BlockLease::PageReadGuard(read_guard)); diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 5042aa5061..070a794372 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -21,6 +21,7 @@ use std::os::unix::fs::FileExt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::time::Instant; +use tokio_epoll_uring::IoBufMut; use utils::fs_ext; /// @@ -108,6 +109,37 @@ struct SlotInner { file: Option, } +/// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`]. +struct PageWriteGuardBuf { + page: PageWriteGuard<'static>, + init_up_to: usize, +} +// Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot, +// and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved. +unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf { + fn stable_ptr(&self) -> *const u8 { + self.page.as_ptr() + } + fn bytes_init(&self) -> usize { + self.init_up_to + } + fn bytes_total(&self) -> usize { + self.page.len() + } +} +// Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access, +// hence it's safe to hand out the `stable_mut_ptr()`. +unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf { + fn stable_mut_ptr(&mut self) -> *mut u8 { + self.page.as_mut_ptr() + } + + unsafe fn set_init(&mut self, pos: usize) { + assert!(pos <= self.page.len()); + self.init_up_to = pos; + } +} + impl OpenFiles { /// Find a slot to use, evicting an existing file descriptor if needed. /// @@ -291,61 +323,6 @@ impl VirtualFile { Self::open_with_options_async(path, options).await } - /// Open a file with given options. - /// - /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt, - /// they will be applied also when the file is subsequently re-opened, not only - /// on the first time. Make sure that's sane! - #[cfg(test)] - pub async fn open_with_options( - path: &Utf8Path, - open_options: &OpenOptions, - ) -> Result { - let path_str = path.to_string(); - let parts = path_str.split('/').collect::>(); - let tenant_id; - let timeline_id; - if parts.len() > 5 && parts[parts.len() - 5] == TENANTS_SEGMENT_NAME { - tenant_id = parts[parts.len() - 4].to_string(); - timeline_id = parts[parts.len() - 2].to_string(); - } else { - tenant_id = "*".to_string(); - timeline_id = "*".to_string(); - } - let (handle, mut slot_guard) = get_open_files().find_victim_slot().await; - - // NB: there is also StorageIoOperation::OpenAfterReplace which is for the case - // where our caller doesn't get to use the returned VirtualFile before its - // slot gets re-used by someone else. - let file = observe_duration!(StorageIoOperation::Open, open_options.open(path))?; - - // Strip all options other than read and write. - // - // It would perhaps be nicer to check just for the read and write flags - // explicitly, but OpenOptions doesn't contain any functions to read flags, - // only to set them. - let mut reopen_options = open_options.clone(); - reopen_options.create(false); - reopen_options.create_new(false); - reopen_options.truncate(false); - - let vfile = VirtualFile { - handle: RwLock::new(handle), - pos: 0, - path: path.to_path_buf(), - open_options: reopen_options, - tenant_id, - timeline_id, - }; - - // TODO: Under pressure, it's likely the slot will get re-used and - // the underlying file closed before they get around to using it. - // => https://github.com/neondatabase/neon/issues/6065 - slot_guard.file.replace(file); - - Ok(vfile) - } - /// Writes a file to the specified `final_path` in a crash safe fasion /// /// The file is first written to the specified tmp_path, and in a second @@ -424,8 +401,7 @@ impl VirtualFile { std::io::Error::new(std::io::ErrorKind::Other, system) } })?; - let file = File::from(file); - file + File::from(file) })); // Strip all options other than read and write. @@ -526,8 +502,7 @@ impl VirtualFile { std::io::Error::new(std::io::ErrorKind::Other, system) } })?; - let file = File::from(file); - file + File::from(file) }); // Store the File in the slot and update the handle in the VirtualFile @@ -575,66 +550,47 @@ impl VirtualFile { } // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 - pub async fn read_exact_at( + pub async fn read_exact_at(&self, buf: B, mut offset: u64) -> Result + where + B: IoBufMut + Send, + { + use tokio_epoll_uring::BoundedBuf; + let mut buf: tokio_epoll_uring::Slice = buf.slice_full(); + while buf.bytes_total() != 0 { + let res; + (buf, res) = self.read_at(buf, offset).await; + match res { + Ok(0) => break, + Ok(n) => { + buf = buf.slice(n..); + offset += n as u64; + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + if !buf.is_empty() { + Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "failed to fill whole buffer", + )) + } else { + Ok(buf.into_inner()) + } + } + + /// Like [`Self::read_exact_at`] but for [`PageWriteGuard`]. + pub async fn read_exact_at_page( &self, page: PageWriteGuard<'static>, offset: u64, ) -> Result, Error> { - with_file!(self, StorageIoOperation::Read, |file_guard| { - self.read_exact_at0(file_guard, page, offset).await - }) - } - async fn read_exact_at0( - &self, - file_guard: FileGuard<'static>, - write_guard: PageWriteGuard<'static>, - offset: u64, - ) -> Result, Error> { - let system = tokio_epoll_uring::thread_local_system().await; - struct PageWriteGuardBuf { - buf: PageWriteGuard<'static>, - init_up_to: usize, - } - unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf { - fn stable_ptr(&self) -> *const u8 { - self.buf.as_ptr() - } - fn bytes_init(&self) -> usize { - self.init_up_to - } - fn bytes_total(&self) -> usize { - self.buf.len() - } - } - unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf { - fn stable_mut_ptr(&mut self) -> *mut u8 { - self.buf.as_mut_ptr() - } - - unsafe fn set_init(&mut self, pos: usize) { - assert!(pos <= self.buf.len()); - self.init_up_to = pos; - } - } let buf = PageWriteGuardBuf { - buf: write_guard, + page, init_up_to: 0, }; - let owned_fd = unsafe { OwnedFd::from_raw_fd(file_guard.as_ref().as_raw_fd()) }; - let guard = scopeguard::guard(file_guard, |_| { - panic!("must not drop future while operation is ongoing (todo: pass file_guard to tokio_epoll_uring to aovid this)") - }); - let ((owned_fd, buf), res) = system.read(owned_fd, offset, buf).await; - let _ = OwnedFd::into_raw_fd(owned_fd); - let _ = scopeguard::ScopeGuard::into_inner(guard); - let PageWriteGuardBuf { - buf: write_guard, - init_up_to, - } = buf; - if let Ok(num_read) = res { - assert!(init_up_to == num_read); // TODO need to deal with short reads here - } - res.map(|_| write_guard) + let res = self.read_exact_at(buf, offset).await; + res.map(|PageWriteGuardBuf { page, .. }| page) .map_err(|e| Error::new(ErrorKind::Other, e)) } @@ -685,11 +641,59 @@ impl VirtualFile { Ok(n) } + pub(crate) async fn read_at(&self, buf: B, offset: u64) -> (B, Result) + where + B: tokio_epoll_uring::BoundedBufMut + Send, + { + let file_guard = match self.lock_file().await { + Ok(file_guard) => file_guard, + Err(e) => return (buf, Err(e)), + }; + let (buf, result) = observe_duration!(StorageIoOperation::Read, { + self.read_at0(file_guard, buf, offset).await + }); + (buf, result) + } + async fn read_at0( + &self, + file_guard: FileGuard<'_>, + buf: B, + offset: u64, + ) -> (B, Result) + where + B: tokio_epoll_uring::BoundedBufMut + Send, + { + let system = tokio_epoll_uring::thread_local_system().await; + // SAFETY: when file_guard gets dropped, the raw fd becomes invalid or may get re-used + // while the io_uring operation is still executing. + // The `file_guard` could get dropped due to future cancellation-by-drop. + // We prevent this situation using the scopeguard: it will abort the process in such cases. + // Fixing this is subject of https://github.com/neondatabase/neon/pull/6101 + let owned_fd = unsafe { OwnedFd::from_raw_fd(file_guard.as_ref().as_raw_fd()) }; + let guard = scopeguard::guard(file_guard, |_| { + eprintln!("must not drop future while operation is ongoing (todo: pass file_guard to tokio_epoll_uring to aovid this)"); + std::process::abort(); + }); + let ((owned_fd, buf), res) = system.read(owned_fd, offset, buf).await; + let _ = OwnedFd::into_raw_fd(owned_fd); + let _ = scopeguard::ScopeGuard::into_inner(guard); + if let Ok(size) = res { + // TODO: don't use with_label_values on hot path + // https://github.com/neondatabase/neon/issues/6107 + STORAGE_IO_SIZE + .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) + .add(size as i64); + } + (buf, res.map_err(|e| Error::new(ErrorKind::Other, e))) + } + async fn write_at(&self, buf: &[u8], offset: u64) -> Result { let result = with_file!(self, StorageIoOperation::Write, |file| file .as_ref() .write_at(buf, offset)); if let Ok(size) = result { + // TODO: don't use with_label_values on hot path + // https://github.com/neondatabase/neon/issues/6107 STORAGE_IO_SIZE .with_label_values(&["write", &self.tenant_id, &self.timeline_id]) .add(size as i64); @@ -717,16 +721,19 @@ impl VirtualFile { blknum: u32, ) -> Result, std::io::Error> { use crate::page_cache::PAGE_SZ; - let mut buf = [0; PAGE_SZ]; - self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64)) + let buf = vec![0; PAGE_SZ]; + let buf = self + .read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64)) .await?; - Ok(std::sync::Arc::new(buf).into()) + Ok(crate::tenant::block_io::BlockLease::Vec(buf)) } async fn read_to_end(&mut self, buf: &mut Vec) -> Result<(), Error> { + let mut tmp = vec![0; 128]; loop { - let mut tmp = [0; 128]; - match self.read_at(&mut tmp, self.pos).await { + let res; + (tmp, res) = self.read_at(tmp, self.pos).await; + match res { Ok(0) => return Ok(()), Ok(n) => { self.pos += n as u64; @@ -850,10 +857,10 @@ mod tests { } impl MaybeVirtualFile { - async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> { + async fn read_exact_at(&self, mut buf: Vec, offset: u64) -> Result, Error> { match self { MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await, - MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset), + MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf), } } async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> { @@ -895,14 +902,15 @@ mod tests { // Helper function to slurp a portion of a file into a string async fn read_string_at(&mut self, pos: u64, len: usize) -> Result { - let mut buf = vec![0; len]; - self.read_exact_at(&mut buf, pos).await?; + let buf = vec![0; len]; + let buf = self.read_exact_at(buf, pos).await?; Ok(String::from_utf8(buf).unwrap()) } } #[tokio::test] - async fn test_virtual_files() -> Result<(), Error> { + async fn test_virtual_files() -> anyhow::Result<()> { + crate::tenant::harness::setup_logging(); // The real work is done in the test_files() helper function. This // allows us to run the same set of tests against a native File, and // VirtualFile. We trust the native Files and wouldn't need to test them, @@ -911,23 +919,35 @@ mod tests { // native files, you will run out of file descriptors if the ulimit // is low enough.) test_files("virtual_files", |path, open_options| async move { - let vf = VirtualFile::open_with_options(&path, &open_options).await?; + let vf = VirtualFile::open_with_options_async(&path, open_options).await?; Ok(MaybeVirtualFile::VirtualFile(vf)) }) .await } #[tokio::test] - async fn test_physical_files() -> Result<(), Error> { + async fn test_physical_files() -> anyhow::Result<()> { test_files("physical_files", |path, open_options| async move { - Ok(MaybeVirtualFile::File(open_options.open(path)?)) + Ok(MaybeVirtualFile::File({ + let system = tokio_epoll_uring::thread_local_system().await; + let owned_fd = system + .open(path, &open_options) + .await + .map_err(|e| match e { + tokio_epoll_uring::Error::Op(e) => e, + tokio_epoll_uring::Error::System(system) => { + std::io::Error::new(std::io::ErrorKind::Other, system) + } + })?; + File::from(owned_fd) + })) }) .await } - async fn test_files(testname: &str, openfunc: OF) -> Result<(), Error> + async fn test_files(testname: &str, openfunc: OF) -> anyhow::Result<()> where - OF: Fn(Utf8PathBuf, OpenOptions) -> FT, + OF: Fn(Utf8PathBuf, tokio_epoll_uring::ops::open_at::OpenOptions) -> FT, FT: Future>, { let testdir = crate::config::PageServerConf::test_repo_dir(testname); @@ -936,7 +956,7 @@ mod tests { let path_a = testdir.join("file_a"); let mut file_a = openfunc( path_a.clone(), - OpenOptions::new() + tokio_epoll_uring::ops::open_at::OpenOptions::new() .write(true) .create(true) .truncate(true) @@ -949,7 +969,13 @@ mod tests { let _ = file_a.read_string().await.unwrap_err(); // Close the file and re-open for reading - let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?; + let mut file_a = openfunc( + path_a, + tokio_epoll_uring::ops::open_at::OpenOptions::new() + .read(true) + .to_owned(), + ) + .await?; // cannot write to a file opened in read-only mode let _ = file_a.write_all(b"bar").await.unwrap_err(); @@ -986,7 +1012,7 @@ mod tests { let path_b = testdir.join("file_b"); let mut file_b = openfunc( path_b.clone(), - OpenOptions::new() + tokio_epoll_uring::ops::open_at::OpenOptions::new() .read(true) .write(true) .create(true) @@ -1007,8 +1033,13 @@ mod tests { let mut vfiles = Vec::new(); for _ in 0..100 { - let mut vfile = - openfunc(path_b.clone(), OpenOptions::new().read(true).to_owned()).await?; + let mut vfile = openfunc( + path_b.clone(), + tokio_epoll_uring::ops::open_at::OpenOptions::new() + .read(true) + .to_owned(), + ) + .await?; assert_eq!("FOOBAR", vfile.read_string().await?); vfiles.push(vfile); } @@ -1053,8 +1084,12 @@ mod tests { // Open the file many times. let mut files = Vec::new(); for _ in 0..VIRTUAL_FILES { - let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true)) - .await?; + let f = VirtualFile::open_with_options_async(&test_file_path, { + let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); + options.read(true); + options + }) + .await?; files.push(f); } let files = Arc::new(files); @@ -1069,11 +1104,11 @@ mod tests { for _threadno in 0..THREADS { let files = files.clone(); let hdl = rt.spawn(async move { - let mut buf = [0u8; SIZE]; + let mut buf = vec![0u8; SIZE]; let mut rng = rand::rngs::OsRng; for _ in 1..1000 { let f = &files[rng.gen_range(0..files.len())]; - f.read_exact_at(&mut buf, 0).await.unwrap(); + buf = f.read_exact_at(buf, 0).await.unwrap(); assert!(buf == SAMPLE); } });