diff --git a/Cargo.lock b/Cargo.lock index 9f4f7e45ad..ba8c367d53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2066,26 +2066,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "io-uring" -version = "0.5.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd1e1a01cfb924fd8c5c43b6827965db394f5a3a16c599ce03452266e1cf984c" -dependencies = [ - "bitflags", - "libc", -] - -[[package]] -name = "io-uring" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "141a0f4546a50b2ed637c7a6df0d7dff45c9f41523254996764461c8ae0d9424" -dependencies = [ - "bitflags", - "libc", -] - [[package]] name = "ipnet" version = "2.7.2" @@ -2185,9 +2165,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.147" +version = "0.2.144" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1" [[package]] name = "libloading" @@ -2767,7 +2747,6 @@ dependencies = [ "tenant_size_model", "thiserror", "tokio", - "tokio-epoll-uring", "tokio-io-timeout", "tokio-postgres", "tokio-tar", @@ -3888,12 +3867,6 @@ dependencies = [ "windows-sys 0.42.0", ] -[[package]] -name = "scoped-tls" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" - [[package]] name = "scopeguard" version = "1.1.0" @@ -4478,18 +4451,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.47" +version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f" +checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.47" +version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b" +checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" dependencies = [ "proc-macro2", "quote", @@ -4574,39 +4547,22 @@ dependencies = [ [[package]] name = "tokio" -version = "1.32.0" +version = "1.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105" dependencies = [ - "backtrace", + "autocfg", "bytes", "libc", "mio", "num_cpus", - "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.3", + "socket2 0.4.9", "tokio-macros", "windows-sys 0.48.0", ] -[[package]] -name = "tokio-epoll-uring" -version = "0.1.0" -dependencies = [ - "futures", - "io-uring 0.6.1", - "libc", - "once_cell", - "scopeguard", - "thiserror", - "tokio", - "tokio-uring", - "tokio-util", - "tracing", -] - [[package]] name = "tokio-io-timeout" version = "1.2.0" @@ -4723,20 +4679,6 @@ dependencies = [ "tungstenite", ] -[[package]] -name = "tokio-uring" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d5e02bb137e030b3a547c65a3bd2f1836d66a97369fdcc69034002b10e155ef" -dependencies = [ - "io-uring 0.5.13", - "libc", - "scoped-tls", - "slab", - "socket2 0.4.9", - "tokio", -] - [[package]] name = "tokio-util" version = "0.7.8" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 34a159660d..a40bd133fc 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -83,7 +83,6 @@ enum-map.workspace = true enumset.workspace = true strum.workspace = true strum_macros.workspace = true -tokio-epoll-uring = { path = "/home/admin/tokio-epoll-uring/tokio-epoll-uring" } [dev-dependencies] criterion.workspace = true diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 7084a7f7d8..0617017528 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -5,7 +5,7 @@ use super::ephemeral_file::EphemeralFile; use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner}; use crate::context::RequestContext; -use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ, PageWriteGuard}; +use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ}; use crate::virtual_file::VirtualFile; use bytes::Bytes; use std::ops::{Deref, DerefMut}; @@ -169,7 +169,7 @@ impl FileBlockReader { } /// Read a page from the underlying file into given buffer. - async fn fill_buffer(&self, buf: PageWriteGuard<'static>, blkno: u32) -> Result, std::io::Error> { + async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> { assert!(buf.len() == PAGE_SZ); self.file .read_exact_at(buf, blkno as u64 * PAGE_SZ as u64) @@ -198,7 +198,7 @@ impl FileBlockReader { ReadBufResult::Found(guard) => Ok(guard.into()), ReadBufResult::NotFound(mut write_guard) => { // Read the page from disk into the buffer - let write_guard = self.fill_buffer(write_guard, blknum).await?; + self.fill_buffer(write_guard.deref_mut(), blknum).await?; Ok(write_guard.mark_valid().into()) } } diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 7be2e8be6d..5b99a1dd03 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -89,8 +89,10 @@ impl EphemeralFile { return Ok(BlockLease::PageReadGuard(guard)) } page_cache::ReadBufResult::NotFound(mut write_guard) => { - let write_guard = self.file - .read_exact_at(write_guard, blknum as u64 * PAGE_SZ as u64) + let buf: &mut [u8] = write_guard.deref_mut(); + debug_assert_eq!(buf.len(), PAGE_SZ); + self.file + .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64) .await?; let read_guard = write_guard.mark_valid(); return Ok(BlockLease::PageReadGuard(read_guard)); diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index c94f3801b6..281ad2ed1d 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -11,13 +11,11 @@ //! src/backend/storage/file/fd.c //! use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC}; -use crate::page_cache::PageWriteGuard; use crate::tenant::TENANTS_SEGMENT_NAME; use camino::{Utf8Path, Utf8PathBuf}; use once_cell::sync::OnceCell; use std::fs::{self, File, OpenOptions}; use std::io::{Error, ErrorKind, Seek, SeekFrom}; -use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd}; use std::os::unix::fs::FileExt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -372,7 +370,7 @@ impl VirtualFile { /// /// We are doing it via a macro as Rust doesn't support async closures that /// take on parameters with lifetimes. - async fn lock_file(&self) -> Result, Error> { + async fn lock_file(&self) -> Result, Error> { let open_files = get_open_files(); let mut handle_guard = { @@ -462,59 +460,24 @@ impl VirtualFile { } // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 - pub async fn read_exact_at( - &self, - mut write_guard: PageWriteGuard<'static>, - mut offset: u64, - ) -> Result, Error> { - let file_guard: FileGuard<'static> = self.lock_file().await?; - - let system = tokio_epoll_uring::thread_local_system().await; - struct PageWriteGuardBuf { - buf: PageWriteGuard<'static>, - init_up_to: usize, - } - unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf { - fn stable_ptr(&self) -> *const u8 { - self.buf.as_ptr() - } - fn bytes_init(&self) -> usize { - self.init_up_to - } - fn bytes_total(&self) -> usize { - self.buf.len() + pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> { + while !buf.is_empty() { + match self.read_at(buf, offset).await { + Ok(0) => { + return Err(Error::new( + std::io::ErrorKind::UnexpectedEof, + "failed to fill whole buffer", + )) + } + Ok(n) => { + buf = &mut buf[n..]; + offset += n as u64; + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), } } - unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf { - fn stable_mut_ptr(&mut self) -> *mut u8 { - self.buf.as_mut_ptr() - } - - unsafe fn set_init(&mut self, pos: usize) { - assert!(pos <= self.buf.len()); - self.init_up_to = pos; - } - } - let buf = PageWriteGuardBuf { - buf: write_guard, - init_up_to: 0, - }; - let owned_fd = unsafe { OwnedFd::from_raw_fd(file_guard.as_ref().as_raw_fd()) }; - let guard = scopeguard::guard(file_guard, |_| { - panic!("must not drop future while operation is ongoing (todo: pass file_guard to tokio_epoll_uring to aovid this)") - }); - let ((owned_fd, buf), res) = system.read(owned_fd, offset, buf).await; - let _ = OwnedFd::into_raw_fd(owned_fd); - let _ = scopeguard::ScopeGuard::into_inner(guard); - let PageWriteGuardBuf { - buf: write_guard, - init_up_to, - } = buf; - if let Ok(num_read) = res { - assert!(init_up_to == num_read); // TODO need to deal with short reads here - } - res.map(|_| write_guard) - .map_err(|e| Error::new(ErrorKind::Other, e)) + Ok(()) } // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235 @@ -564,6 +527,18 @@ impl VirtualFile { Ok(n) } + pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { + let result = with_file!(self, StorageIoOperation::Read, |file| file + .as_ref() + .read_at(buf, offset)); + if let Ok(size) = result { + STORAGE_IO_SIZE + .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) + .add(size as i64); + } + result + } + async fn write_at(&self, buf: &[u8], offset: u64) -> Result { let result = with_file!(self, StorageIoOperation::Write, |file| file .as_ref()