diff --git a/Cargo.lock b/Cargo.lock index 512145f6c8..fc8c652031 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2520,6 +2520,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "io-uring" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "460648e47a07a43110fbfa2e0b14afb2be920093c31e5dccc50e49568e099762" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "ipnet" version = "2.9.0" @@ -3315,6 +3325,7 @@ dependencies = [ "tenant_size_model", "thiserror", "tokio", + "tokio-epoll-uring", "tokio-io-timeout", "tokio-postgres", "tokio-stream", @@ -5334,18 +5345,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.40" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac" +checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.40" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" +checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b" dependencies = [ "proc-macro2", "quote", @@ -5469,6 +5480,21 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "tokio-epoll-uring" +version = "0.1.0" +source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#9c5ea716add1357dc8c180167e43a2ae596eba68" +dependencies = [ + "futures", + "once_cell", + "scopeguard", + "thiserror", + "tokio", + "tokio-util", + "tracing", + "uring-common", +] + [[package]] name = "tokio-io-timeout" version = "1.2.0" @@ -6020,6 +6046,15 @@ dependencies = [ "webpki-roots 0.23.1", ] +[[package]] +name = "uring-common" +version = "0.1.0" +source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#9c5ea716add1357dc8c180167e43a2ae596eba68" +dependencies = [ + "io-uring", + "libc", +] + [[package]] name = "url" version = "2.3.1" diff --git a/Cargo.toml b/Cargo.toml index 2d8fbaffa8..ddcdad91a6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -150,6 +150,8 @@ test-context = "0.1" thiserror = "1.0" tls-listener = { version = "0.7", features = ["rustls", "hyper-h1"] } tokio = { version = "1.17", features = ["macros"] } +#tokio-epoll-uring = { path = "../tokio-epoll-uring/tokio-epoll-uring" } +tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" } tokio-io-timeout = "1.2.0" tokio-postgres-rustls = "0.10.0" tokio-rustls = "0.24" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 980fbab22e..e44501d1ed 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -61,6 +61,7 @@ sync_wrapper.workspace = true tokio-tar.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] } +tokio-epoll-uring.workspace = true tokio-io-timeout.workspace = true tokio-postgres.workspace = true tokio-stream.workspace = true diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 0617017528..8c9294bb99 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -5,10 +5,10 @@ use super::ephemeral_file::EphemeralFile; use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner}; use crate::context::RequestContext; -use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ}; +use crate::page_cache::{self, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ}; use crate::virtual_file::VirtualFile; use bytes::Bytes; -use std::ops::{Deref, DerefMut}; +use std::ops::Deref; /// This is implemented by anything that can read 8 kB (PAGE_SZ) /// blocks, using the page cache @@ -39,6 +39,8 @@ pub enum BlockLease<'a> { EphemeralFileMutableTail(&'a [u8; PAGE_SZ]), #[cfg(test)] Arc(std::sync::Arc<[u8; PAGE_SZ]>), + #[cfg(test)] + Vec(Vec), } impl From> for BlockLease<'static> { @@ -63,6 +65,13 @@ impl<'a> Deref for BlockLease<'a> { BlockLease::EphemeralFileMutableTail(v) => v, #[cfg(test)] BlockLease::Arc(v) => v.deref(), + #[cfg(test)] + BlockLease::Vec(v) => { + let v: &Vec = v; + assert_eq!(v.len(), PAGE_SZ, "caller must ensure that v has PAGE_SZ"); + // Safety: see above assertion. + unsafe { &*(v.as_ptr() as *const [u8; PAGE_SZ]) } + } } } } @@ -169,10 +178,14 @@ impl FileBlockReader { } /// Read a page from the underlying file into given buffer. - async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> { + async fn fill_buffer( + &self, + buf: PageWriteGuard<'static>, + blkno: u32, + ) -> Result, std::io::Error> { assert!(buf.len() == PAGE_SZ); self.file - .read_exact_at(buf, blkno as u64 * PAGE_SZ as u64) + .read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64) .await } /// Read a block. @@ -196,9 +209,9 @@ impl FileBlockReader { ) })? { ReadBufResult::Found(guard) => Ok(guard.into()), - ReadBufResult::NotFound(mut write_guard) => { + ReadBufResult::NotFound(write_guard) => { // Read the page from disk into the buffer - self.fill_buffer(write_guard.deref_mut(), blknum).await?; + let write_guard = self.fill_buffer(write_guard, blknum).await?; Ok(write_guard.mark_valid().into()) } } diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 9f7b0a7b78..6b8cd77d78 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -92,11 +92,10 @@ impl EphemeralFile { page_cache::ReadBufResult::Found(guard) => { return Ok(BlockLease::PageReadGuard(guard)) } - page_cache::ReadBufResult::NotFound(mut write_guard) => { - let buf: &mut [u8] = write_guard.deref_mut(); - debug_assert_eq!(buf.len(), PAGE_SZ); - self.file - .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64) + page_cache::ReadBufResult::NotFound(write_guard) => { + let write_guard = self + .file + .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64) .await?; let read_guard = write_guard.mark_valid(); return Ok(BlockLease::PageReadGuard(read_guard)); diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 057805b3a9..af757c385d 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -12,11 +12,13 @@ //! use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC}; +use crate::page_cache::PageWriteGuard; use crate::tenant::TENANTS_SEGMENT_NAME; use camino::{Utf8Path, Utf8PathBuf}; use once_cell::sync::OnceCell; use std::fs::{self, File}; use std::io::{Error, ErrorKind, Seek, SeekFrom}; +use tokio_epoll_uring::IoBufMut; use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd}; use std::os::unix::fs::FileExt; @@ -113,6 +115,37 @@ struct SlotInner { file: Option, } +/// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`]. +struct PageWriteGuardBuf { + page: PageWriteGuard<'static>, + init_up_to: usize, +} +// Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot, +// and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved. +unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf { + fn stable_ptr(&self) -> *const u8 { + self.page.as_ptr() + } + fn bytes_init(&self) -> usize { + self.init_up_to + } + fn bytes_total(&self) -> usize { + self.page.len() + } +} +// Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access, +// hence it's safe to hand out the `stable_mut_ptr()`. +unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf { + fn stable_mut_ptr(&mut self) -> *mut u8 { + self.page.as_mut_ptr() + } + + unsafe fn set_init(&mut self, pos: usize) { + assert!(pos <= self.page.len()); + self.init_up_to = pos; + } +} + impl OpenFiles { /// Find a slot to use, evicting an existing file descriptor if needed. /// @@ -509,13 +542,19 @@ impl VirtualFile { } // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 - pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> std::io::Result<()> { - while !buf.is_empty() { - match self.read_at(buf, offset).await { + pub async fn read_exact_at(&self, buf: B, mut offset: u64) -> Result + where + B: IoBufMut + Send, + { + use tokio_epoll_uring::BoundedBuf; + let mut buf: tokio_epoll_uring::Slice = buf.slice_full(); + while buf.bytes_total() != 0 { + let res; + (buf, res) = self.read_at(buf, offset).await; + match res { Ok(0) => break, Ok(n) => { - let tmp = buf; - buf = &mut tmp[n..]; + buf = buf.slice(n..); offset += n as u64; } Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} @@ -528,10 +567,25 @@ impl VirtualFile { "failed to fill whole buffer", )) } else { - Ok(()) + Ok(buf.into_inner()) } } + /// Like [`Self::read_exact_at`] but for [`PageWriteGuard`]. + pub async fn read_exact_at_page( + &self, + page: PageWriteGuard<'static>, + offset: u64, + ) -> Result, Error> { + let buf = PageWriteGuardBuf { + page, + init_up_to: 0, + }; + let res = self.read_exact_at(buf, offset).await; + res.map(|PageWriteGuardBuf { page, .. }| page) + .map_err(|e| Error::new(ErrorKind::Other, e)) + } + // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235 pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> { while !buf.is_empty() { @@ -579,15 +633,41 @@ impl VirtualFile { Ok(n) } - pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { - let result = with_file!(self, StorageIoOperation::Read, |file_guard| file_guard - .with_std_file(|std_file| std_file.read_at(buf, offset))); + pub(crate) async fn read_at(&self, mut buf: B, offset: u64) -> (B, Result) + where + B: tokio_epoll_uring::BoundedBufMut + Send, + { + let (buf, result) = async move { + let file_guard = match self.lock_file().await { + Err(e) => return (buf, Err(e)), + Ok(file_guard) => file_guard, + }; + observe_duration!(StorageIoOperation::Read, { + // SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory. + let dst = unsafe { + std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total()) + }; + let res = file_guard.with_std_file(|std_file| std_file.read_at(dst, offset)); + if let Ok(nbytes) = &res { + assert!(*nbytes <= buf.bytes_total()); + // SAFETY: see above assertion + unsafe { + buf.set_init(*nbytes); + } + } + #[allow(dropping_references)] + drop(dst); + drop(file_guard); + (buf, res) + }) + } + .await; if let Ok(size) = result { STORAGE_IO_SIZE .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) .add(size as i64); } - result + (buf, result) } async fn write_at(&self, buf: &[u8], offset: u64) -> Result { @@ -651,16 +731,19 @@ impl VirtualFile { blknum: u32, ) -> Result, std::io::Error> { use crate::page_cache::PAGE_SZ; - let mut buf = [0; PAGE_SZ]; - self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64)) + let buf = vec![0; PAGE_SZ]; + let buf = self + .read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64)) .await?; - Ok(std::sync::Arc::new(buf).into()) + Ok(crate::tenant::block_io::BlockLease::Vec(buf)) } async fn read_to_end(&mut self, buf: &mut Vec) -> Result<(), Error> { + let mut tmp = vec![0; 128]; loop { - let mut tmp = [0; 128]; - match self.read_at(&mut tmp, self.pos).await { + let res; + (tmp, res) = self.read_at(tmp, self.pos).await; + match res { Ok(0) => return Ok(()), Ok(n) => { self.pos += n as u64; @@ -784,10 +867,10 @@ mod tests { } impl MaybeVirtualFile { - async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> { + async fn read_exact_at(&self, mut buf: Vec, offset: u64) -> Result, Error> { match self { MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await, - MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset), + MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf), } } async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> { @@ -829,8 +912,8 @@ mod tests { // Helper function to slurp a portion of a file into a string async fn read_string_at(&mut self, pos: u64, len: usize) -> Result { - let mut buf = vec![0; len]; - self.read_exact_at(&mut buf, pos).await?; + let buf = vec![0; len]; + let buf = self.read_exact_at(buf, pos).await?; Ok(String::from_utf8(buf).unwrap()) } } @@ -1006,11 +1089,11 @@ mod tests { for _threadno in 0..THREADS { let files = files.clone(); let hdl = rt.spawn(async move { - let mut buf = [0u8; SIZE]; + let mut buf = vec![0u8; SIZE]; let mut rng = rand::rngs::OsRng; for _ in 1..1000 { let f = &files[rng.gen_range(0..files.len())]; - f.read_exact_at(&mut buf, 0).await.unwrap(); + buf = f.read_exact_at(buf, 0).await.unwrap(); assert!(buf == SAMPLE); } });