From 82d9c686679bdfdc7c6498c03b548e90922042d2 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 29 Aug 2023 12:24:30 +0000 Subject: [PATCH] CP tokio_epoll_uring for read path --- Cargo.lock | 78 ++++++++++++++++++++--- pageserver/Cargo.toml | 1 + pageserver/src/tenant/block_io.rs | 6 +- pageserver/src/tenant/ephemeral_file.rs | 6 +- pageserver/src/virtual_file.rs | 83 ++++++++++++++++--------- 5 files changed, 128 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f7598a79cf..4f48756266 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2568,6 +2568,26 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "io-uring" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd1e1a01cfb924fd8c5c43b6827965db394f5a3a16c599ce03452266e1cf984c" +dependencies = [ + "bitflags", + "libc", +] + +[[package]] +name = "io-uring" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "141a0f4546a50b2ed637c7a6df0d7dff45c9f41523254996764461c8ae0d9424" +dependencies = [ + "bitflags", + "libc", +] + [[package]] name = "ipnet" version = "2.7.2" @@ -2676,9 +2696,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.144" +version = "0.2.147" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1" +checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" [[package]] name = "libloading" @@ -3281,6 +3301,7 @@ dependencies = [ "tenant_size_model", "thiserror", "tokio", + "tokio-epoll-uring", "tokio-io-timeout", "tokio-postgres", "tokio-tar", @@ -4524,6 +4545,12 @@ dependencies = [ "windows-sys 0.42.0", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.1.0" @@ -5220,18 +5247,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.40" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac" +checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.40" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" +checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b" dependencies = [ "proc-macro2", "quote", @@ -5357,22 +5384,39 @@ dependencies = [ [[package]] name = "tokio" -version = "1.28.1" +version = "1.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105" +checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" dependencies = [ - "autocfg", + "backtrace", "bytes", "libc", "mio", "num_cpus", + "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", - "socket2 0.4.9", + "socket2 0.5.3", "tokio-macros", "windows-sys 0.48.0", ] +[[package]] +name = "tokio-epoll-uring" +version = "0.1.0" +dependencies = [ + "futures", + "io-uring 0.6.1", + "libc", + "once_cell", + "scopeguard", + "thiserror", + "tokio", + "tokio-uring", + "tokio-util", + "tracing", +] + [[package]] name = "tokio-io-timeout" version = "1.2.0" @@ -5489,6 +5533,20 @@ dependencies = [ "tungstenite", ] +[[package]] +name = "tokio-uring" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d5e02bb137e030b3a547c65a3bd2f1836d66a97369fdcc69034002b10e155ef" +dependencies = [ + "io-uring 0.5.13", + "libc", + "scoped-tls", + "slab", + "socket2 0.4.9", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.8" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 3eb01003df..9dfda27e61 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -82,6 +82,7 @@ enum-map.workspace = true enumset.workspace = true strum.workspace = true strum_macros.workspace = true +tokio-epoll-uring = { path = "/home/admin/tokio-epoll-uring/tokio-epoll-uring" } [dev-dependencies] criterion.workspace = true diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 0617017528..7084a7f7d8 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -5,7 +5,7 @@ use super::ephemeral_file::EphemeralFile; use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner}; use crate::context::RequestContext; -use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ}; +use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ, PageWriteGuard}; use crate::virtual_file::VirtualFile; use bytes::Bytes; use std::ops::{Deref, DerefMut}; @@ -169,7 +169,7 @@ impl FileBlockReader { } /// Read a page from the underlying file into given buffer. - async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> { + async fn fill_buffer(&self, buf: PageWriteGuard<'static>, blkno: u32) -> Result, std::io::Error> { assert!(buf.len() == PAGE_SZ); self.file .read_exact_at(buf, blkno as u64 * PAGE_SZ as u64) @@ -198,7 +198,7 @@ impl FileBlockReader { ReadBufResult::Found(guard) => Ok(guard.into()), ReadBufResult::NotFound(mut write_guard) => { // Read the page from disk into the buffer - self.fill_buffer(write_guard.deref_mut(), blknum).await?; + let write_guard = self.fill_buffer(write_guard, blknum).await?; Ok(write_guard.mark_valid().into()) } } diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 9a06d9df61..c3fdeb32bc 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -89,10 +89,8 @@ impl EphemeralFile { return Ok(BlockLease::PageReadGuard(guard)) } page_cache::ReadBufResult::NotFound(mut write_guard) => { - let buf: &mut [u8] = write_guard.deref_mut(); - debug_assert_eq!(buf.len(), PAGE_SZ); - self.file - .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64) + let write_guard = self.file + .read_exact_at(write_guard, blknum as u64 * PAGE_SZ as u64) .await?; let read_guard = write_guard.mark_valid(); return Ok(BlockLease::PageReadGuard(read_guard)); diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 281ad2ed1d..c94f3801b6 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -11,11 +11,13 @@ //! src/backend/storage/file/fd.c //! use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC}; +use crate::page_cache::PageWriteGuard; use crate::tenant::TENANTS_SEGMENT_NAME; use camino::{Utf8Path, Utf8PathBuf}; use once_cell::sync::OnceCell; use std::fs::{self, File, OpenOptions}; use std::io::{Error, ErrorKind, Seek, SeekFrom}; +use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd}; use std::os::unix::fs::FileExt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -370,7 +372,7 @@ impl VirtualFile { /// /// We are doing it via a macro as Rust doesn't support async closures that /// take on parameters with lifetimes. - async fn lock_file(&self) -> Result, Error> { + async fn lock_file(&self) -> Result, Error> { let open_files = get_open_files(); let mut handle_guard = { @@ -460,24 +462,59 @@ impl VirtualFile { } // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 - pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> { - while !buf.is_empty() { - match self.read_at(buf, offset).await { - Ok(0) => { - return Err(Error::new( - std::io::ErrorKind::UnexpectedEof, - "failed to fill whole buffer", - )) - } - Ok(n) => { - buf = &mut buf[n..]; - offset += n as u64; - } - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return Err(e), + pub async fn read_exact_at( + &self, + mut write_guard: PageWriteGuard<'static>, + mut offset: u64, + ) -> Result, Error> { + let file_guard: FileGuard<'static> = self.lock_file().await?; + + let system = tokio_epoll_uring::thread_local_system().await; + struct PageWriteGuardBuf { + buf: PageWriteGuard<'static>, + init_up_to: usize, + } + unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf { + fn stable_ptr(&self) -> *const u8 { + self.buf.as_ptr() + } + fn bytes_init(&self) -> usize { + self.init_up_to + } + fn bytes_total(&self) -> usize { + self.buf.len() } } - Ok(()) + unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf { + fn stable_mut_ptr(&mut self) -> *mut u8 { + self.buf.as_mut_ptr() + } + + unsafe fn set_init(&mut self, pos: usize) { + assert!(pos <= self.buf.len()); + self.init_up_to = pos; + } + } + let buf = PageWriteGuardBuf { + buf: write_guard, + init_up_to: 0, + }; + let owned_fd = unsafe { OwnedFd::from_raw_fd(file_guard.as_ref().as_raw_fd()) }; + let guard = scopeguard::guard(file_guard, |_| { + panic!("must not drop future while operation is ongoing (todo: pass file_guard to tokio_epoll_uring to aovid this)") + }); + let ((owned_fd, buf), res) = system.read(owned_fd, offset, buf).await; + let _ = OwnedFd::into_raw_fd(owned_fd); + let _ = scopeguard::ScopeGuard::into_inner(guard); + let PageWriteGuardBuf { + buf: write_guard, + init_up_to, + } = buf; + if let Ok(num_read) = res { + assert!(init_up_to == num_read); // TODO need to deal with short reads here + } + res.map(|_| write_guard) + .map_err(|e| Error::new(ErrorKind::Other, e)) } // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235 @@ -527,18 +564,6 @@ impl VirtualFile { Ok(n) } - pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { - let result = with_file!(self, StorageIoOperation::Read, |file| file - .as_ref() - .read_at(buf, offset)); - if let Ok(size) = result { - STORAGE_IO_SIZE - .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) - .add(size as i64); - } - result - } - async fn write_at(&self, buf: &[u8], offset: u64) -> Result { let result = with_file!(self, StorageIoOperation::Write, |file| file .as_ref()